Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1217
Change subject: NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts
......................................................................
NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts
1. Fix NC WorkQueue shutdown to interrupt() the worker thread, so that shutdown no longer hangs on possibly stuck work
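2. Increase the CC's timeout for NC shutdown (10s -> 30s) so that each NC
has time to time out any work it is awaiting; increase the client's wait for
the CC to release its connection accordingly (9s -> 30s)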
3. Improve logging (e.g. replace printStackTrace() with LOGGER.log(...))
Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
5 files changed, 56 insertions(+), 52 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/17/1217/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f88f30f..fda9cc3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -22,6 +22,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -46,21 +47,22 @@
@Override
public ClusterControllerInfo getClusterControllerInfo() throws Exception {
- HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif
= new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+ HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif
=
+ new
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
}
@Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new
HyracksClientInterfaceFunctions.GetJobStatusFunction(
- jobId);
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+ new
HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
return (JobStatus) rpci.call(ipcHandle, gjsf);
}
@Override
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws
Exception {
- HyracksClientInterfaceFunctions.StartJobFunction sjf = new
HyracksClientInterfaceFunctions.StartJobFunction(
- acggfBytes, jobFlags);
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ new
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -73,47 +75,50 @@
@Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction
gddsf = new
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+ HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction
gddsf =
+ new
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
return (NetworkAddress) rpci.call(ipcHandle, gddsf);
}
@Override
public void waitForCompletion(JobId jobId) throws Exception {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new
HyracksClientInterfaceFunctions.WaitForCompletionFunction(
- jobId);
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+ new
HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
rpci.call(ipcHandle, wfcf);
}
@Override
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws
Exception {
- HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+ HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
+ new
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
}
@Override
public ClusterTopology getClusterTopology() throws Exception {
- HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+ HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
+ new
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
return (ClusterTopology) rpci.call(ipcHandle, gctf);
}
@Override
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId)
throws Exception {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
- binaryURLs, deploymentId);
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+ new
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs,
deploymentId);
rpci.call(ipcHandle, dbf);
}
@Override
public void unDeployBinary(DeploymentId deploymentId) throws Exception {
- HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
- deploymentId);
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
+ new
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
rpci.call(ipcHandle, dbf);
}
@Override
public JobInfo getJobInfo(JobId jobId) throws Exception {
- HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new
HyracksClientInterfaceFunctions.GetJobInfoFunction(
- jobId);
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+ new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
return (JobInfo) rpci.call(ipcHandle, gjsf);
}
@@ -122,14 +127,15 @@
HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
new
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
rpci.call(ipcHandle, csdf);
- //give the CC some time to do final settling after it returns our
request
- for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+ // give the CC some time to do final settling after it returns our
request
+ int seconds = 30;
+ while (ipcHandle.isConnected() && --seconds > 0) {
synchronized (this) {
- wait(3000l); //3sec
+ wait(TimeUnit.SECONDS.toMillis(1));
}
}
if (ipcHandle.isConnected()) {
- throw new IPCException("CC refused to release connection after 9
seconds");
+ throw new IPCException("CC refused to release connection after 30
seconds");
}
}
@@ -145,6 +151,5 @@
HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
return (String)rpci.call(ipcHandle, tdf);
-
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index 29e1f83..e05dfbc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -50,7 +50,7 @@
public void doRun() {
try {
if (ccs.getShutdownRun() != null) {
- throw new IPCException("Shutdown in Progress");
+ throw new IPCException("Shutdown already in progress");
}
Map<String, NodeControllerState> nodeControllerStateMap =
ccs.getNodeMap();
Set<String> nodeIds = new TreeSet<>();
@@ -73,12 +73,13 @@
/*
* wait for all our acks
*/
+ LOGGER.info("Waiting for NCs to shutdown...");
boolean cleanShutdown =
shutdownStatus.waitForCompletion();
if (!cleanShutdown) {
/*
* best effort - just exit, user will have to kill
misbehaving NCs
*/
- LOGGER.severe("Clean shutdown of NCs timed out-
giving up! Unresponsive nodes: " +
+ LOGGER.severe("Clean shutdown of NCs timed out-
giving up; unresponsive nodes: " +
shutdownStatus.getRemainingNodes());
}
callback.setValue(cleanShutdown);
@@ -96,12 +97,13 @@
}
}
- protected void shutdownNode(String key, NodeControllerState ncState) {
+ protected void shutdownNode(String nodeId, NodeControllerState ncState) {
try {
+ LOGGER.info("Notifying NC " + nodeId + " to shutdown...");
ncState.getNodeController().shutdown(terminateNCService);
} catch (Exception e) {
- LOGGER.log(
- Level.INFO, "Exception shutting down NC " + key + "
(possibly dead?), continuing shutdown...", e);
+ LOGGER.log(Level.INFO,
+ "Exception shutting down NC " + nodeId + " (possibly
dead?), continuing shutdown...", e);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index 4e5c98f..0a50f6f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -21,12 +21,13 @@
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
public class ShutdownRun implements IShutdownStatusConditionVariable{
private final Set<String> shutdownNodeIds = new TreeSet<>();
private boolean shutdownSuccess = false;
- private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+ private static final long SHUTDOWN_TIMER_MS =
TimeUnit.SECONDS.toMillis(30);
public ShutdownRun(Set<String> nodeIds) {
shutdownNodeIds.addAll(nodeIds);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index f9df54b..fe0821f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -45,7 +45,7 @@
throw new IllegalArgumentException("Illegal thread priority
number.");
}
this.threadPriority = threadPriority;
- queue = new LinkedBlockingQueue<AbstractWork>();
+ queue = new LinkedBlockingQueue<>();
thread = new WorkerThread(id);
stopSemaphore = new Semaphore(1);
stopped = true;
@@ -59,6 +59,7 @@
try {
stopSemaphore.acquire();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new HyracksException(e);
}
if (DEBUG) {
@@ -73,14 +74,11 @@
synchronized (this) {
stopped = true;
}
- schedule(new AbstractWork() {
- @Override
- public void run() {
- }
- });
+ thread.interrupt();
try {
stopSemaphore.acquire();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new HyracksException(e);
}
}
@@ -119,8 +117,8 @@
}
try {
r = queue.take();
- } catch (InterruptedException e) {
- continue;
+ } catch (InterruptedException e) { // NOSONAR: aborting
the thread
+ break;
}
if (DEBUG) {
LOGGER.log(Level.FINEST,
@@ -133,7 +131,7 @@
}
r.run();
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Exception while executing "
+ r, e);
}
}
} finally {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..6571c0d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -98,7 +98,7 @@
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
public class NodeControllerService implements IControllerService {
- private static Logger LOGGER =
Logger.getLogger(NodeControllerService.class.getName());
+ private static final Logger LOGGER =
Logger.getLogger(NodeControllerService.class.getName());
private static final double MEMORY_FUDGE_FACTOR = 0.8;
@@ -182,7 +182,7 @@
lccm = new LifeCycleComponentManager();
queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves
MAX_PRIORITY of the heartbeat thread.
- jobletMap = new Hashtable<JobId, Joblet>();
+ jobletMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
new File(new File(NodeControllerService.class.getName()), id));
@@ -192,7 +192,7 @@
runtimeMXBean = ManagementFactory.getRuntimeMXBean();
osMXBean = ManagementFactory.getOperatingSystemMXBean();
registrationPending = true;
- getNodeControllerInfosAcceptor = new
MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+ getNodeControllerInfosAcceptor = new MutableObject<>();
memoryManager = new MemoryManager((long)
(memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
ioCounter = new IOCounterFactory().getIOCounter();
}
@@ -210,7 +210,7 @@
}
private static List<IODeviceHandle> getDevices(String ioDevices) {
- List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+ List<IODeviceHandle> devices = new ArrayList<>();
StringTokenizer tok = new StringTokenizer(ioDevices, ",");
while (tok.hasMoreElements()) {
String devPath = tok.nextToken().trim();
@@ -227,7 +227,7 @@
}
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws
Exception {
- FutureValue<Map<String, NodeControllerInfo>> fv = new
FutureValue<Map<String, NodeControllerInfo>>();
+ FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
synchronized (getNodeControllerInfosAcceptor) {
while (getNodeControllerInfosAcceptor.getValue() != null) {
getNodeControllerInfosAcceptor.wait();
@@ -350,7 +350,7 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
- LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing
shutdown abnormally");
+ LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing
with abnormal shutdown");
}
partitionManager.close();
datasetPartitionManager.close();
@@ -480,7 +480,7 @@
try {
cc.nodeHeartbeat(id, hbData);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
}
}
}
@@ -495,7 +495,7 @@
@Override
public void run() {
try {
- FutureValue<List<JobProfile>> fv = new
FutureValue<List<JobProfile>>();
+ FutureValue<List<JobProfile>> fv = new FutureValue<>();
BuildJobProfilesWork bjpw = new
BuildJobProfilesWork(NodeControllerService.this, fv);
queue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
@@ -503,7 +503,7 @@
cc.reportProfile(id, profiles);
}
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Exception reporting profile", e);
}
}
}
@@ -580,8 +580,10 @@
final CCNCFunctions.ThreadDumpRequestFunction tdrf =
(CCNCFunctions.ThreadDumpRequestFunction) fn;
queue.schedule(new
NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
return;
+
+ default:
+ throw new IllegalArgumentException("Unknown function: " +
fn.getFunctionId());
}
- throw new IllegalArgumentException("Unknown function: " +
fn.getFunctionId());
}
}
@@ -611,15 +613,11 @@
@Override
public void run() {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shutdown hook in progress");
- }
+ LOGGER.info("Shutdown hook in progress");
try {
nodeControllerService.stop();
} catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in executing shutdown hook" + e);
- }
+ LOGGER.warning("Exception in executing shutdown hook" + e);
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1217
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>