FileSystem为什么由DistributeFileSystem 来实现

FileSystem为什么由DistributeFileSystem 来实现

FileSystem 有13个实现类 create 使用的是 DistributeFileSystem 实现的方法!

这块没有说清楚为什么?


我跟踪了代码 发现hadoop会加载全部的配置文件 


会调用自身的静态方法

createFileSystem(uri, conf)

然后 通过配置的uri的scheme进行反射

Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);

getFileSystemClass这个方法递归调用 最终得到了 DistributeFileSystem


这个过程还不是太明白 ??


我在代码过程中发现了 SERVICE_FILE_SYSTEMS HashMap 这个里面被放入了多种实现方法,hadoop是根据什么完成这个操作的??


下载视频          

正在回答

登陆购买课程后可参与讨论,去登陆

1回答

1:首先进入FileSystem fileSystem = FileSystem.get(conf)中的get方法

最终会进入这个方法里面

public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }
  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    LOGGER.debug("Bypassing cache to create filesystem {}", uri);
    //下一步进入这个方法
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}


2:然后进入createFileSysterm方法

private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Tracer tracer = FsTracer.get(conf);
  try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
    scope.addKVAnnotation("scheme", uri.getScheme());
    //下一步进入这个方法
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }
}


3:接着进入getFileSystermClass

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
  //下一步进入这个方法
    loadFileSystems();
  }
  LOGGER.debug("Looking for FS supporting {}", scheme);
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    String property = "fs." + scheme + ".impl";
    LOGGER.debug("looking for configuration option {}", property);
    clazz = (Class<? extends FileSystem>) conf.getClass(
        property, null);
  } else {
    LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
  }
  if (clazz == null) {
    LOGGER.debug("Looking in service filesystems for implementation class");
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  } else {
    LOGGER.debug("Filesystem {} defined in configuration option", scheme);
  }
  if (clazz == null) {
    throw new UnsupportedFileSystemException("No FileSystem for scheme "
        + "\"" + scheme + "\"");
  }
  LOGGER.debug("FS for {} is {}", scheme, clazz);
  return clazz;
}


4:接着进入loadFileSystems方法【核心代码在这里面】

private static void loadFileSystems() {
  LOGGER.debug("Loading filesystems");
  synchronized (FileSystem.class) {
    if (!FILE_SYSTEMS_LOADED) {
    //通过查看ServiceLoader这个类的注释可以大致看出来,这里可以获取到所有文件系统的实现类
      ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
      Iterator<FileSystem> it = serviceLoader.iterator();
      while (it.hasNext()) {
        FileSystem fs;
        try {
          fs = it.next();
          try {
            SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
            if (LOGGER.isDebugEnabled()) {
              LOGGER.debug("{}:// = {} from {}",
                  fs.getScheme(), fs.getClass(),
                  ClassUtil.findContainingJar(fs.getClass()));
            }
          } catch (Exception e) {
            LOGGER.warn("Cannot load: {} from {}", fs,
                ClassUtil.findContainingJar(fs.getClass()));
            LOGGER.info("Full exception loading: {}", fs, e);
          }
        } catch (ServiceConfigurationError ee) {
          LOG.warn("Cannot load filesystem: " + ee);
          Throwable cause = ee.getCause();
          // print all the nested exception messages
          while (cause != null) {
            LOG.warn(cause.toString());
            cause = cause.getCause();
          }
          // and at debug: the full stack
          LOG.debug("Stack Trace", ee);
        }
      }
      FILE_SYSTEMS_LOADED = true;
    }
  }
}


5:在本地打印一下ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);的返回值

public class HdfsOpTest {
    public static void main(String[] args) throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://bigdata01:9000");
        //获取操作HDFS的对象
        FileSystem fileSystem = FileSystem.get(conf);
        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
        Iterator<FileSystem> it = serviceLoader.iterator();
        while (it.hasNext()) {
            FileSystem fs = it.next();
            System.out.println(fs.getScheme()+fs.getClass());
        }
    }
}

代码执行结果如下:

fileclass org.apache.hadoop.fs.LocalFileSystem
viewfsclass org.apache.hadoop.fs.viewfs.ViewFileSystem
harclass org.apache.hadoop.fs.HarFileSystem
httpclass org.apache.hadoop.fs.http.HttpFileSystem
httpsclass org.apache.hadoop.fs.http.HttpsFileSystem
hdfsclass org.apache.hadoop.hdfs.DistributedFileSystem
webhdfsclass org.apache.hadoop.hdfs.web.WebHdfsFileSystem
swebhdfsclass org.apache.hadoop.hdfs.web.SWebHdfsFileSystem

从这里可以看出来,hdfsclass对应的实现类是DistributedFileSystem,这些属于hadoop里面指定的默认的实现类



后面在使用的时候会通过反射的方法,基于org.apache.hadoop.hdfs.DistributedFileSystem进行实例化,获取DistributedFileSystem对象



  • 还有一种最直接的获取方式,直接查看FileSystem类的注释,如下图,里面也说到了它的实现类是谁。

    https://img1.sycdn.imooc.com//climg/61a61c3809f0bce718580953.jpg

    2021-11-30 20:42:35
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师