Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1594

Change subject: Shutdown ActiveManager Before Killing Thread Executors
......................................................................

Shutdown ActiveManager Before Killing Thread Executors

Avoid race conditions with start / stop of active runtimes by stopping
the ActiveManager and started runtimes before terminating the thread
executor.

Change-Id: I45e83b0378198f80297fd2741969507741914dea
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
14 files changed, 94 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/94/1594/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b15cfca..f15270d 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,11 +18,20 @@
  */
 package org.apache.asterix.active;
 
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.common.api.ThreadExecutor;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Logger;
@@ -30,12 +39,14 @@
 public class ActiveManager {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveManager.class.getName());
-    private final Executor executor;
-    private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
+    public static final int SHUTDOWN_TIMEOUT_SECS = 60;
+    private final ThreadExecutor executor;
+    private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
+    private volatile boolean shutdown;
 
-    public ActiveManager(Executor executor, String nodeId, long 
activeMemoryBudget, int frameSize)
+    public ActiveManager(ThreadExecutor executor, String nodeId, long 
activeMemoryBudget, int frameSize)
             throws HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
@@ -47,11 +58,12 @@
         return activeFramePool;
     }
 
-    public void registerRuntime(IActiveRuntime runtime) {
-        ActiveRuntimeId id = runtime.getRuntimeId();
-        if (!runtimes.containsKey(id)) {
-            runtimes.put(id, runtime);
+    public void registerRuntime(IActiveRuntime runtime) throws 
HyracksDataException {
+        if (shutdown) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN);
         }
+        ActiveRuntimeId id = runtime.getRuntimeId();
+        runtimes.putIfAbsent(id, runtime);
     }
 
     public void deregisterRuntime(ActiveRuntimeId id) {
@@ -77,6 +89,42 @@
         }
     }
 
+    public void shutdown() {
+        LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+        shutdown = true;
+        Map<ActiveRuntimeId, Future<Boolean>> stopFutures = new HashMap<>();
+        for (Iterator<Map.Entry<ActiveRuntimeId, IActiveRuntime>> i = 
runtimes.entrySet().iterator(); i
+                .hasNext();) {
+            Map.Entry<ActiveRuntimeId, IActiveRuntime> entry = i.next();
+            if (entry != null) {
+                final ActiveRuntimeId runtimeId;
+                final IActiveRuntime runtime;
+                try {
+                    runtimeId = entry.getKey();
+                    runtime = entry.getValue();
+                } catch (IllegalStateException e) { // NOSONAR - see 
ConcurrentMap.forEach() impl
+                    // the object is no longer in the map; continue
+                    continue;
+                }
+                stopFutures.put(runtimeId, executor.submit(() -> { 
runtime.stop(); return true; }));
+            }
+            i.remove();
+        }
+        stopFutures.entrySet().parallelStream().forEach(entry -> {
+            try {
+                entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Interrupted waiting to stop runtime: " + 
entry.getKey());
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e) {
+                LOGGER.warn("Exception while stopping runtime: " + 
entry.getKey(), e);
+            } catch (TimeoutException e) {
+                LOGGER.warn("Timed out waiting to stop runtime: " + 
entry.getKey(), e);
+            }
+        });
+        LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+    }
+
     private void stopRuntime(ActiveManagerMessage message) {
         ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
         IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -93,4 +141,5 @@
             });
         }
     }
+
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 55ea971..183a0ed 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -290,6 +290,11 @@
     }
 
     @Override
+    public void preStop() throws Exception {
+        activeManager.shutdown();
+    }
+
+    @Override
     public void deinitialize() throws HyracksDataException {
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index fcc6f1f..ce36f33 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -169,6 +169,10 @@
         webManager.stop();
     }
 
+    @Override
+    public void preStop() throws Exception {
+    }
+
     protected HttpServer setupWebServer(ExternalProperties externalProperties) 
throws Exception {
         HttpServer webServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
                 externalProperties.getWebInterfacePort());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index ce33cef..aac7bc3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -150,6 +150,11 @@
     }
 
     @Override
+    public void preStop() throws Exception {
+        runtimeContext.preStop();
+    }
+
+    @Override
     public void startupCompleted() throws Exception {
         // Since we don't pass initial run flag in 
AsterixHyracksIntegrationUtil, we use the virtualNC flag
         final NodeProperties nodeProperties = 
runtimeContext.getNodeProperties();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
index 548d714..da4da6b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
@@ -50,6 +50,8 @@
 
     ITransactionSubsystem getTransactionSubsystem();
 
+    void preStop() throws Exception;
+
     boolean isShuttingdown();
 
     ILSMIOOperationScheduler getLSMIOScheduler();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
index 4bc3d82..03cead0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
@@ -37,7 +37,7 @@
         executorService.execute(command);
     }
 
-    public Future<? extends Object> submit(Callable<? extends Object> command) 
{
+    public <T> Future<T> submit(Callable<T> command) {
         return executorService.submit(command);
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 6748287..ad9b4af 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -165,6 +165,7 @@
     public static final int UTIL_FILE_SYSTEM_WATCHER_NO_FILES_FOUND = 3076;
     public static final int UTIL_LOCAL_FILE_SYSTEM_UTILS_PATH_NOT_FOUND = 3077;
     public static final int UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER = 
3078;
+    public static final int ACTIVE_MANAGER_SHUTDOWN = 3079;
 
     private ErrorCode() {
     }
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index c4698df..7e439dd 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -151,3 +151,4 @@
 3076 = %1$s: no files found
 3077 = %1$s: path not found
 3078 = Cannot obtain hdfs scheduler
+3079 = Cannot register runtime, active manager has been shutdown
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..c18a385 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -28,6 +28,8 @@
 
     void stop() throws Exception; //NOSONAR
 
+    void preStop() throws Exception; //NOSONAR
+
     void registerConfig(IConfigManager configManager);
 
     Object getApplicationContext();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 1f2c7a5..3aa95b3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -70,7 +70,7 @@
      */
     @Deprecated
     public HyracksException(Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+        this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause, 
null);
     }
 
     /**
@@ -78,7 +78,7 @@
      */
     @Deprecated
     public HyracksException(Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, 
nodeId);
+        this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause, 
nodeId);
     }
 
     /**
@@ -102,7 +102,7 @@
     }
 
     public HyracksException(Throwable cause, int errorCode, Serializable... 
params) {
-        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, 
null, params);
+        this(ErrorMessageUtil.NONE, errorCode, String.valueOf(cause), cause, 
null, params);
     }
 
     public HyracksException(String component, int errorCode, String message, 
Serializable... params) {
@@ -110,7 +110,7 @@
     }
 
     public HyracksException(String component, int errorCode, Throwable cause, 
Serializable... params) {
-        this(component, errorCode, cause.getMessage(), cause, null, params);
+        this(component, errorCode, String.valueOf(cause), cause, null, params);
     }
 
     public HyracksException(String component, int errorCode, String message, 
Throwable cause, Serializable... params) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index 8e423b8..9046147 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -49,6 +49,11 @@
     }
 
     @Override
+    public void preStop() throws Exception {
+        // no-op
+    }
+
+    @Override
     public void startupCompleted() throws Exception {
         // no-op
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 869ab5b..4a5805e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -54,6 +54,11 @@
     }
 
     @Override
+    public void preStop() throws Exception {
+        // no-op
+    }
+
+    @Override
     public NodeCapacity getCapacity() {
         int allCores = 
ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
         return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 
? allCores - 1 : allCores);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b893d26..14d221e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -336,6 +336,7 @@
     public synchronized void stop() throws Exception {
         if (!shuttedDown) {
             LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+            application.preStop();
             executor.shutdownNow();
             if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                 LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing 
with abnormal shutdown");
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e34e551..fe3c02c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -372,10 +372,8 @@
             } finally {
                 collector.close();
             }
-        } catch (HyracksException e) {
-            throw new HyracksDataException(e);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1594
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I45e83b0378198f80297fd2741969507741914dea
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to