FileSystem为什么由DistributeFileSystem 来实现
FileSystem 有13个实现类 create 使用的是 DistributeFileSystem 实现的方法!
这块没有说清楚为什么?
我跟踪了代码 发现hadoop会加载全部的配置文件
会调用自身的静态方法
createFileSystem(uri, conf)
然后 通过配置的uri的scheme进行反射
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
getFileSystemClass这个方法递归调用 最终得到了 DistributeFileSystem
这个过程还不是太明白 ??
我在代码过程中发现了 SERVICE_FILE_SYSTEMS HashMap 这个里面被放入了多种实现方法,hadoop是根据什么完成这个操作的??
正在回答
1:首先进入FileSystem fileSystem = FileSystem.get(conf)中的get方法
最终会进入这个方法里面
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
//下一步进入这个方法
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}2:然后进入createFileSysterm方法
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
scope.addKVAnnotation("scheme", uri.getScheme());
//下一步进入这个方法
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
}3:接着进入getFileSystermClass
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
Configuration conf) throws IOException {
if (!FILE_SYSTEMS_LOADED) {
//下一步进入这个方法
loadFileSystems();
}
LOGGER.debug("Looking for FS supporting {}", scheme);
Class<? extends FileSystem> clazz = null;
if (conf != null) {
String property = "fs." + scheme + ".impl";
LOGGER.debug("looking for configuration option {}", property);
clazz = (Class<? extends FileSystem>) conf.getClass(
property, null);
} else {
LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
}
if (clazz == null) {
LOGGER.debug("Looking in service filesystems for implementation class");
clazz = SERVICE_FILE_SYSTEMS.get(scheme);
} else {
LOGGER.debug("Filesystem {} defined in configuration option", scheme);
}
if (clazz == null) {
throw new UnsupportedFileSystemException("No FileSystem for scheme "
+ "\"" + scheme + "\"");
}
LOGGER.debug("FS for {} is {}", scheme, clazz);
return clazz;
}4:接着进入loadFileSystems方法【核心代码在这里面】
private static void loadFileSystems() {
LOGGER.debug("Loading filesystems");
synchronized (FileSystem.class) {
if (!FILE_SYSTEMS_LOADED) {
//通过查看ServiceLoader这个类的注释可以大致看出来,这里可以获取到所有文件系统的实现类
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
Iterator<FileSystem> it = serviceLoader.iterator();
while (it.hasNext()) {
FileSystem fs;
try {
fs = it.next();
try {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{}:// = {} from {}",
fs.getScheme(), fs.getClass(),
ClassUtil.findContainingJar(fs.getClass()));
}
} catch (Exception e) {
LOGGER.warn("Cannot load: {} from {}", fs,
ClassUtil.findContainingJar(fs.getClass()));
LOGGER.info("Full exception loading: {}", fs, e);
}
} catch (ServiceConfigurationError ee) {
LOG.warn("Cannot load filesystem: " + ee);
Throwable cause = ee.getCause();
// print all the nested exception messages
while (cause != null) {
LOG.warn(cause.toString());
cause = cause.getCause();
}
// and at debug: the full stack
LOG.debug("Stack Trace", ee);
}
}
FILE_SYSTEMS_LOADED = true;
}
}
}5:在本地打印一下ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);的返回值
public class HdfsOpTest {
public static void main(String[] args) throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
Iterator<FileSystem> it = serviceLoader.iterator();
while (it.hasNext()) {
FileSystem fs = it.next();
System.out.println(fs.getScheme()+fs.getClass());
}
}
}代码执行结果如下:
fileclass org.apache.hadoop.fs.LocalFileSystem viewfsclass org.apache.hadoop.fs.viewfs.ViewFileSystem harclass org.apache.hadoop.fs.HarFileSystem httpclass org.apache.hadoop.fs.http.HttpFileSystem httpsclass org.apache.hadoop.fs.http.HttpsFileSystem hdfsclass org.apache.hadoop.hdfs.DistributedFileSystem webhdfsclass org.apache.hadoop.hdfs.web.WebHdfsFileSystem swebhdfsclass org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
从这里可以看出来,hdfsclass对应的实现类是DistributedFileSystem,这些属于hadoop里面指定的默认的实现类
后面在使用的时候会通过反射的方法,基于org.apache.hadoop.hdfs.DistributedFileSystem进行实例化,获取DistributedFileSystem对象

恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星