HIVE-10280. LLAP: Handle errors while sending source state updates to the 
daemons. (Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d94e8d08
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d94e8d08
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d94e8d08

Branch: refs/heads/llap
Commit: d94e8d08dd1d92c9eee99f60273e895a4a633b23
Parents: 21f18ad
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Apr 2 15:06:34 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Apr 2 15:06:34 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/tez/LlapProtocolClientProxy.java  |   4 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |  37 ++-
 .../tezplugins/LlapTaskSchedulerService.java    |   7 +
 .../tezplugins/helpers/SourceStateTracker.java  |   2 +-
 .../tezplugins/TestLlapTaskCommunicator.java    | 304 ++++++++++++++++++-
 5 files changed, 340 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index e8d4148..f48a1cb 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -139,10 +139,8 @@ public class LlapProtocolClientProxy extends 
AbstractService {
     requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, 
callback));
   }
 
-  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto 
request, final String host,
-                                    final int port,
+  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto 
request, final LlapNodeId nodeId,
                                     final 
ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
     requestManager.queueRequest(
         new SendSourceStateUpdateCallable(nodeId, request, callback));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 456121b..799367b 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -150,7 +150,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     super.initialize();
     Configuration conf = getConf();
     int numThreads = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
-    this.communicator = new LlapProtocolClientProxy(numThreads, conf, token);
+    this.communicator = createLlapProtocolClientProxy(numThreads, conf);
     this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
     LOG.info("Running LlapTaskCommunicator with "
@@ -205,6 +205,10 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
   }
 
+  protected LlapProtocolClientProxy createLlapProtocolClientProxy(int 
numThreads, Configuration conf) {
+    return new LlapProtocolClientProxy(numThreads, conf, token);
+  }
+
   @Override
   public void registerRunningContainer(ContainerId containerId, String 
hostname, int port) {
     super.registerRunningContainer(containerId, hostname, port);
@@ -413,9 +417,9 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
         .sourceStateUpdated(vertexStateUpdate.getVertexName(), 
vertexStateUpdate.getVertexState());
   }
 
-  public void sendStateUpdate(final String host, final int port,
+  public void sendStateUpdate(final LlapNodeId nodeId,
                               final SourceStateUpdatedRequestProto request) {
-    communicator.sendSourceStateUpdate(request, host, port,
+    communicator.sendSourceStateUpdate(request, nodeId,
         new 
LlapProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>()
 {
           @Override
           public void setResponse(SourceStateUpdatedResponseProto response) {
@@ -423,12 +427,29 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
           @Override
           public void indicateError(Throwable t) {
-            // TODO HIVE-10280.
-            // Ideally, this should be retried for a while, after which the 
node should be marked as failed.
-            // Considering tasks are supposed to run fast. Failing the task 
immediately may be a good option.
+
+            // Re-attempts are left up to the RPC layer. If there's a failure 
reported after this,
+            // mark all attempts running on this node as KILLED. The node 
itself cannot be killed from
+            // here, that's only possible via the scheduler.
+            // The assumption is that if there's a failure to communicate with 
the node - it will
+            // eventually timeout - and no more tasks will be allocated on it.
+
             LOG.error(
-                "Failed to send state update to node: " + host + ":" + port + 
", StateUpdate=" +
-                    request, t);
+                "Failed to send state update to node: {}, Killing all attempts 
running on node. Attempted StateUpdate={}",
+                nodeId, request, t);
+            BiMap<ContainerId, TezTaskAttemptID> biMap =
+                entityTracker.getContainerAttemptMapForNode(nodeId);
+            if (biMap != null) {
+              synchronized (biMap) {
+                for (Map.Entry<ContainerId, TezTaskAttemptID> entry : 
biMap.entrySet()) {
+                  LOG.info(
+                      "Sending a kill for attempt {}, due to a communication 
failure while sending a finishable state update",
+                      entry.getValue());
+                  getContext().taskKilled(entry.getValue(), 
TaskAttemptEndReason.NODE_FAILED,
+                      "Failed to send finishable state update to node " + 
nodeId);
+                }
+              }
+            }
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 0cb770b..b57ae1a 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -516,6 +516,13 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
               
dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
             }
           }
+          if (endReason != null && endReason == 
TaskAttemptEndReason.NODE_FAILED) {
+            LOG.info(
+                "Task {} ended on {} nodeInfo.toString() with a NODE_FAILED 
message." +
+                    " An message should come in from the registry to disable 
this node unless" +
+                    " this was a temporary communication failure",
+                task, assignedInstance);
+          }
           boolean commFailure =
               endReason != null && endReason == 
TaskAttemptEndReason.COMMUNICATION_ERROR;
           disableInstance(assignedInstance, commFailure);

http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index d8f7574..3dd73f6 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -280,7 +280,7 @@ public class SourceStateTracker {
 
 
   void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState 
state) {
-    taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
+    taskCommunicator.sendStateUpdate(nodeId,
         
SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier)
             
.setSrcName(sourceName).setState(Converters.fromVertexState(state)).build());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d94e8d08/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 8f3d104..1ee6a50 100644
--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -17,13 +17,49 @@ package org.apache.hadoop.hive.llap.tezplugins;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestLlapTaskCommunicator {
 
@@ -32,8 +68,6 @@ public class TestLlapTaskCommunicator {
     LlapTaskCommunicator.EntityTracker entityTracker = new 
LlapTaskCommunicator.EntityTracker();
 
     String host1 = "host1";
-    String host2 = "host2";
-    String host3 = "host3";
     int port = 1451;
 
 
@@ -84,6 +118,240 @@ public class TestLlapTaskCommunicator {
   }
 
 
+  @Test(timeout = 5000)
+  public void testFinishableStateUpdateFailure() throws Exception {
+
+    LlapTaskCommunicatorWrapperForTest wrapper = null;
+
+    Lock lock = new ReentrantLock();
+    Condition condition = lock.newCondition();
+    final AtomicBoolean opDone = new AtomicBoolean(false);
+
+    LlapProtocolClientProxy proxy = mock(LlapProtocolClientProxy.class,
+        new FinishableStatusUpdateTestAnswer(lock, condition, opDone));
+
+    try {
+      wrapper = new LlapTaskCommunicatorWrapperForTest(proxy);
+
+      // Register tasks on 2 nodes, with a dependency on vertex1 completing.
+      ContainerId cId11 = wrapper.registerContainer(1, 0);
+      TaskSpec ts11 = 
wrapper.registerRunningTaskAttemptWithSourceVertex(cId11, 1);
+
+      ContainerId cId12 = wrapper.registerContainer(2, 0);
+      TaskSpec ts12 = 
wrapper.registerRunningTaskAttemptWithSourceVertex(cId12, 2);
+
+      ContainerId cId21 = wrapper.registerContainer(3, 1);
+      TaskSpec ts21 = 
wrapper.registerRunningTaskAttemptWithSourceVertex(cId21, 3);
+
+      // Send a state update for vertex1 completion. This triggers a status 
update to be sent out.
+      VertexStateUpdate vertexStateUpdate =
+          new 
VertexStateUpdate(LlapTaskCommunicatorWrapperForTest.VERTEX_NAME1,
+              VertexState.SUCCEEDED);
+      wrapper.getTaskCommunicator().onVertexStateUpdated(vertexStateUpdate);
+
+      // Wait for all invocations to complete.
+      lock.lock();
+      try {
+        while (!opDone.get()) {
+          condition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+      // Verify that a task kill went out for all task attempts running on the 
specified host.
+
+      verify(wrapper.getTaskCommunicatorContext(), times(2))
+          .taskKilled(any(TezTaskAttemptID.class), 
any(TaskAttemptEndReason.class),
+              any(String.class));
+
+      
verify(wrapper.getTaskCommunicatorContext()).taskKilled(eq(ts11.getTaskAttemptID()),
+          eq(TaskAttemptEndReason.NODE_FAILED), any(String.class));
+      
verify(wrapper.getTaskCommunicatorContext()).taskKilled(eq(ts12.getTaskAttemptID()),
+          eq(TaskAttemptEndReason.NODE_FAILED), any(String.class));
+
+      wrapper.getTaskCommunicator().sendStateUpdate(LlapNodeId
+              .getInstance(LlapTaskCommunicatorWrapperForTest.HOSTS[1],
+                  LlapTaskCommunicatorWrapperForTest.RPC_PORT),
+          
LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance());
+
+      // Verify no more invocations in case of success.
+      verify(wrapper.getTaskCommunicatorContext(), times(2))
+          .taskKilled(any(TezTaskAttemptID.class), 
any(TaskAttemptEndReason.class),
+              any(String.class));
+
+    } finally {
+      if (wrapper != null) {
+        wrapper.shutdown();
+      }
+    }
+  }
+
+  static class FinishableStatusUpdateTestAnswer implements Answer<Void> {
+
+    final Lock lock;
+    final Condition condition;
+    final AtomicBoolean opDone;
+
+    final AtomicBoolean successInvoked = new AtomicBoolean(false);
+    final AtomicBoolean failInvoked = new AtomicBoolean(false);
+
+
+    FinishableStatusUpdateTestAnswer(Lock lock, Condition condition, 
AtomicBoolean opDone) {
+      this.lock = lock;
+      this.condition = condition;
+      this.opDone = opDone;
+    }
+
+    void reset() {
+      opDone.set(false);
+      successInvoked.set(false);
+      failInvoked.set(false);
+    }
+
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable {
+      if (invocation.getMethod().getName().equals("sendSourceStateUpdate")) {
+        LlapNodeId nodeId = (LlapNodeId) invocation.getArguments()[1];
+        final LlapProtocolClientProxy.ExecuteRequestCallback callback =
+            (LlapProtocolClientProxy.ExecuteRequestCallback) 
invocation.getArguments()[2];
+
+        if 
(nodeId.getHostname().equals(LlapTaskCommunicatorWrapperForTest.HOSTS[0])) {
+          new Thread() {
+            public void run() {
+              callback.indicateError(
+                  new IOException("Force failing " + 
LlapTaskCommunicatorWrapperForTest.HOSTS[0]));
+              successInvoked.set(true);
+              signalOpDoneIfBothInvoked();
+            }
+          }.start();
+        } else {
+          new Thread() {
+            public void run() {
+              // Report success for all other cases.
+              callback.setResponse(
+                  
LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance());
+              failInvoked.set(true);
+              signalOpDoneIfBothInvoked();
+            }
+          }.start();
+        }
+      }
+      return null;
+    }
+
+    private void signalOpDoneIfBothInvoked() {
+      lock.lock();
+      try {
+        if (failInvoked.get() && successInvoked.get()) {
+          opDone.set(true);
+          condition.signal();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+
+  /**
+   * Wrapper class which is responsible for setting up various mocks required 
for different tests.
+   */
+  private static class LlapTaskCommunicatorWrapperForTest {
+
+    static final String[] HOSTS = new String[]{"host1", "host2", "host3"};
+    static final int RPC_PORT = 15002;
+    static final String DAG_NAME = "dagName";
+    static final String VERTEX_NAME1 = "vertexName1";
+    static final String VERTEX_NAME2 = "vertexName2";
+
+    final TaskCommunicatorContext taskCommunicatorContext = 
mock(TaskCommunicatorContext.class);
+
+    final ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    final ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 100);
+    final TezDAGID dagid = TezDAGID.getInstance(appId, 200);
+    final TezVertexID vertexId1 = TezVertexID.getInstance(dagid, 300);
+    final TezVertexID vertexId2 = TezVertexID.getInstance(dagid, 301);
+    final Configuration conf = new Configuration(false);
+    final UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+    final LlapTaskCommunicatorForTest taskCommunicator;
+
+    public LlapTaskCommunicatorWrapperForTest(LlapProtocolClientProxy 
llapProxy) throws Exception {
+      
doReturn(appAttemptId).when(taskCommunicatorContext).getApplicationAttemptId();
+      doReturn(new 
Credentials()).when(taskCommunicatorContext).getCredentials();
+      
doReturn(userPayload).when(taskCommunicatorContext).getInitialUserPayload();
+      
doReturn(appId.toString()).when(taskCommunicatorContext).getCurrentAppIdentifier();
+      doReturn(new LinkedList<String>()).when(taskCommunicatorContext)
+          .getInputVertexNames(any(String.class));
+
+
+      this.taskCommunicator = new 
LlapTaskCommunicatorForTest(taskCommunicatorContext, llapProxy);
+      this.taskCommunicator.initialize();
+      this.taskCommunicator.start();
+    }
+
+    void shutdown() {
+      this.taskCommunicator.shutdown();
+    }
+
+    TaskCommunicatorContext getTaskCommunicatorContext() {
+      return taskCommunicatorContext;
+    }
+
+    LlapTaskCommunicatorForTest getTaskCommunicator() {
+      return taskCommunicator;
+    }
+
+    ContainerId registerContainer(int containerIdx, int hostIdx) {
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, 
containerIdx);
+      taskCommunicator.registerRunningContainer(containerId, HOSTS[hostIdx], 
RPC_PORT);
+      return containerId;
+    }
+
+    /*
+    Sets up a TaskSpec which has vertex1 as its input, and tasks belonging to 
vertex2
+     */
+    TaskSpec registerRunningTaskAttemptWithSourceVertex(ContainerId 
containerId, int taskIdx) {
+      TaskSpec taskSpec = createBaseTaskSpec(VERTEX_NAME2, vertexId2, taskIdx);
+
+      InputSpec inputSpec =
+          new InputSpec(VERTEX_NAME1, 
InputDescriptor.create("fakeInputClassName"), 3);
+      List<InputSpec> inputs = Lists.newArrayList(inputSpec);
+
+      doReturn(inputs).when(taskSpec).getInputs();
+
+      taskCommunicator
+          .registerRunningTaskAttempt(containerId, taskSpec, new 
HashMap<String, LocalResource>(),
+              new Credentials(), false, 2);
+      return taskSpec;
+    }
+
+    /*
+    Sets up a TaskSpec with no inputs, and tasks belonging to vertex1
+     */
+    TaskSpec registerRunningTaskAttempt(ContainerId containerId, int taskIdx) {
+
+      TaskSpec taskSpec = createBaseTaskSpec(VERTEX_NAME1, vertexId1, taskIdx);
+
+      taskCommunicator
+          .registerRunningTaskAttempt(containerId, taskSpec, new 
HashMap<String, LocalResource>(),
+              new Credentials(), false, 2);
+      return taskSpec;
+    }
+
+    private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID 
vertexId, int taskIdx) {
+      TaskSpec taskSpec = mock(TaskSpec.class);
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
+          TezTaskID.getInstance(vertexId, taskIdx), 0);
+      doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
+      doReturn(DAG_NAME).when(taskSpec).getDAGName();
+      doReturn(vertexName).when(taskSpec).getVertexName();
+      return taskSpec;
+    }
+  }
+
+
+
   private ContainerId constructContainerId(int id) {
     ContainerId containerId = mock(ContainerId.class);
     doReturn(id).when(containerId).getId();
@@ -97,4 +365,36 @@ public class TestLlapTaskCommunicator {
     return taskAttemptId;
   }
 
+
+  private static class LlapTaskCommunicatorForTest extends 
LlapTaskCommunicator {
+
+    private final LlapProtocolClientProxy llapProxy;
+
+    public LlapTaskCommunicatorForTest(
+        TaskCommunicatorContext taskCommunicatorContext) {
+      this(taskCommunicatorContext, mock(LlapProtocolClientProxy.class));
+    }
+
+    public LlapTaskCommunicatorForTest(
+        TaskCommunicatorContext taskCommunicatorContext, 
LlapProtocolClientProxy llapProxy) {
+      super(taskCommunicatorContext);
+      this.llapProxy = llapProxy;
+    }
+
+    @Override
+    protected void startRpcServer() {
+    }
+
+    @Override
+    protected LlapProtocolClientProxy createLlapProtocolClientProxy(int 
numThreads,
+                                                                    
Configuration conf) {
+      return llapProxy;
+    }
+
+    @Override
+    public InetSocketAddress getAddress() {
+      return InetSocketAddress.createUnresolved("localhost", 15001);
+    }
+  }
+
 }

Reply via email to