Repository: hive Updated Branches: refs/heads/llap 6bdb903e4 -> ef454511d
HIVE-11393. LLAP: Fix API usage to work with evolving Tez APIs - TEZ-{2651,2652,2653}. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ef454511 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ef454511 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ef454511 Branch: refs/heads/llap Commit: ef454511dd8614de0a5d30466d9a7c6bb2c3b10b Parents: 6bdb903 Author: Siddharth Seth <ss...@apache.org> Authored: Tue Jul 28 14:57:52 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue Jul 28 14:57:52 2015 -0700 ---------------------------------------------------------------------- .../llap/tezplugins/LlapContainerLauncher.java | 2 +- .../llap/tezplugins/LlapTaskCommunicator.java | 37 +++++++------- .../dag/app/rm/LlapTaskSchedulerService.java | 9 +++- .../app/rm/TestLlapTaskSchedulerService.java | 13 ++++- .../hadoop/hive/ql/exec/tez/DagUtils.java | 39 ++++++++------- .../hive/ql/exec/tez/TezSessionState.java | 51 +++++++++----------- 6 files changed, 82 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java index 3f1f58f..07703a2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java @@ -25,7 +25,7 @@ public class LlapContainerLauncher extends ContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class); public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) { - super(LlapContainerLauncher.class.getName(), containerLauncherContext); + super(containerLauncherContext); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index dc06c97..44fd7e3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -106,12 +106,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { BASE_SUBMIT_WORK_REQUEST = baseBuilder.build(); credentialMap = new ConcurrentHashMap<>(); - sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this); + sourceStateTracker = new SourceStateTracker(getContext(), this); } @Override - public void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); + public void initialize() throws Exception { + super.initialize(); + Configuration conf = getConf(); int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS, LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT); this.communicator = new TaskCommunicator(numThreads, conf); @@ -124,14 +125,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override - public void serviceStart() { - super.serviceStart(); + public void start() { + super.start(); this.communicator.start(); } @Override - public void serviceStop() { - super.serviceStop(); + public void shutdown() { + super.shutdown(); if (this.communicator != null) { this.communicator.stop(); } @@ -139,7 +140,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override protected void startRpcServer() { - Configuration conf = getConfig(); + Configuration conf = getConf(); try { JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); @@ -232,7 +233,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. - getTaskCommunicatorContext() + getContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); communicator.sendSubmitWork(requestProto, host, port, new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() { @@ -255,14 +256,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { LOG.info( "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Service Busy"); - getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); } else { // All others from the remote service cause the task to FAIL. LOG.info( "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); - getTaskCommunicatorContext() + getContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.toString()); } @@ -272,14 +273,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { LOG.info( "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Communication Error"); - getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); } else { // Anything else is a FAIL. LOG.info( "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); - getTaskCommunicatorContext() + getContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.getMessage()); } @@ -406,11 +407,11 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { builder.setAmPort(getAddress().getPort()); Credentials taskCredentials = new Credentials(); // Credentials can change across DAGs. Ideally construct only once per DAG. - taskCredentials.addAll(getTaskCommunicatorContext().getCredentials()); + taskCredentials.addAll(getContext().getCredentials()); ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName()); if (credentialsBinary == null) { - credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials()); + credentialsBinary = serializeCredentials(getContext().getCredentials()); credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate()); } else { credentialsBinary = credentialsBinary.duplicate(); @@ -459,7 +460,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { // TODO Unregister the task for state updates, which could in turn unregister the node. - getTaskCommunicatorContext().taskKilled(taskAttemptId, + getContext().taskKilled(taskAttemptId, TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted"); entityTracker.unregisterTaskAttempt(taskAttemptId); } @@ -598,8 +599,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { if (biMap != null) { synchronized(biMap) { for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) { - getTaskCommunicatorContext().taskAlive(entry.getValue()); - getTaskCommunicatorContext().containerAlive(entry.getKey()); + getContext().taskAlive(entry.getValue()); + getContext().containerAlive(entry.getKey()); } } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index b6ee3d8..f31c6a5 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -68,6 +68,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -159,7 +161,12 @@ public class LlapTaskSchedulerService extends TaskScheduler { public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) { super(taskSchedulerContext); this.clock = clock; - this.conf = taskSchedulerContext.getInitialConfiguration(); + try { + this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e); + } this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), taskSchedulerContext.getCustomClusterIdentifier()); this.memoryPerInstance = http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 245c140..3737e55 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ControlledClock; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -301,7 +303,8 @@ public class TestLlapTaskSchedulerService { doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); - doReturn(conf).when(mockAppCallback).getInitialConfiguration(); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); @@ -362,7 +365,13 @@ public class TestLlapTaskSchedulerService { public LlapTaskSchedulerServiceForTest( TaskSchedulerContext appClient, Clock clock) { super(appClient, clock); - this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false); + Configuration conf; + try { + conf = TezUtils.createConfFromUserPayload(appClient.getInitialUserPayload()); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false); } protected void schedulePendingTasks() { http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 45b092c..bd69744 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -108,6 +108,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; @@ -625,15 +626,13 @@ public class DagUtils { procClassName = MergeFileTezProcessor.class.getName(); } - String serviceName = findServiceName(mapWork); + VertexExecutionContext executionContext = createVertexExecutionContext(mapWork); map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName) - .setUserPayload(serializedConf), numTasks, getContainerResource(conf)) - .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName); + .setUserPayload(serializedConf), numTasks, getContainerResource(conf)); map.setTaskEnvironment(getContainerEnvironment(conf, true)); + map.setExecutionContext(executionContext); map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); assert mapWork.getAliasToWork().keySet().size() == 1; @@ -671,11 +670,17 @@ public class DagUtils { return conf; } - private String findServiceName(BaseWork work) { - String serviceName = TezSessionState.DEFAULT_SERVICE; - if (work.getLlapMode()) serviceName = TezSessionState.LLAP_SERVICE; - if (work.getUberMode()) serviceName = TezSessionState.LOCAL_SERVICE; - return serviceName; + private VertexExecutionContext createVertexExecutionContext(BaseWork work) { + VertexExecutionContext vertexExecutionContext = VertexExecutionContext.createExecuteInContainers(true); + if (work.getLlapMode()) { + vertexExecutionContext = VertexExecutionContext + .create(TezSessionState.LLAP_SERVICE, TezSessionState.LLAP_SERVICE, + TezSessionState.LLAP_SERVICE); + } + if (work.getUberMode()) { + vertexExecutionContext = VertexExecutionContext.createExecuteInAm(true); + } + return vertexExecutionContext; } /* @@ -692,20 +697,18 @@ public class DagUtils { // create the directories FileSinkOperators need Utilities.createTmpDirs(conf, reduceWork); - String serviceName = findServiceName(reduceWork); + VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork); // create the vertex Vertex reducer = Vertex.create(reduceWork.getName(), ProcessorDescriptor.create(ReduceTezProcessor.class.getName()). - setUserPayload(TezUtils.createUserPayloadFromConf(conf)), - reduceWork.isAutoReduceParallelism()? - reduceWork.getMaxReduceTasks(): - reduceWork.getNumReduceTasks(), getContainerResource(conf)) - .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName); + setUserPayload(TezUtils.createUserPayloadFromConf(conf)), + reduceWork.isAutoReduceParallelism() ? + reduceWork.getMaxReduceTasks() : + reduceWork.getNumReduceTasks(), getContainerResource(conf)); reducer.setTaskEnvironment(getContainerEnvironment(conf, false)); + reducer.setExecutionContext(vertexExecutionContext); reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 34e8cc8..ac460b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -24,10 +24,8 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,22 +44,23 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.mapreduce.hadoop.MRHelpers; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezException; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; /** * Holds session state related to Tez @@ -71,14 +70,9 @@ public class TezSessionState { private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName()); private static final String TEZ_DIR = "_tez_session_dir"; public static final String LLAP_SERVICE = "LLAP"; - public static final String DEFAULT_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; - public static final String LOCAL_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT; private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService"; private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher"; private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator"; - private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + LLAP_SCHEDULER; - private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + LLAP_LAUNCHER; - private static final String LLAP_SERVICE_TASK_COMMUNICATOR = LLAP_SERVICE + ":" + LLAP_TASK_COMMUNICATOR; private HiveConf conf; private Path tezScratchDir; @@ -212,25 +206,23 @@ public class TezSessionState { // set up the staging directory to use tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); + ServicePluginsDescriptor servicePluginsDescriptor; + UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); + if (llapMode) { // we need plugins to handle llap and uber mode - tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE, - LLAP_SERVICE_SCHEDULER); - - tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE, - LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER); - - tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE, - LOCAL_SERVICE, LLAP_SERVICE_TASK_COMMUNICATOR); + servicePluginsDescriptor = ServicePluginsDescriptor.create(true, + new TaskSchedulerDescriptor[]{ + TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER) + .setUserPayload(servicePluginPayload)}, + new ContainerLauncherDescriptor[]{ + ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)}, + new TaskCommunicatorDescriptor[]{ + TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR) + .setUserPayload(servicePluginPayload)}); } else { // we need plugins to handle llap and uber mode - tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE); - - tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE, - LOCAL_SERVICE); - - tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE, - LOCAL_SERVICE); + servicePluginsDescriptor = ServicePluginsDescriptor.create(true); } // container prewarming. tell the am how many containers we need @@ -242,8 +234,9 @@ public class TezSessionState { tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); } - session = TezClient.create("HIVE-" + sessionId, tezConfig, true, - commonLocalResources, null); + session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig).setIsSession(true) + .setLocalResources(commonLocalResources) + .setServicePluginDescriptor(servicePluginsDescriptor).build(); LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")");