HIVE-10280. LLAP: Handle errors while sending source state updates to the daemons. (Siddharth Seth, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d94e8d08 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d94e8d08 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d94e8d08 Branch: refs/heads/llap Commit: d94e8d08dd1d92c9eee99f60273e895a4a633b23 Parents: 21f18ad Author: Siddharth Seth <ss...@apache.org> Authored: Sat Apr 2 15:06:34 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Sat Apr 2 15:06:34 2016 -0700 ---------------------------------------------------------------------- .../hive/llap/tez/LlapProtocolClientProxy.java | 4 +- .../llap/tezplugins/LlapTaskCommunicator.java | 37 ++- .../tezplugins/LlapTaskSchedulerService.java | 7 + .../tezplugins/helpers/SourceStateTracker.java | 2 +- .../tezplugins/TestLlapTaskCommunicator.java | 304 ++++++++++++++++++- 5 files changed, 340 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java index e8d4148..f48a1cb 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java @@ -139,10 +139,8 @@ public class LlapProtocolClientProxy extends AbstractService { requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback)); } - public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, - final int port, + public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final LlapNodeId nodeId, final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); requestManager.queueRequest( new SendSourceStateUpdateCallable(nodeId, request, callback)); } http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 456121b..799367b 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -150,7 +150,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { super.initialize(); Configuration conf = getConf(); int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS); - this.communicator = new LlapProtocolClientProxy(numThreads, conf, token); + this.communicator = createLlapProtocolClientProxy(numThreads, conf); this.deleteDelayOnDagComplete = HiveConf.getTimeVar( conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); LOG.info("Running LlapTaskCommunicator with " @@ -205,6 +205,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } } + protected LlapProtocolClientProxy createLlapProtocolClientProxy(int numThreads, Configuration conf) { + return new LlapProtocolClientProxy(numThreads, conf, token); + } + @Override public void registerRunningContainer(ContainerId containerId, String hostname, int port) { super.registerRunningContainer(containerId, hostname, port); @@ -413,9 +417,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState()); } - public void sendStateUpdate(final String host, final int port, + public void sendStateUpdate(final LlapNodeId nodeId, final SourceStateUpdatedRequestProto request) { - communicator.sendSourceStateUpdate(request, host, port, + communicator.sendSourceStateUpdate(request, nodeId, new LlapProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() { @Override public void setResponse(SourceStateUpdatedResponseProto response) { @@ -423,12 +427,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void indicateError(Throwable t) { - // TODO HIVE-10280. - // Ideally, this should be retried for a while, after which the node should be marked as failed. - // Considering tasks are supposed to run fast. Failing the task immediately may be a good option. + + // Re-attempts are left upto the RPC layer. If there's a failure reported after this, + // mark all attempts running on this node as KILLED. The node itself cannot be killed from + // here, that's only possible via the scheduler. + // The assumption is that if there's a failure to communicate with the node - it will + // eventually timeout - and no more tasks will be allocated on it. + LOG.error( - "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" + - request, t); + "Failed to send state update to node: {}, Killing all attempts running on node. Attempted StateUpdate={}", + nodeId, request, t); + BiMap<ContainerId, TezTaskAttemptID> biMap = + entityTracker.getContainerAttemptMapForNode(nodeId); + if (biMap != null) { + synchronized (biMap) { + for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) { + LOG.info( + "Sending a kill for attempt {}, due to a communication failure while sending a finishable state update", + entry.getValue()); + getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED, + "Failed to send finishable state update to node " + nodeId); + } + } + } } }); } http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 0cb770b..b57ae1a 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -516,6 +516,13 @@ public class LlapTaskSchedulerService extends TaskScheduler { dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); } } + if (endReason != null && endReason == TaskAttemptEndReason.NODE_FAILED) { + LOG.info( + "Task {} ended on {} nodeInfo.toString() with a NODE_FAILED message." + + " An message should come in from the registry to disable this node unless" + + " this was a temporary communication failure", + task, assignedInstance); + } boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; disableInstance(assignedInstance, commFailure); http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index d8f7574..3dd73f6 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -280,7 +280,7 @@ public class SourceStateTracker { void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) { - taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(), + taskCommunicator.sendStateUpdate(nodeId, SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier) .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build()); } http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java index 8f3d104..1ee6a50 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -17,13 +17,49 @@ package org.apache.hadoop.hive.llap.tezplugins; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestLlapTaskCommunicator { @@ -32,8 +68,6 @@ public class TestLlapTaskCommunicator { LlapTaskCommunicator.EntityTracker entityTracker = new LlapTaskCommunicator.EntityTracker(); String host1 = "host1"; - String host2 = "host2"; - String host3 = "host3"; int port = 1451; @@ -84,6 +118,240 @@ public class TestLlapTaskCommunicator { } + @Test(timeout = 5000) + public void testFinishableStateUpdateFailure() throws Exception { + + LlapTaskCommunicatorWrapperForTest wrapper = null; + + Lock lock = new ReentrantLock(); + Condition condition = lock.newCondition(); + final AtomicBoolean opDone = new AtomicBoolean(false); + + LlapProtocolClientProxy proxy = mock(LlapProtocolClientProxy.class, + new FinishableStatusUpdateTestAnswer(lock, condition, opDone)); + + try { + wrapper = new LlapTaskCommunicatorWrapperForTest(proxy); + + // Register tasks on 2 nodes, with a dependency on vertex1 completing. + ContainerId cId11 = wrapper.registerContainer(1, 0); + TaskSpec ts11 = wrapper.registerRunningTaskAttemptWithSourceVertex(cId11, 1); + + ContainerId cId12 = wrapper.registerContainer(2, 0); + TaskSpec ts12 = wrapper.registerRunningTaskAttemptWithSourceVertex(cId12, 2); + + ContainerId cId21 = wrapper.registerContainer(3, 1); + TaskSpec ts21 = wrapper.registerRunningTaskAttemptWithSourceVertex(cId21, 3); + + // Send a state update for vertex1 completion. This triggers a status update to be sent out. + VertexStateUpdate vertexStateUpdate = + new VertexStateUpdate(LlapTaskCommunicatorWrapperForTest.VERTEX_NAME1, + VertexState.SUCCEEDED); + wrapper.getTaskCommunicator().onVertexStateUpdated(vertexStateUpdate); + + // Wait for all invocations to complete. + lock.lock(); + try { + while (!opDone.get()) { + condition.await(); + } + } finally { + lock.unlock(); + } + // Verify that a task kill went out for all nodes running on the specified host. + + verify(wrapper.getTaskCommunicatorContext(), times(2)) + .taskKilled(any(TezTaskAttemptID.class), any(TaskAttemptEndReason.class), + any(String.class)); + + verify(wrapper.getTaskCommunicatorContext()).taskKilled(eq(ts11.getTaskAttemptID()), + eq(TaskAttemptEndReason.NODE_FAILED), any(String.class)); + verify(wrapper.getTaskCommunicatorContext()).taskKilled(eq(ts12.getTaskAttemptID()), + eq(TaskAttemptEndReason.NODE_FAILED), any(String.class)); + + wrapper.getTaskCommunicator().sendStateUpdate(LlapNodeId + .getInstance(LlapTaskCommunicatorWrapperForTest.HOSTS[1], + LlapTaskCommunicatorWrapperForTest.RPC_PORT), + LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance()); + + // Verify no more invocations in case of success. + verify(wrapper.getTaskCommunicatorContext(), times(2)) + .taskKilled(any(TezTaskAttemptID.class), any(TaskAttemptEndReason.class), + any(String.class)); + + } finally { + if (wrapper != null) { + wrapper.shutdown(); + } + } + } + + static class FinishableStatusUpdateTestAnswer implements Answer<Void> { + + final Lock lock; + final Condition condition; + final AtomicBoolean opDone; + + final AtomicBoolean successInvoked = new AtomicBoolean(false); + final AtomicBoolean failInvoked = new AtomicBoolean(false); + + + FinishableStatusUpdateTestAnswer(Lock lock, Condition condition, AtomicBoolean opDone) { + this.lock = lock; + this.condition = condition; + this.opDone = opDone; + } + + void reset() { + opDone.set(false); + successInvoked.set(false); + failInvoked.set(false); + } + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (invocation.getMethod().getName().equals("sendSourceStateUpdate")) { + LlapNodeId nodeId = (LlapNodeId) invocation.getArguments()[1]; + final LlapProtocolClientProxy.ExecuteRequestCallback callback = + (LlapProtocolClientProxy.ExecuteRequestCallback) invocation.getArguments()[2]; + + if (nodeId.getHostname().equals(LlapTaskCommunicatorWrapperForTest.HOSTS[0])) { + new Thread() { + public void run() { + callback.indicateError( + new IOException("Force failing " + LlapTaskCommunicatorWrapperForTest.HOSTS[0])); + successInvoked.set(true); + signalOpDoneIfBothInvoked(); + } + }.start(); + } else { + new Thread() { + public void run() { + // Report success for all other cases. + callback.setResponse( + LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance()); + failInvoked.set(true); + signalOpDoneIfBothInvoked(); + } + }.start(); + } + } + return null; + } + + private void signalOpDoneIfBothInvoked() { + lock.lock(); + try { + if (failInvoked.get() && successInvoked.get()) { + opDone.set(true); + condition.signal(); + } + } finally { + lock.unlock(); + } + } + } + + + /** + * Wrapper class which is responsible for setting up various mocks required for different tests. + */ + private static class LlapTaskCommunicatorWrapperForTest { + + static final String[] HOSTS = new String[]{"host1", "host2", "host3"}; + static final int RPC_PORT = 15002; + static final String DAG_NAME = "dagName"; + static final String VERTEX_NAME1 = "vertexName1"; + static final String VERTEX_NAME2 = "vertexName2"; + + final TaskCommunicatorContext taskCommunicatorContext = mock(TaskCommunicatorContext.class); + + final ApplicationId appId = ApplicationId.newInstance(1000, 1); + final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 100); + final TezDAGID dagid = TezDAGID.getInstance(appId, 200); + final TezVertexID vertexId1 = TezVertexID.getInstance(dagid, 300); + final TezVertexID vertexId2 = TezVertexID.getInstance(dagid, 301); + final Configuration conf = new Configuration(false); + final UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + + final LlapTaskCommunicatorForTest taskCommunicator; + + public LlapTaskCommunicatorWrapperForTest(LlapProtocolClientProxy llapProxy) throws Exception { + doReturn(appAttemptId).when(taskCommunicatorContext).getApplicationAttemptId(); + doReturn(new Credentials()).when(taskCommunicatorContext).getCredentials(); + doReturn(userPayload).when(taskCommunicatorContext).getInitialUserPayload(); + doReturn(appId.toString()).when(taskCommunicatorContext).getCurrentAppIdentifier(); + doReturn(new LinkedList<String>()).when(taskCommunicatorContext) + .getInputVertexNames(any(String.class)); + + + this.taskCommunicator = new LlapTaskCommunicatorForTest(taskCommunicatorContext, llapProxy); + this.taskCommunicator.initialize(); + this.taskCommunicator.start(); + } + + void shutdown() { + this.taskCommunicator.shutdown(); + } + + TaskCommunicatorContext getTaskCommunicatorContext() { + return taskCommunicatorContext; + } + + LlapTaskCommunicatorForTest getTaskCommunicator() { + return taskCommunicator; + } + + ContainerId registerContainer(int containerIdx, int hostIdx) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); + taskCommunicator.registerRunningContainer(containerId, HOSTS[hostIdx], RPC_PORT); + return containerId; + } + + /* + Sets up a TaskSpec which has vertex1 as it's input, and tasks belonging to vertex2 + */ + TaskSpec registerRunningTaskAttemptWithSourceVertex(ContainerId containerId, int taskIdx) { + TaskSpec taskSpec = createBaseTaskSpec(VERTEX_NAME2, vertexId2, taskIdx); + + InputSpec inputSpec = + new InputSpec(VERTEX_NAME1, InputDescriptor.create("fakeInputClassName"), 3); + List<InputSpec> inputs = Lists.newArrayList(inputSpec); + + doReturn(inputs).when(taskSpec).getInputs(); + + taskCommunicator + .registerRunningTaskAttempt(containerId, taskSpec, new HashMap<String, LocalResource>(), + new Credentials(), false, 2); + return taskSpec; + } + + /* + Sets up a TaskSpec with no inputs, and tasks belonging to vertex1 + */ + TaskSpec registerRunningTaskAttempt(ContainerId containerId, int taskIdx) { + + TaskSpec taskSpec = createBaseTaskSpec(VERTEX_NAME1, vertexId1, taskIdx); + + taskCommunicator + .registerRunningTaskAttempt(containerId, taskSpec, new HashMap<String, LocalResource>(), + new Credentials(), false, 2); + return taskSpec; + } + + private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int taskIdx) { + TaskSpec taskSpec = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId, taskIdx), 0); + doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); + doReturn(DAG_NAME).when(taskSpec).getDAGName(); + doReturn(vertexName).when(taskSpec).getVertexName(); + return taskSpec; + } + } + + + private ContainerId constructContainerId(int id) { ContainerId containerId = mock(ContainerId.class); doReturn(id).when(containerId).getId(); @@ -97,4 +365,36 @@ public class TestLlapTaskCommunicator { return taskAttemptId; } + + private static class LlapTaskCommunicatorForTest extends LlapTaskCommunicator { + + private final LlapProtocolClientProxy llapProxy; + + public LlapTaskCommunicatorForTest( + TaskCommunicatorContext taskCommunicatorContext) { + this(taskCommunicatorContext, mock(LlapProtocolClientProxy.class)); + } + + public LlapTaskCommunicatorForTest( + TaskCommunicatorContext taskCommunicatorContext, LlapProtocolClientProxy llapProxy) { + super(taskCommunicatorContext); + this.llapProxy = llapProxy; + } + + @Override + protected void startRpcServer() { + } + + @Override + protected LlapProtocolClientProxy createLlapProtocolClientProxy(int numThreads, + Configuration conf) { + return llapProxy; + } + + @Override + public InetSocketAddress getAddress() { + return InetSocketAddress.createUnresolved("localhost", 15001); + } + } + }