Repository: hive
Updated Branches:
  refs/heads/llap 785ec8e75 -> b321c55a3


HIVE-10961. LLAP: Fix ShuffleHandler + Submit work init race condition. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b321c55a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b321c55a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b321c55a

Branch: refs/heads/llap
Commit: b321c55a326093ea554df77e806e2601eab24832
Parents: 785ec8e
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jun 9 13:58:49 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jun 9 13:58:49 2015 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       | 18 +++++++++++------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 18 +++++------------
 .../hive/llap/daemon/impl/LlapDaemon.java       | 21 ++++++++++++++++----
 3 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 7376792..1ba18fc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -77,11 +78,12 @@ public class AMReporter extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
 
-  private final LlapNodeId nodeId;
+  private volatile LlapNodeId nodeId;
   private final Configuration conf;
   private final ListeningExecutorService queueLookupExecutor;
   private final ListeningExecutorService executor;
   private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue();
+  private final AtomicReference<InetSocketAddress> localAddress;
   private final long heartbeatInterval;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   // Tracks appMasters to which heartbeats are being sent. This should not be used for any other
@@ -89,9 +91,9 @@ public class AMReporter extends AbstractService {
   private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
 
-  public AMReporter(LlapNodeId nodeId, Configuration conf) {
+  public AMReporter(AtomicReference<InetSocketAddress> localAddress, Configuration conf) {
     super(AMReporter.class.getName());
-    this.nodeId = nodeId;
+    this.localAddress = localAddress;
     this.conf = conf;
     ExecutorService rawExecutor = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
@@ -102,7 +104,7 @@ public class AMReporter extends AbstractService {
     this.heartbeatInterval =
         conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
             LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    LOG.info("AMReporter running with NodeId: {}", nodeId);
+
   }
 
   @Override
@@ -125,7 +127,8 @@ public class AMReporter extends AbstractService {
         }
       }
     });
-    LOG.info("Started service: " + getName());
+    nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
+    LOG.info("AMReporter running with NodeId: {}", nodeId);
   }
 
   @Override
@@ -170,7 +173,7 @@ public class AMReporter extends AbstractService {
     synchronized (knownAppMasters) {
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
-        LOG.info(("Ignoring duplocate unregisterRequest for am at: " + amLocation + ":" + port));
+        LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port));
       }
       amNodeInfo.decrementAndGetTaskCount();
       // Not removing this here. Will be removed when taken off the queue and discovered to have 0
@@ -184,6 +187,9 @@ public class AMReporter extends AbstractService {
     // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+
+    // Even if the service hasn't started up. It's OK to make this invocation since this will
+    // only happen after the AtomicReference address has been populated. Not adding an additional check.
     ListenableFuture<Void> future =
         executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo));
     Futures.addCallback(future, new FutureCallback<Void>() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6f9f429..10e192e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -25,7 +25,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -67,7 +66,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class);
   public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
 
-  private volatile AMReporter amReporter;
+  private final AMReporter amReporter;
   private final QueryTracker queryTracker;
   private final Scheduler<TaskRunnerCallable> executorService;
   private final AtomicReference<InetSocketAddress> localAddress;
@@ -81,12 +80,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, int localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
-      long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) {
+      long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
+      AMReporter amReporter) {
     super("ContainerRunnerImpl");
     this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
     this.localAddress = localAddress;
+    this.amReporter = amReporter;
 
     this.queryTracker = new QueryTracker(conf, localDirsBase);
     addIfService(queryTracker);
@@ -122,22 +123,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
   @Override
   public void serviceStart() throws Exception {
-    // The node id will only be available at this point, since the server has been started in LlapDaemon
     super.serviceStart();
-    LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(),
-        localAddress.get().getPort());
-    this.amReporter = new AMReporter(llapNodeId, conf);
-    amReporter.init(conf);
-    amReporter.start();
+
   }
 
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
-    if (amReporter != null) {
-      amReporter.stop();
-      amReporter = null;
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 75377d4..1801212 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -63,6 +63,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private final Configuration shuffleHandlerConf;
   private final LlapDaemonProtocolServerImpl server;
   private final ContainerRunnerImpl containerRunner;
+  private final AMReporter amReporter;
   private final LlapRegistryService registry;
   private final LlapWebServices webServices;
   private final AtomicLong numSubmissions = new AtomicLong(0);
@@ -155,8 +156,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
+
+    this.amReporter = new AMReporter(address, daemonConf);
+
+
     this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort);
-    addIfService(server);
+
 
     this.containerRunner = new ContainerRunnerImpl(daemonConf,
         numExecutors,
@@ -166,13 +171,19 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         shufflePort,
         address,
         executorMemoryBytes,
-        metrics);
+        metrics,
+        amReporter);
     addIfService(containerRunner);
 
     this.registry = new LlapRegistryService(true);
     addIfService(registry);
     this.webServices = new LlapWebServices();
     addIfService(webServices);
+    // Bring up the server only after all other components have started.
+    addIfService(server);
+    // AMReporter after the server so that it gets the correct address. It knows how to deal with
+    // requests before it is started.
+    addIfService(amReporter);
   }
 
   private long getTotalHeapSize() {
@@ -220,14 +231,16 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
   @Override
   public void serviceStart() throws Exception {
-    super.serviceStart();
+    // Start the Shuffle service before the listener - until it's a service as well.
     ShuffleHandler.initializeAndStart(shuffleHandlerConf);
+    super.serviceStart();
+    LOG.info("LlapDaemon serviceStart complete");
   }
 
   public void serviceStop() throws Exception {
     super.serviceStop();
-    shutdown();
     ShuffleHandler.shutdown();
+    shutdown();
     LOG.info("LlapDaemon shutdown complete");
   }
 

Reply via email to