Repository: incubator-systemml Updated Branches: refs/heads/master 401e982bf -> e1a2685ad
[SYSTEMML-158] Update deprecated Hadoop properties Removes the console warnings from deprecated Hadoop properties. Specify Hadoop v1/v2 properties in MRConfigurationNames. Update io.sort.mb to mapreduce.task.io.sort.mb Update mapred.job.tracker to mapreduce.jobtracker.address Update mapred.task.tracker.task-controller to mapreduce.tasktracker.taskcontroller Update mapred.local.dir to mapreduce.cluster.local.dir Update mapred.system.dir to mapreduce.jobtracker.system.dir Closes #50. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e1a2685a Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e1a2685a Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e1a2685a Branch: refs/heads/master Commit: e1a2685ad11c4ca0dcc1cf3bfeb8b74f8c15d1ab Parents: 401e982 Author: Deron Eriksson <de...@us.ibm.com> Authored: Tue Jan 26 13:33:13 2016 -0800 Committer: Deron Eriksson <de...@us.ibm.com> Committed: Tue Jan 26 13:33:13 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/DMLScript.java | 15 +++-- .../controlprogram/parfor/RemoteParForMR.java | 4 +- .../parfor/stat/InfrastructureAnalyzer.java | 8 +-- .../runtime/matrix/mapred/MMCJMRCache.java | 2 +- .../matrix/mapred/MRConfigurationNames.java | 67 ++++++++++---------- .../matrix/mapred/MRJobConfiguration.java | 10 +-- .../sysml/yarn/ropt/YarnClusterAnalyzer.java | 4 +- 7 files changed, 59 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/api/DMLScript.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java index 0302fdc..ce37bea 100644 --- 
a/src/main/java/org/apache/sysml/api/DMLScript.java +++ b/src/main/java/org/apache/sysml/api/DMLScript.java @@ -834,9 +834,9 @@ public class DMLScript //analyze hadoop configuration JobConf job = ConfigurationManager.getCachedJobConf(); boolean localMode = InfrastructureAnalyzer.isLocalMode(job); - String taskController = job.get("mapred.task.tracker.task-controller", "org.apache.hadoop.mapred.DefaultTaskController"); + String taskController = job.get(MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER, "org.apache.hadoop.mapred.DefaultTaskController"); String ttGroupName = job.get("mapreduce.tasktracker.group","null"); - String perm = job.get(MRConfigurationNames.DFS_PERMISSIONS,"null"); //note: job.get("dfs.permissions.supergroup",null); + String perm = job.get(MRConfigurationNames.DFS_PERMISSIONS_ENABLED,"null"); //note: job.get("dfs.permissions.supergroup",null); URI fsURI = FileSystem.getDefaultUri(job); //determine security states @@ -846,9 +846,14 @@ public class DMLScript boolean flagLocalFS = fsURI==null || fsURI.getScheme().equals("file"); boolean flagSecurity = perm.equals("yes"); - LOG.debug("SystemML security check: " + "local.user.name = " + userName + ", " + "local.user.groups = " + ProgramConverter.serializeStringCollection(groupNames) + ", " - + "mapred.job.tracker = " + job.get("mapred.job.tracker") + ", " + "mapred.task.tracker.task-controller = " + taskController + "," + "mapreduce.tasktracker.group = " + ttGroupName + ", " - + "fs.default.name = " + ((fsURI!=null)?fsURI.getScheme():"null") + ", " + MRConfigurationNames.DFS_PERMISSIONS+" = " + perm ); + LOG.debug("SystemML security check: " + + "local.user.name = " + userName + ", " + + "local.user.groups = " + ProgramConverter.serializeStringCollection(groupNames) + ", " + + MRConfigurationNames.MR_JOBTRACKER_ADDRESS + " = " + job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS) + ", " + + MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER + " = " + taskController + "," + + 
"mapreduce.tasktracker.group = " + ttGroupName + ", " + + "fs.default.name = " + ((fsURI!=null) ? fsURI.getScheme() : "null") + ", " + + MRConfigurationNames.DFS_PERMISSIONS_ENABLED + " = " + perm ); //print warning if permission issues possible if( flagDiffUser && ( flagLocalFS || flagSecurity ) ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index 1bdccfc..a17acb8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -37,7 +37,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.lib.NLineInputFormat; - import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; @@ -52,6 +51,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.utils.Statistics; @@ -182,7 +182,7 @@ public class RemoteParForMR job.setNumTasksToExecutePerJvm(-1); //unlimited //set sort io buffer (reduce unnecessary large io buffer, guaranteed memory consumption) - job.setInt("io.sort.mb", 8); //8MB + 
job.setInt(MRConfigurationNames.MR_TASK_IO_SORT_MB, 8); //8MB //set the replication factor for the results job.setInt("dfs.replication", replication); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java index 05cfa17..23fd697 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java @@ -265,7 +265,7 @@ public class InfrastructureAnalyzer // Due to a bug in HDP related to fetching the "mode" of execution within mappers, // we explicitly probe the relevant properties instead of relying on results from // analyzeHadoopCluster(). 
- String jobTracker = job.get("mapred.job.tracker", "local"); + String jobTracker = job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS, "local"); String framework = job.get("mapreduce.framework.name", "local"); boolean isYarnEnabled = (framework!=null && framework.equals("yarn")); @@ -509,7 +509,7 @@ public class InfrastructureAnalyzer { JobConf job = ConfigurationManager.getCachedJobConf(); - _remoteMRSortMem = (1024*1024) * job.getLong("io.sort.mb",100); //1MB + _remoteMRSortMem = (1024*1024) * job.getLong(MRConfigurationNames.MR_TASK_IO_SORT_MB,100); //1MB //handle jvm max mem (map mem budget is relevant for map-side distcache and parfor) //(for robustness we probe both: child and map configuration parameters) @@ -526,7 +526,7 @@ public class InfrastructureAnalyzer _remoteJVMMaxMemReduce = extractMaxMemoryOpt(javaOpts1); //HDFS blocksize - String blocksize = job.get(MRConfigurationNames.DFS_BLOCK_SIZE, "134217728"); + String blocksize = job.get(MRConfigurationNames.DFS_BLOCKSIZE, "134217728"); _blocksize = Long.parseLong(blocksize); //is yarn enabled @@ -546,7 +546,7 @@ public class InfrastructureAnalyzer { //analyze if local mode (if yarn enabled, we always assume cluster mode //in order to workaround configuration issues on >=Hadoop 2.6) - String jobTracker = job.get("mapred.job.tracker", "local"); + String jobTracker = job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS, "local"); return "local".equals(jobTracker) & !isYarnEnabled(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRCache.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRCache.java index 27c9bc7..0dd0c39 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRCache.java +++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRCache.java @@ -90,7 +90,7 @@ public abstract class MMCJMRCache protected void constructLocalFilePrefix(String fname) { //get random localdir (to spread load across available disks) - String[] localDirs = _job.get("mapred.local.dir").split(","); + String[] localDirs = _job.get(MRConfigurationNames.MR_CLUSTER_LOCAL_DIR).split(","); Random rand = new Random(); int randPos = rand.nextInt(localDirs.length); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java index e77b83b..cd6e781 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java @@ -17,54 +17,57 @@ * under the License. */ - package org.apache.sysml.runtime.matrix.mapred; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.VersionInfo; - /** - * This class provides a central local for used hadoop configuration properties. - * For portability, we support both hadoop 1.x and 2.x and automatically map to - * the currently used cluster. + * This class provides a central local for used hadoop configuration properties. For portability, we support both hadoop + * 1.x and 2.x and automatically map to the currently used cluster. 
* */ -public abstract class MRConfigurationNames -{ +public abstract class MRConfigurationNames { - protected static final Log LOG = LogFactory.getLog(MRConfigurationNames.class.getName()); - - //name definitions - public static final String INVALID = "null"; - public static String DFS_SESSION_ID = INVALID; - public static String DFS_BLOCK_SIZE = INVALID; - public static String DFS_PERMISSIONS = INVALID; - //initialize to used cluster - static{ - - //determine hadoop version - //e.g., 2.0.4-alpha from 0a11e32419bd4070f28c6d96db66c2abe9fd6d91 by jenkins source checksum f3c1bf36ae3aa5a6f6d3447fcfadbba + public static final String DFS_BLOCKSIZE; + public static final String DFS_METRICS_SESSION_ID; + public static final String DFS_PERMISSIONS_ENABLED; + public static final String MR_CLUSTER_LOCAL_DIR; + public static final String MR_JOBTRACKER_ADDRESS; + public static final String MR_JOBTRACKER_SYSTEM_DIR; + public static final String MR_TASK_IO_SORT_MB; + public static final String MR_TASKTRACKER_TASKCONTROLLER; + + // initialize to currently used cluster + static { + // determine hadoop version String version = VersionInfo.getBuildVersion(); boolean hadoopVersion2 = version.startsWith("2"); - LOG.debug("Hadoop build version: "+version); - - if( hadoopVersion2 ) - { + LOG.debug("Hadoop build version: " + version); + + if (hadoopVersion2) { LOG.debug("Using hadoop 2.x configuration properties."); - DFS_SESSION_ID = "dfs.metrics.session-id"; - DFS_BLOCK_SIZE = "dfs.blocksize"; - DFS_PERMISSIONS = "dfs.permissions.enabled"; - } - else //any older version - { + DFS_BLOCKSIZE = "dfs.blocksize"; + DFS_METRICS_SESSION_ID = "dfs.metrics.session-id"; + DFS_PERMISSIONS_ENABLED = "dfs.permissions.enabled"; + MR_CLUSTER_LOCAL_DIR = "mapreduce.cluster.local.dir"; + MR_JOBTRACKER_ADDRESS = "mapreduce.jobtracker.address"; + MR_JOBTRACKER_SYSTEM_DIR = "mapreduce.jobtracker.system.dir"; + MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb"; + MR_TASKTRACKER_TASKCONTROLLER = 
"mapreduce.tasktracker.taskcontroller"; + } else { // any older version LOG.debug("Using hadoop 1.x configuration properties."); - DFS_SESSION_ID = "session.id"; - DFS_BLOCK_SIZE = "dfs.block.size"; - DFS_PERMISSIONS = "dfs.permissions"; + DFS_BLOCKSIZE = "dfs.block.size"; + DFS_METRICS_SESSION_ID = "session.id"; + DFS_PERMISSIONS_ENABLED = "dfs.permissions"; + MR_CLUSTER_LOCAL_DIR = "mapred.local.dir"; + MR_JOBTRACKER_ADDRESS = "mapred.job.tracker"; + MR_JOBTRACKER_SYSTEM_DIR = "mapred.system.dir"; + MR_TASK_IO_SORT_MB = "io.sort.mb"; + MR_TASKTRACKER_TASKCONTROLLER = "mapred.task.tracker.task-controller"; } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java index 3382a0c..6d30b5e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java @@ -424,7 +424,7 @@ public class MRJobConfiguration String uniqueSubdir = tmp.toString(); //unique local dir - String[] dirlist = job.get("mapred.local.dir","/tmp").split(","); + String[] dirlist = job.get(MRConfigurationNames.MR_CLUSTER_LOCAL_DIR,"/tmp").split(","); StringBuilder sb2 = new StringBuilder(); for( String dir : dirlist ) { if( sb2.length()>0 ) @@ -432,10 +432,10 @@ public class MRJobConfiguration sb2.append(dir); sb2.append( uniqueSubdir ); } - job.set("mapred.local.dir", sb2.toString() ); + job.set(MRConfigurationNames.MR_CLUSTER_LOCAL_DIR, sb2.toString() ); //unique system dir - job.set("mapred.system.dir", job.get("mapred.system.dir") + uniqueSubdir); + job.set(MRConfigurationNames.MR_JOBTRACKER_SYSTEM_DIR, job.get(MRConfigurationNames.MR_JOBTRACKER_SYSTEM_DIR) + 
uniqueSubdir); //unique staging dir job.set( "mapreduce.jobtracker.staging.root.dir", job.get("mapreduce.jobtracker.staging.root.dir") + uniqueSubdir ); @@ -444,12 +444,12 @@ public class MRJobConfiguration public static String getLocalWorkingDirPrefix(JobConf job) { - return job.get("mapred.local.dir"); + return job.get(MRConfigurationNames.MR_CLUSTER_LOCAL_DIR); } public static String getSystemWorkingDirPrefix(JobConf job) { - return job.get("mapred.system.dir"); + return job.get(MRConfigurationNames.MR_JOBTRACKER_SYSTEM_DIR); } public static String getStagingWorkingDirPrefix(JobConf job) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e1a2685a/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java b/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java index af15807..18a2dfc 100644 --- a/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java +++ b/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java @@ -676,7 +676,7 @@ public class YarnClusterAnalyzer throw new YarnException("There are no available nodes in the yarn cluster"); // Now get the default cluster settings - _remoteMRSortMem = (1024*1024) * conf.getLong("io.sort.mb",100); //100MB + _remoteMRSortMem = (1024*1024) * conf.getLong(MRConfigurationNames.MR_TASK_IO_SORT_MB,100); //100MB //handle jvm max mem (map mem budget is relevant for map-side distcache and parfor) //(for robustness we probe both: child and map configuration parameters) @@ -693,7 +693,7 @@ public class YarnClusterAnalyzer _remoteJVMMaxMemReduce = extractMaxMemoryOpt(javaOpts1); //HDFS blocksize - String blocksize = conf.get(MRConfigurationNames.DFS_BLOCK_SIZE, "134217728"); + String blocksize = conf.get(MRConfigurationNames.DFS_BLOCKSIZE, "134217728"); _blocksize = Long.parseLong(blocksize); minimalPhyAllocate = (long) 1024 * 1024 
* conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,