Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1217

Change subject: NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts
......................................................................

NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts

1. Fix NC WorkQueue shutdown to interrupt() possibly stuck work
2. Adjust NC shutdown timeout on CC to allow NC to time out any work it
   is awaiting
3. Improve logging (e.g. s/printStackTrace()/LOGGER.log.../)

Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
5 files changed, 56 insertions(+), 52 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/17/1217/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f88f30f..fda9cc3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -22,6 +22,7 @@
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -46,21 +47,22 @@
 
     @Override
     public ClusterControllerInfo getClusterControllerInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif 
= new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif 
=
+                new 
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
         return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
     }
 
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new 
HyracksClientInterfaceFunctions.GetJobStatusFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                new 
HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
         return (JobStatus) rpci.call(ipcHandle, gjsf);
     }
 
     @Override
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception {
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new 
HyracksClientInterfaceFunctions.StartJobFunction(
-                acggfBytes, jobFlags);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -73,47 +75,50 @@
 
     @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction 
gddsf = new 
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction 
gddsf =
+                new 
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
         return (NetworkAddress) rpci.call(ipcHandle, gddsf);
     }
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new 
HyracksClientInterfaceFunctions.WaitForCompletionFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                new 
HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
         rpci.call(ipcHandle, wfcf);
     }
 
     @Override
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws 
Exception {
-        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = 
new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
+                new 
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
         return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
     }
 
     @Override
     public ClusterTopology getClusterTopology() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new 
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
+                new 
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
         return (ClusterTopology) rpci.call(ipcHandle, gctf);
     }
 
     @Override
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) 
throws Exception {
-        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new 
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
-                binaryURLs, deploymentId);
+        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                new 
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, 
deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public void unDeployBinary(DeploymentId deploymentId) throws Exception {
-        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new 
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
-                deploymentId);
+        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
+                new 
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public JobInfo getJobInfo(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new 
HyracksClientInterfaceFunctions.GetJobInfoFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
         return (JobInfo) rpci.call(ipcHandle, gjsf);
     }
 
@@ -122,14 +127,15 @@
         HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
                 new 
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
         rpci.call(ipcHandle, csdf);
-        //give the CC some time to do final settling after it returns our 
request
-        for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+        // give the CC some time to do final settling after it returns our 
request
+        int seconds = 30;
+        while (ipcHandle.isConnected() && --seconds > 0) {
             synchronized (this) {
-                wait(3000l); //3sec
+                wait(TimeUnit.SECONDS.toMillis(1));
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after 9 
seconds");
+            throw new IPCException("CC refused to release connection after 30 
seconds");
         }
     }
 
@@ -145,6 +151,5 @@
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
         return (String)rpci.call(ipcHandle, tdf);
-
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index 29e1f83..e05dfbc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -50,7 +50,7 @@
     public void doRun() {
         try {
             if (ccs.getShutdownRun() != null) {
-                throw new IPCException("Shutdown in Progress");
+                throw new IPCException("Shutdown already in progress");
             }
             Map<String, NodeControllerState> nodeControllerStateMap = 
ccs.getNodeMap();
             Set<String> nodeIds = new TreeSet<>();
@@ -73,12 +73,13 @@
                         /*
                          * wait for all our acks
                          */
+                        LOGGER.info("Waiting for NCs to shutdown...");
                         boolean cleanShutdown = 
shutdownStatus.waitForCompletion();
                         if (!cleanShutdown) {
                             /*
                              * best effort - just exit, user will have to kill 
misbehaving NCs
                              */
-                            LOGGER.severe("Clean shutdown of NCs timed out- 
giving up!  Unresponsive nodes: " +
+                            LOGGER.severe("Clean shutdown of NCs timed out- 
giving up; unresponsive nodes: " +
                                     shutdownStatus.getRemainingNodes());
                         }
                         callback.setValue(cleanShutdown);
@@ -96,12 +97,13 @@
         }
     }
 
-    protected void shutdownNode(String key, NodeControllerState ncState) {
+    protected void shutdownNode(String nodeId, NodeControllerState ncState) {
         try {
+            LOGGER.info("Notifying NC " + nodeId + " to shutdown...");
             ncState.getNodeController().shutdown(terminateNCService);
         } catch (Exception e) {
-            LOGGER.log(
-                    Level.INFO, "Exception shutting down NC " + key + " 
(possibly dead?), continuing shutdown...", e);
+            LOGGER.log(Level.INFO,
+                    "Exception shutting down NC " + nodeId + " (possibly 
dead?), continuing shutdown...", e);
         }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index 4e5c98f..0a50f6f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -21,12 +21,13 @@
 
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 public class ShutdownRun implements IShutdownStatusConditionVariable{
 
     private final Set<String> shutdownNodeIds = new TreeSet<>();
     private boolean shutdownSuccess = false;
-    private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+    private static final long SHUTDOWN_TIMER_MS = 
TimeUnit.SECONDS.toMillis(30);
 
     public ShutdownRun(Set<String> nodeIds) {
         shutdownNodeIds.addAll(nodeIds);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index f9df54b..fe0821f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -45,7 +45,7 @@
             throw new IllegalArgumentException("Illegal thread priority 
number.");
         }
         this.threadPriority = threadPriority;
-        queue = new LinkedBlockingQueue<AbstractWork>();
+        queue = new LinkedBlockingQueue<>();
         thread = new WorkerThread(id);
         stopSemaphore = new Semaphore(1);
         stopped = true;
@@ -59,6 +59,7 @@
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
         if (DEBUG) {
@@ -73,14 +74,11 @@
         synchronized (this) {
             stopped = true;
         }
-        schedule(new AbstractWork() {
-            @Override
-            public void run() {
-            }
-        });
+        thread.interrupt();
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
     }
@@ -119,8 +117,8 @@
                     }
                     try {
                         r = queue.take();
-                    } catch (InterruptedException e) {
-                        continue;
+                    } catch (InterruptedException e) { // NOSONAR: aborting 
the thread
+                        break;
                     }
                     if (DEBUG) {
                         LOGGER.log(Level.FINEST,
@@ -133,7 +131,7 @@
                         }
                         r.run();
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        LOGGER.log(Level.WARNING, "Exception while executing " 
+ r, e);
                     }
                 }
             } finally {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..6571c0d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -98,7 +98,7 @@
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService implements IControllerService {
-    private static Logger LOGGER = 
Logger.getLogger(NodeControllerService.class.getName());
+    private static final Logger LOGGER = 
Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
 
@@ -182,7 +182,7 @@
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
-        jobletMap = new Hashtable<JobId, Joblet>();
+        jobletMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -192,7 +192,7 @@
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         registrationPending = true;
-        getNodeControllerInfosAcceptor = new 
MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+        getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) 
(memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
         ioCounter = new IOCounterFactory().getIOCounter();
     }
@@ -210,7 +210,7 @@
     }
 
     private static List<IODeviceHandle> getDevices(String ioDevices) {
-        List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+        List<IODeviceHandle> devices = new ArrayList<>();
         StringTokenizer tok = new StringTokenizer(ioDevices, ",");
         while (tok.hasMoreElements()) {
             String devPath = tok.nextToken().trim();
@@ -227,7 +227,7 @@
     }
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws 
Exception {
-        FutureValue<Map<String, NodeControllerInfo>> fv = new 
FutureValue<Map<String, NodeControllerInfo>>();
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
         synchronized (getNodeControllerInfosAcceptor) {
             while (getNodeControllerInfosAcceptor.getValue() != null) {
                 getNodeControllerInfosAcceptor.wait();
@@ -350,7 +350,7 @@
             LOGGER.log(Level.INFO, "Stopping NodeControllerService");
             executor.shutdownNow();
             if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
-                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing 
shutdown abnormally");
+                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing 
with abnormal shutdown");
             }
             partitionManager.close();
             datasetPartitionManager.close();
@@ -480,7 +480,7 @@
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
             }
         }
     }
@@ -495,7 +495,7 @@
         @Override
         public void run() {
             try {
-                FutureValue<List<JobProfile>> fv = new 
FutureValue<List<JobProfile>>();
+                FutureValue<List<JobProfile>> fv = new FutureValue<>();
                 BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, fv);
                 queue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
@@ -503,7 +503,7 @@
                     cc.reportProfile(id, profiles);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.WARNING, "Exception reporting profile", e);
             }
         }
     }
@@ -580,8 +580,10 @@
                     final CCNCFunctions.ThreadDumpRequestFunction tdrf = 
(CCNCFunctions.ThreadDumpRequestFunction) fn;
                     queue.schedule(new 
NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
                     return;
+
+                default:
+                    throw new IllegalArgumentException("Unknown function: " + 
fn.getFunctionId());
             }
-            throw new IllegalArgumentException("Unknown function: " + 
fn.getFunctionId());
 
         }
     }
@@ -611,15 +613,11 @@
 
         @Override
         public void run() {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shutdown hook in progress");
-            }
+            LOGGER.info("Shutdown hook in progress");
             try {
                 nodeControllerService.stop();
             } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in executing shutdown hook" + e);
-                }
+                LOGGER.warning("Exception in executing shutdown hook" + e);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1217
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to