Loading a Hadoop configuration usually looks like this:
    Configuration conf = new Configuration();
    conf.addResource("core-site.xml");
    System.out.println(conf.get("hadoop.tmp.dir"));
This article explains why Hadoop implements its own Configuration class rather than using java.util.Properties or a similar format directly.
    public class Configuration implements Iterable<Map.Entry<String,String>>, Writable
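First, the Configuration class implements two interfaces, Iterable and Writable. Iterable<Map.Entry<String,String>> lets callers iterate over the key-value pairs, and Writable supplies the readFields and write methods that Hadoop uses for serialization: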
    public void readFields(DataInput in) throws IOException {
      clear();
      int size = WritableUtils.readVInt(in);
      for (int i = 0; i < size; ++i) {
        String key = org.apache.hadoop.io.Text.readString(in);
        String value = org.apache.hadoop.io.Text.readString(in);
        set(key, value);
        String sources[] = WritableUtils.readCompressedStringArray(in);
        updatingResource.put(key, sources);
      }
    }

    //@Override
    @Override
    public void write(DataOutput out) throws IOException {
      Properties props = getProps();
      WritableUtils.writeVInt(out, props.size());
      for (Map.Entry ...
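To make the readFields/write pair easier to follow, here is a minimal round-trip sketch that uses only the JDK. It is not Hadoop's code: the class name PropsWireFormat is hypothetical, writeInt and writeUTF stand in for WritableUtils.writeVInt and Text.writeString, and the per-key source tracking (updatingResource) is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for Configuration's write/readFields pair (JDK only).
class PropsWireFormat {

    // Write the entry count first, then every key and value as a string.
    static byte[] write(Properties props) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(props.size());
            for (Map.Entry<Object, Object> item : props.entrySet()) {
                out.writeUTF((String) item.getKey());
                out.writeUTF((String) item.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read the count back, then rebuild the key-value pairs in the same order.
    static Properties readFields(byte[] data) {
        Properties props = new Properties();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                props.setProperty(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Calling PropsWireFormat.readFields(PropsWireFormat.write(props)) should give back an equal set of properties, which is the property a Writable implementation needs so that a configuration can be shipped between processes.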
Creating a Configuration first triggers the class's static initializer block:
    static {
      // print deprecation warning if hadoop-site.xml is found in classpath
      ClassLoader cL = Thread.currentThread().getContextClassLoader();
      if (cL == null) {
        cL = Configuration.class.getClassLoader();
      }
      if (cL.getResource("hadoop-site.xml") != null) {
        LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
            "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " +
            "mapred-site.xml and hdfs-site.xml to override properties of " +
            "core-default.xml, mapred-default.xml and hdfs-default.xml " +
            "respectively");
      }
      addDefaultResource("core-default.xml");
      addDefaultResource("core-site.xml");
    }
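This static block tells us that hadoop-site.xml is deprecated and should no longer be used. During class initialization, core-default.xml is registered first and core-site.xml second, so values in core-site.xml override the defaults. A core-site.xml that we add ourselves overrides whatever has been loaded so far, and if we add no file at all, calling conf.set directly works too. In short, there are several ways to supply configuration values.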
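The override order can be modeled in a few lines. This is a simplified sketch, not Hadoop's implementation: LayeredSettings is a hypothetical class, each resource is represented by a ready-made Properties object instead of an XML file, and the path values used with it are made up.

```java
import java.util.Properties;

// Hypothetical model of the override order: defaults, then site file, then set().
class LayeredSettings {
    private final Properties merged = new Properties();

    // Resources are merged in the order they are added; later ones win.
    void addResource(Properties resource) {
        merged.putAll(resource);
    }

    // An explicit set() overrides anything that came from a resource.
    void set(String key, String value) {
        merged.setProperty(key, value);
    }

    String get(String key) {
        return merged.getProperty(key);
    }
}
```

Adding a "default" resource, then a "site" resource, then calling set for the same key leaves the explicitly set value visible, which mirrors the precedence described above.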
Initialization calls addDefaultResource("*.xml"), where *.xml stands for each default file name:
    private static final CopyOnWriteArrayList<String> defaultResources =
        new CopyOnWriteArrayList<String>();

    private static final WeakHashMap<Configuration,Object> REGISTRY =
        new WeakHashMap<Configuration,Object>();

    public Configuration() {
      this(true);
    }

    public Configuration(boolean loadDefaults) {
      this.loadDefaults = loadDefaults;
      updatingResource = new HashMap<String, String[]>();
      synchronized (Configuration.class) {
        REGISTRY.put(this, null);
      }
    }

    public static synchronized void addDefaultResource(String name) {
      if (!defaultResources.contains(name)) {
        defaultResources.add(name);
        for (Configuration conf : REGISTRY.keySet()) {
          if (conf.loadDefaults) {
            conf.reloadConfiguration();
          }
        }
      }
    }
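The constructor registers each new Configuration instance (the object itself, not the class) in REGISTRY. Because REGISTRY is a WeakHashMap, being registered does not keep an instance alive. addDefaultResource stores the names core-default.xml and core-site.xml in defaultResources and asks every registered instance that loads defaults to reload. Note that defaultResources is a CopyOnWriteArrayList, a thread-safe list, and that it holds only the names of the configuration files, not their contents.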
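The registry pattern is small enough to reproduce in isolation. The sketch below is an illustration under assumptions: TrackedConf is a hypothetical class, and a counter stands in for reloadConfiguration so that the effect of addDefaultResource can be observed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical miniature of the REGISTRY / defaultResources mechanism.
class TrackedConf {
    // Holds resource names only, never parsed contents.
    private static final List<String> DEFAULT_RESOURCES = new CopyOnWriteArrayList<>();
    // Weak keys: a registered instance can still be garbage collected.
    private static final Map<TrackedConf, Object> REGISTRY = new WeakHashMap<>();

    private final boolean loadDefaults;
    private int reloads = 0; // stands in for reloadConfiguration()

    TrackedConf(boolean loadDefaults) {
        this.loadDefaults = loadDefaults;
        synchronized (TrackedConf.class) {
            REGISTRY.put(this, null);
        }
    }

    // Register a new default resource name and notify live instances once.
    static synchronized void addDefaultResource(String name) {
        if (!DEFAULT_RESOURCES.contains(name)) {
            DEFAULT_RESOURCES.add(name);
            for (TrackedConf conf : REGISTRY.keySet()) {
                if (conf.loadDefaults) {
                    conf.reloads++;
                }
            }
        }
    }

    synchronized static int reloadCount(TrackedConf conf) {
        return conf.reloads;
    }

    static List<String> defaultResources() {
        return new ArrayList<>(DEFAULT_RESOURCES);
    }
}
```

Registering the same name twice triggers only one reload, and instances created with loadDefaults set to false are skipped.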
Next comes the conf.addResource method:
    public void addResource(String name) {
      addResourceObject(new Resource(name));
    }

    public void addResource(URL url) {
      addResourceObject(new Resource(url));
    }

    public void addResource(Path file) {
      addResourceObject(new Resource(file));
    }

    public void addResource(InputStream in) {
      addResourceObject(new Resource(in));
    }

    public void addResource(InputStream in, String name) {
      addResourceObject(new Resource(in, name));
    }

    public void addResource(Configuration conf) {
      addResourceObject(new Resource(conf.getProps()));
    }
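addResource accepts several kinds of input: a resource name, a URL, a Path, an InputStream, or another Configuration object. The point to note is that loading is lazy: adding a resource does not parse it immediately, and the configuration is loaded only when a value is first read.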
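Lazy loading of this kind can be sketched as follows. LazyConf is a hypothetical class, resources are plain Properties objects rather than XML files, and the sketch assumes that adding a resource simply discards the cached values so that the next read rebuilds them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of lazy loading: add is cheap, the first get does the work.
class LazyConf {
    private final List<Properties> resources = new ArrayList<>();
    private Properties properties; // stays null until a value is read
    private int loadCount = 0;     // how many times the merge actually ran

    synchronized void addResource(Properties resource) {
        resources.add(resource);
        properties = null; // invalidate the cache instead of loading now
    }

    synchronized String get(String key) {
        if (properties == null) {
            properties = new Properties();
            for (Properties resource : resources) {
                properties.putAll(resource);
            }
            loadCount++;
        }
        return properties.getProperty(key);
    }

    synchronized int loadCount() {
        return loadCount;
    }
}
```

However many resources are added, nothing is merged until get is called, and repeated reads reuse the cached result.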
    public String get(String name) {
      String[] names = handleDeprecation(deprecationContext.get(), name);
      String result = null;
      for (String n : names) {
        result = substituteVars(getProps().getProperty(n));
      }
      return result;
    }

    /**
     * Checks for the presence of the property <code>name</code> in the
     * deprecation map. Returns the first of the list of new keys if present
     * in the deprecation map or the <code>name</code> itself. If the property
     * is not presently set but the property map contains an entry for the
     * deprecated key, the value of the deprecated key is set as the value for
     * the provided property name.
     *
     * @param name the property name
     * @return the first property in the list of properties mapping
     *         the <code>name</code> or the <code>name</code> itself.
     */
    private String[] handleDeprecation(DeprecationContext deprecations, String name) {
      if (null != name) {
        name = name.trim();
      }
      ArrayList<String> names = new ArrayList<String>();
      if (isDeprecated(name)) {
        DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
        warnOnceIfDeprecated(deprecations, name);
        for (String newKey : keyInfo.newKeys) {
          if (newKey != null) {
            names.add(newKey);
          }
        }
      }
      if (names.size() == 0) {
        names.add(name);
      }
      for (String n : names) {
        String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);
        if (deprecatedKey != null && !getOverlay().containsKey(n) &&
            getOverlay().containsKey(deprecatedKey)) {
          getProps().setProperty(n, getOverlay().getProperty(deprecatedKey));
          getOverlay().setProperty(n, getOverlay().getProperty(deprecatedKey));
        }
      }
      return names.toArray(new String[names.size()]);
    }

    private static AtomicReference<DeprecationContext> deprecationContext =
        new AtomicReference<DeprecationContext>(
            new DeprecationContext(null, defaultDeprecations));

    private static DeprecationDelta[] defaultDeprecations =
        new DeprecationDelta[] {
          new DeprecationDelta("topology.script.file.name",
            CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY),
          new DeprecationDelta("topology.script.number.args",
            CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY),
          new DeprecationDelta("hadoop.configured.node.mapping",
            CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY),
          new DeprecationDelta("topology.node.switch.mapping.impl",
            CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY),
          new DeprecationDelta("dfs.df.interval",
            CommonConfigurationKeys.FS_DF_INTERVAL_KEY),
          new DeprecationDelta("hadoop.native.lib",
            CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY),
          new DeprecationDelta("fs.default.name",
            CommonConfigurationKeys.FS_DEFAULT_NAME_KEY),
          new DeprecationDelta("dfs.umaskmode",
            CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY),
          new DeprecationDelta("dfs.nfs.exports.allowed.hosts",
            CommonConfigurationKeys.NFS_EXPORTS_ALLOWED_HOSTS_KEY)
        };
    ....
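The Javadoc above explains the idea: handleDeprecation checks whether the requested name is a deprecated key. If it is, the method maps it to its replacement keys (the comment describes the result as the first of them, although the code returns the whole array); otherwise it returns the name itself. If a new key has not been set but a value was set under its deprecated key, that value is copied over to the new key.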
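A stripped-down version of the key mapping may help. This sketch is not Hadoop's logic: DeprecationSketch is a hypothetical class, each deprecated key maps to exactly one new key, and values are stored only under the new key, whereas the real handleDeprecation also deals with multiple new keys, the overlay, and warnings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical one-to-one deprecation mapping in front of a Properties object.
class DeprecationSketch {
    private final Map<String, String> deprecatedToNew = new HashMap<>();
    private final Properties props = new Properties();

    void addDeprecation(String oldKey, String newKey) {
        deprecatedToNew.put(oldKey, newKey);
    }

    // Both set and get resolve the name first, so old and new keys agree.
    void set(String name, String value) {
        props.setProperty(resolve(name), value);
    }

    String get(String name) {
        return props.getProperty(resolve(name));
    }

    // Trim the name, then translate it if it is a deprecated key.
    private String resolve(String name) {
        String trimmed = name.trim();
        return deprecatedToNew.getOrDefault(trimmed, trimmed);
    }
}
```

With a mapping from fs.default.name to fs.defaultFS, a value written under the old name can be read back under either name. The URI used to try this out is illustrative only.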
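deprecationContext is held in an AtomicReference, a common way to get thread safety without locking. Replacing the context is a read-modify-write sequence, and on a plain field that sequence is not atomic: two threads could each read the old value and one update would be lost. AtomicReference avoids this with compare-and-set, so no lock is needed.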
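The lock-free update pattern looks roughly like this. DeprecationTable is a hypothetical class and the key names used with it are made up; the sketch only shows the copy, modify, compare-and-set loop, not Hadoop's DeprecationContext.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable-snapshot table updated with compareAndSet.
class DeprecationTable {
    private static final AtomicReference<Map<String, String>> CURRENT =
        new AtomicReference<Map<String, String>>(Collections.<String, String>emptyMap());

    static void add(String oldKey, String newKey) {
        Map<String, String> prev;
        Map<String, String> next;
        do {
            prev = CURRENT.get();                      // read the current snapshot
            Map<String, String> copy = new HashMap<>(prev);
            copy.put(oldKey, newKey);                  // modify a private copy
            next = Collections.unmodifiableMap(copy);
            // publish only if nobody replaced the snapshot in the meantime
        } while (!CURRENT.compareAndSet(prev, next));
    }

    // Readers never block; they see one consistent snapshot.
    static String lookup(String oldKey) {
        return CURRENT.get().get(oldKey);
    }
}
```

If another thread swaps in a new snapshot between the read and the compareAndSet, the loop retries on top of the newer snapshot, so no update is lost.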
    protected synchronized Properties getProps() {
      if (properties == null) {
        properties = new Properties();
        HashMap<String, String[]> backup =
            new HashMap<String, String[]>(updatingResource);
        loadResources(properties, resources, quietmode);
        if (overlay != null) {
          properties.putAll(overlay);
          for (Map.Entry ...
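The loadResources method is where the configuration is actually loaded. If loadDefaults is true, it first iterates over the names in defaultResources and loads each one, then handles hadoop-site.xml separately if that file is present. After that it loops over the resources that were added explicitly; if loadDefaults is false, it goes straight to that loop. Each resource is loaded by loadResource.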
    private static class Resource {
      private final Object resource;
      private final String name;

      public Resource(Object resource) {
        this(resource, resource.toString());
      }

      public Resource(Object resource, String name) {
        this.resource = resource;
        this.name = name;
      }

      public String getName() {
        return name;
      }

      public Object getResource() {
        return resource;
      }

      @Override
      public String toString() {
        return name;
      }
    }

    private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
      String name = UNKNOWN_RESOURCE;
      try {
        Object resource = wrapper.getResource();
        name = wrapper.getName();

        DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
        // ignore all comments inside the xml file
        docBuilderFactory.setIgnoringComments(true);

        // allow includes in the xml file
        docBuilderFactory.setNamespaceAware(true);
        try {
          docBuilderFactory.setXIncludeAware(true);
        } catch (UnsupportedOperationException e) {
          LOG.error("Failed to set setXIncludeAware(true) for parser "
              + docBuilderFactory + ":" + e, e);
        }
        DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
        Document doc = null;
        Element root = null;
        boolean returnCachedProperties = false;

        if (resource instanceof URL) {                  // an URL resource
          doc = parse(builder, (URL) resource);
        } else if (resource instanceof String) {        // a CLASSPATH resource
          URL url = getResource((String) resource);
          doc = parse(builder, url);
        } else if (resource instanceof Path) {          // a file resource
          // Can't use FileSystem API or we get an infinite loop
          // since FileSystem uses Configuration API. Use java.io.File instead.
          File file = new File(((Path) resource).toUri().getPath())
              .getAbsoluteFile();
          if (file.exists()) {
            if (!quiet) {
              LOG.debug("parsing File " + file);
            }
            doc = parse(builder, new BufferedInputStream(
                new FileInputStream(file)), ((Path) resource).toString());
          }
        } else if (resource instanceof InputStream) {
          doc = parse(builder, (InputStream) resource, null);
          returnCachedProperties = true;
        } else if (resource instanceof Properties) {
          overlay(properties, (Properties) resource);
        } else if (resource instanceof Element) {
          root = (Element) resource;
        }

        if (root == null) {
          if (doc == null) {
            if (quiet) {
              return null;
            }
            throw new RuntimeException(resource + " not found");
          }
          root = doc.getDocumentElement();
        }
        Properties toAddTo = properties;
        if (returnCachedProperties) {
          toAddTo = new Properties();
        }
        if (!"configuration".equals(root.getTagName()))
          LOG.fatal("bad conf file: top-level element not <configuration>");
        NodeList props = root.getChildNodes();
        DeprecationContext deprecations = deprecationContext.get();
        for (int i = 0; i < props.getLength(); i++) {
          Node propNode = props.item(i);
          if (!(propNode instanceof Element))
            continue;
          Element prop = (Element) propNode;
          if ("configuration".equals(prop.getTagName())) {
            loadResource(toAddTo, new Resource(prop, name), quiet);
            continue;
          }
          if (!"property".equals(prop.getTagName()))
            LOG.warn("bad conf file: element not <property>");
          NodeList fields = prop.getChildNodes();
          String attr = null;
          String value = null;
          boolean finalParameter = false;
          LinkedList<String> source = new LinkedList<String>();
          for (int j = 0; j < fields.getLength(); j++) {
            Node fieldNode = fields.item(j);
            if (!(fieldNode instanceof Element))
              continue;
            Element field = (Element) fieldNode;
            if ("name".equals(field.getTagName()) && field.hasChildNodes())
              attr = StringInterner.weakIntern(
                  ((Text) field.getFirstChild()).getData().trim());
            if ("value".equals(field.getTagName()) && field.hasChildNodes())
              value = StringInterner.weakIntern(
                  ((Text) field.getFirstChild()).getData());
            if ("final".equals(field.getTagName()) && field.hasChildNodes())
              finalParameter = "true".equals(((Text) field.getFirstChild()).getData());
            if ("source".equals(field.getTagName()) && field.hasChildNodes())
              source.add(StringInterner.weakIntern(
                  ((Text) field.getFirstChild()).getData()));
          }
          source.add(name);

          // Ignore this parameter if it has already been marked as 'final'
          if (attr != null) {
            if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
              DeprecatedKeyInfo keyInfo =
                  deprecations.getDeprecatedKeyMap().get(attr);
              keyInfo.clearAccessed();
              for (String key : keyInfo.newKeys) {
                // update new keys with deprecated key's value
                loadProperty(toAddTo, name, key, value, finalParameter,
                    source.toArray(new String[source.size()]));
              }
            } else {
              loadProperty(toAddTo, name, attr, value, finalParameter,
                  source.toArray(new String[source.size()]));
            }
          }
        }

        if (returnCachedProperties) {
          overlay(properties, toAddTo);
          return new Resource(toAddTo, name);
        }
        return null;
      } catch (IOException e) {
        LOG.fatal("error parsing conf " + name, e);
        throw new RuntimeException(e);
      } catch (DOMException e) {
        LOG.fatal("error parsing conf " + name, e);
        throw new RuntimeException(e);
      } catch (SAXException e) {
        LOG.fatal("error parsing conf " + name, e);
        throw new RuntimeException(e);
      } catch (ParserConfigurationException e) {
        LOG.fatal("error parsing conf " + name, e);
        throw new RuntimeException(e);
      }
    }
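This is essentially XML parsing with DocumentBuilderFactory. For an InputStream resource the method returns a Resource(properties, name), where properties holds the parsed key-value pairs and name is the resource name defined earlier; for the other resource types it merges the values into the properties passed in and returns null. Either way, the XML ends up in a Properties-like form.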
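The core of that parsing step fits in a short, JDK-only sketch. XmlToProperties is a hypothetical class; it reads only the name and value children of each property element and ignores final, source, nested configuration elements, XInclude, and deprecation handling.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical reduced version of the XML-to-Properties step.
class XmlToProperties {

    static Properties parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setIgnoringComments(true); // comments carry no configuration
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

            Properties props = new Properties();
            NodeList children = doc.getDocumentElement().getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node node = children.item(i);
                if (!(node instanceof Element)) {
                    continue; // skip whitespace text nodes
                }
                Element prop = (Element) node;
                if (!"property".equals(prop.getTagName())) {
                    continue;
                }
                String name = childText(prop, "name");
                String value = childText(prop, "value");
                if (name != null && value != null) {
                    props.setProperty(name.trim(), value);
                }
            }
            return props;
        } catch (Exception e) {
            throw new RuntimeException("error parsing conf", e);
        }
    }

    // Text content of the first direct child element with the given tag, or null.
    private static String childText(Element parent, String tag) {
        NodeList fields = parent.getChildNodes();
        for (int i = 0; i < fields.getLength(); i++) {
            Node field = fields.item(i);
            if (field instanceof Element && tag.equals(((Element) field).getTagName())) {
                return field.getTextContent();
            }
        }
        return null;
    }
}
```

Passing it a string such as a configuration element containing one property with name hadoop.tmp.dir yields a Properties object with that single key, which is the shape the rest of Configuration works with.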
    public String get(String name) {
      String[] names = handleDeprecation(deprecationContext.get(), name);
      String result = null;
      for (String n : names) {
        result = substituteVars(getProps().getProperty(n));
      }
      return result;
    }
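Back to where we started: in the end, get still returns the value through the Properties API (getProperty), after substituteVars has processed it.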
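The substituteVars call hints at one thing a plain Properties object does not do on its own: expanding ${...} references inside values. The sketch below illustrates that idea under assumptions and is not Hadoop's implementation: VarExpander is a hypothetical class, it looks variables up only in the given Properties, and the depth limit of 20 is a choice made for this example.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical ${var} expansion over a Properties object.
class VarExpander {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");
    private static final int MAX_DEPTH = 20; // guards against circular references

    static String expand(Properties props, String raw) {
        if (raw == null) {
            return null;
        }
        String current = raw;
        for (int depth = 0; depth < MAX_DEPTH; depth++) {
            Matcher m = VAR.matcher(current);
            if (!m.find()) {
                return current; // nothing left to expand
            }
            String replacement = props.getProperty(m.group(1));
            if (replacement == null) {
                return current; // unknown variable: leave the literal ${...} in place
            }
            current = current.substring(0, m.start()) + replacement
                + current.substring(m.end());
        }
        throw new IllegalStateException("Variable substitution depth too large: " + raw);
    }
}
```

Given a property whose value refers to another key through ${...}, expand returns the value with the reference replaced, and a self-referencing value ends in an exception instead of looping forever.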
There are also other methods, such as getTrimmed, getRaw, getAlternativeNames, getInts, getLong, getTimeDuration, getPattern, getTrimmedStringCollection, getPassword, getSocketAddr, getClasses, getValByRegex, and so on. They are comparatively simple.
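To give a feel for how thin such helpers can be, here is a sketch of two of them written against a plain Properties object. TypedGetters is a hypothetical class and the behavior shown (trimming, comma-separated integers, an empty array for a missing key) is an assumption made for this example, not a statement about what Hadoop's getTrimmed and getInts do in every edge case.

```java
import java.util.Properties;

// Hypothetical typed accessors layered on top of getProperty.
class TypedGetters {

    // Value with surrounding whitespace removed, or null if the key is absent.
    static String getTrimmed(Properties props, String name) {
        String value = props.getProperty(name);
        return value == null ? null : value.trim();
    }

    // Parse a comma-separated list of integers; missing or empty gives an empty array.
    static int[] getInts(Properties props, String name) {
        String value = getTrimmed(props, name);
        if (value == null || value.isEmpty()) {
            return new int[0];
        }
        String[] parts = value.split("\\s*,\\s*");
        int[] ints = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            ints[i] = Integer.parseInt(parts[i]);
        }
        return ints;
    }
}
```

Each helper is a small conversion wrapped around the same string lookup, which is why these methods need little explanation.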