Author: vikram Date: Fri Aug 8 23:23:55 2014 New Revision: 1616904 URL: http://svn.apache.org/r1616904 Log: Bring tez-branch up to the API changes made by TEZ-1372 (Gopal V via Vikram Dixit)
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1616904&r1=1616903&r2=1616904&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Fri Aug 8 23:23:55 2014 @@ -43,6 +43,7 @@ public class CustomPartitionEdge extends // used by the framework at runtime. 
initialize is the real initializer at runtime public CustomPartitionEdge(EdgeManagerContext context) { super(context); + this.context = context; } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1616904&r1=1616903&r2=1616904&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Aug 8 23:23:55 2014 @@ -79,7 +79,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; -import org.apache.tez.client.PreWarmContext; +import org.apache.tez.client.PreWarmVertex; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSinkDescriptor; import org.apache.tez.dag.api.DataSourceDescriptor; @@ -380,6 +380,15 @@ public class DagUtils { } /* + * Helper to setup default environment for a task in YARN. 
+ */ + private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) { + Map<String, String> environment = new HashMap<String, String>(); + MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap); + return environment; + } + + /* * Helper to determine what java options to use for the containers * Falls back to Map-reduces map java opts if no tez specific options * are set @@ -449,7 +458,7 @@ public class DagUtils { // is HiveInputFormat if (inputFormatClass == HiveInputFormat.class) { useTezGroupedSplits = true; - conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class); + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); } } @@ -476,9 +485,7 @@ public class DagUtils { map = new Vertex(mapWork.getName(), new ProcessorDescriptor(MapTezProcessor.class.getName()). setUserPayload(serializedConf), numTasks, getContainerResource(conf)); - Map<String, String> environment = new HashMap<String, String>(); - MRHelpers.updateEnvironmentForMRTasks(conf, environment, true); - map.setTaskEnvironment(environment); + map.setTaskEnvironment(getContainerEnvironment(conf, true)); map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); assert mapWork.getAliasToWork().keySet().size() == 1; @@ -487,10 +494,9 @@ public class DagUtils { byte[] mrInput = null; if (useTezGroupedSplits) { - mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf, - HiveInputFormat.class.getName()); + mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf); } else { - mrInput = MRHelpers.createMRInputPayload(serializedConf, null); + mrInput = MRHelpers.createMRInputPayload(serializedConf); } map.addDataSource(alias, new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()). @@ -550,11 +556,7 @@ public class DagUtils { reduceWork.isAutoReduceParallelism() ? 
reduceWork.getMaxReduceTasks() : reduceWork .getNumReduceTasks(), getContainerResource(conf)); - Map<String, String> environment = new HashMap<String, String>(); - - MRHelpers.updateEnvironmentForMRTasks(conf, environment, false); - reducer.setTaskEnvironment(environment); - + reducer.setTaskEnvironment(getContainerEnvironment(conf, false)); reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); @@ -598,17 +600,16 @@ public class DagUtils { /** * @param numContainers number of containers to pre-warm * @param localResources additional resources to pre-warm with - * @return prewarm context object + * @return prewarm vertex to run */ - public PreWarmContext createPreWarmContext(TezConfiguration conf, + public PreWarmVertex createPreWarmVertex(TezConfiguration conf, int numContainers, Map<String, LocalResource> localResources) throws IOException, TezException { ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName()); prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf)); - PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf), - numContainers, null); + PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf)); Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>(); @@ -616,14 +617,10 @@ public class DagUtils { combinedResources.putAll(localResources); } - context.setLocalResources(combinedResources); - - /* boiler plate task env */ - Map<String, String> environment = new HashMap<String, String>(); - MRHelpers.updateEnvironmentForMRTasks(conf, environment, true); - context.setEnvironment(environment); - context.setJavaOpts(getContainerJavaOpts(conf)); - return context; + prewarmVertex.setTaskLocalFiles(localResources); + 
prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); + prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false)); + return prewarmVertex; } /** Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1616904&r1=1616903&r2=1616904&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Aug 8 23:23:55 2014 @@ -90,7 +90,7 @@ public class HiveSplitGenerator extends ShimLoader.getHadoopShims().getMergedCredentials(jobConf); InputSplitInfoMem inputSplitInfo = null; - String realInputFormatName = userPayloadProto.getInputFormatName(); + String realInputFormatName = conf.get("mapred.input.format.class"); if (realInputFormatName != null && !realInputFormatName.isEmpty()) { // Need to instantiate the realInputFormat InputFormat<?, ?> inputFormat = @@ -123,7 +123,8 @@ public class HiveSplitGenerator extends inputSplitInfo = new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf); } else { - inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf); + // no need for grouping and the target #of tasks. 
+ inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0); } return createEventList(sendSerializedEvents, inputSplitInfo); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1616904&r1=1616903&r2=1616904&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Aug 8 23:23:55 2014 @@ -50,11 +50,12 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.client.PreWarmContext; import org.apache.tez.client.TezClient; +import org.apache.tez.client.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.mapreduce.hadoop.MRHelpers; /** @@ -170,6 +171,15 @@ public class TezSessionState { // generate basic tez config TezConfiguration tezConfig = new TezConfiguration(conf); tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); + + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { + int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); + n = Math.max(tezConfig.getInt( + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n); + tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); + } + session = new TezClient("HIVE-" + sessionId, tezConfig, true, commonLocalResources, null); @@ -182,10 +192,10 @@ 
public class TezSessionState { int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); LOG.info("Prewarming " + n + " containers (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); - PreWarmContext context = utils.createPreWarmContext(tezConfig, n, + PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n, commonLocalResources); try { - session.preWarm(context); + session.preWarm(prewarmVertex); } catch (InterruptedException ie) { if (LOG.isDebugEnabled()) { LOG.debug("Hive Prewarm threw an exception ", ie);