Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1594
Change subject: Shutdown ActiveManager Before Killing Thread Executors
......................................................................
Shutdown ActiveManager Before Killing Thread Executors
Avoid race conditions with start / stop of active runtimes by stopping
the ActiveManager and its started runtimes before terminating the thread
executor.
Change-Id: I45e83b0378198f80297fd2741969507741914dea
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
14 files changed, 94 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/94/1594/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b15cfca..f15270d 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,11 +18,20 @@
*/
package org.apache.asterix.active;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.common.api.ThreadExecutor;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
@@ -30,12 +39,14 @@
public class ActiveManager {
private static final Logger LOGGER =
Logger.getLogger(ActiveManager.class.getName());
- private final Executor executor;
- private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
+ public static final int SHUTDOWN_TIMEOUT_SECS = 60;
+ private final ThreadExecutor executor;
+ private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
private final ConcurrentFramePool activeFramePool;
private final String nodeId;
+ private volatile boolean shutdown;
- public ActiveManager(Executor executor, String nodeId, long
activeMemoryBudget, int frameSize)
+ public ActiveManager(ThreadExecutor executor, String nodeId, long
activeMemoryBudget, int frameSize)
throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
@@ -47,11 +58,12 @@
return activeFramePool;
}
- public void registerRuntime(IActiveRuntime runtime) {
- ActiveRuntimeId id = runtime.getRuntimeId();
- if (!runtimes.containsKey(id)) {
- runtimes.put(id, runtime);
+ public void registerRuntime(IActiveRuntime runtime) throws
HyracksDataException {
+ if (shutdown) {
+ throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN);
}
+ ActiveRuntimeId id = runtime.getRuntimeId();
+ runtimes.putIfAbsent(id, runtime);
}
public void deregisterRuntime(ActiveRuntimeId id) {
@@ -77,6 +89,42 @@
}
}
+ public void shutdown() {
+ LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+ shutdown = true;
+ Map<ActiveRuntimeId, Future<Boolean>> stopFutures = new HashMap<>();
+ for (Iterator<Map.Entry<ActiveRuntimeId, IActiveRuntime>> i =
runtimes.entrySet().iterator(); i
+ .hasNext();) {
+ Map.Entry<ActiveRuntimeId, IActiveRuntime> entry = i.next();
+ if (entry != null) {
+ final ActiveRuntimeId runtimeId;
+ final IActiveRuntime runtime;
+ try {
+ runtimeId = entry.getKey();
+ runtime = entry.getValue();
+ } catch (IllegalStateException e) { // NOSONAR - see
ConcurrentMap.forEach() impl
+ // the object is no longer in the map; continue
+ continue;
+ }
+ stopFutures.put(runtimeId, executor.submit(() -> {
runtime.stop(); return true; }));
+ }
+ i.remove();
+ }
+ stopFutures.entrySet().parallelStream().forEach(entry -> {
+ try {
+ entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted waiting to stop runtime: " +
entry.getKey());
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOGGER.warn("Exception while stopping runtime: " +
entry.getKey(), e);
+ } catch (TimeoutException e) {
+ LOGGER.warn("Timed out waiting to stop runtime: " +
entry.getKey(), e);
+ }
+ });
+ LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+ }
+
private void stopRuntime(ActiveManagerMessage message) {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -93,4 +141,5 @@
});
}
}
+
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 55ea971..183a0ed 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -290,6 +290,11 @@
}
@Override
+ public void preStop() throws Exception {
+ activeManager.shutdown();
+ }
+
+ @Override
public void deinitialize() throws HyracksDataException {
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index fcc6f1f..ce36f33 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -169,6 +169,10 @@
webManager.stop();
}
+ @Override
+ public void preStop() throws Exception {
+ }
+
protected HttpServer setupWebServer(ExternalProperties externalProperties)
throws Exception {
HttpServer webServer = new HttpServer(webManager.getBosses(),
webManager.getWorkers(),
externalProperties.getWebInterfacePort());
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index ce33cef..aac7bc3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -150,6 +150,11 @@
}
@Override
+ public void preStop() throws Exception {
+ runtimeContext.preStop();
+ }
+
+ @Override
public void startupCompleted() throws Exception {
// Since we don't pass initial run flag in
AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties =
runtimeContext.getNodeProperties();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
index 548d714..da4da6b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
@@ -50,6 +50,8 @@
ITransactionSubsystem getTransactionSubsystem();
+ void preStop() throws Exception;
+
boolean isShuttingdown();
ILSMIOOperationScheduler getLSMIOScheduler();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
index 4bc3d82..03cead0 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
@@ -37,7 +37,7 @@
executorService.execute(command);
}
- public Future<? extends Object> submit(Callable<? extends Object> command)
{
+ public <T> Future<T> submit(Callable<T> command) {
return executorService.submit(command);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 6748287..ad9b4af 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -165,6 +165,7 @@
public static final int UTIL_FILE_SYSTEM_WATCHER_NO_FILES_FOUND = 3076;
public static final int UTIL_LOCAL_FILE_SYSTEM_UTILS_PATH_NOT_FOUND = 3077;
public static final int UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER =
3078;
+ public static final int ACTIVE_MANAGER_SHUTDOWN = 3079;
private ErrorCode() {
}
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index c4698df..7e439dd 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -151,3 +151,4 @@
3076 = %1$s: no files found
3077 = %1$s: path not found
3078 = Cannot obtain hdfs scheduler
+3079 = Cannot register runtime, active manager has been shutdown
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..c18a385 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -28,6 +28,8 @@
void stop() throws Exception; //NOSONAR
+ void preStop() throws Exception; //NOSONAR
+
void registerConfig(IConfigManager configManager);
Object getApplicationContext();
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 1f2c7a5..3aa95b3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -70,7 +70,7 @@
*/
@Deprecated
public HyracksException(Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause,
null);
}
/**
@@ -78,7 +78,7 @@
*/
@Deprecated
public HyracksException(Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause,
nodeId);
+ this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause,
nodeId);
}
/**
@@ -102,7 +102,7 @@
}
public HyracksException(Throwable cause, int errorCode, Serializable...
params) {
- this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause,
null, params);
+ this(ErrorMessageUtil.NONE, errorCode, String.valueOf(cause), cause,
null, params);
}
public HyracksException(String component, int errorCode, String message,
Serializable... params) {
@@ -110,7 +110,7 @@
}
public HyracksException(String component, int errorCode, Throwable cause,
Serializable... params) {
- this(component, errorCode, cause.getMessage(), cause, null, params);
+ this(component, errorCode, String.valueOf(cause), cause, null, params);
}
public HyracksException(String component, int errorCode, String message,
Throwable cause, Serializable... params) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index 8e423b8..9046147 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -49,6 +49,11 @@
}
@Override
+ public void preStop() throws Exception {
+ // no-op
+ }
+
+ @Override
public void startupCompleted() throws Exception {
// no-op
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 869ab5b..4a5805e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -54,6 +54,11 @@
}
@Override
+ public void preStop() throws Exception {
+ // no-op
+ }
+
+ @Override
public NodeCapacity getCapacity() {
int allCores =
ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1
? allCores - 1 : allCores);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b893d26..14d221e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -336,6 +336,7 @@
public synchronized void stop() throws Exception {
if (!shuttedDown) {
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ application.preStop();
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing
with abnormal shutdown");
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e34e551..fe3c02c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -372,10 +372,8 @@
} finally {
collector.close();
}
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1594
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I45e83b0378198f80297fd2741969507741914dea
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>