Michael Blow has submitted this change and it was merged. Change subject: Audit work queue blocks/waits, AbstractWork Not Runnable ......................................................................
Audit work queue blocks/waits, AbstractWork Not Runnable 1. Log a warning for work which blocks and/or waits 2. Make AbstractWork no longer extend Runnable, add javadoc explaining importance of work completing quickly 3. Minor refactoring due to 2. Change-Id: I211e4a9e68ee3ac5fa8e02d79b661068734035c7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1220 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java R hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java R hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java D hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java 9 files changed, 135 insertions(+), 98 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index fda9cc3..f8c8512 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -36,6 +36,8 @@ import org.apache.hyracks.ipc.exceptions.IPCException; public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface { + private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30; + private final IIPCHandle ipcHandle; private final RPCInterface rpci; @@ -127,15 +129,16 @@ HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf = new HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService); rpci.call(ipcHandle, csdf); + int i = 0; // give the CC some time to do final settling after it returns our request - int seconds = 30; - while (ipcHandle.isConnected() && --seconds > 0) { + while (ipcHandle.isConnected() && i++ < SHUTDOWN_CONNECTION_TIMEOUT_SECS) { synchronized (this) { wait(TimeUnit.SECONDS.toMillis(1)); } } if (ipcHandle.isConnected()) { - throw new IPCException("CC refused to release connection after 30 seconds"); + throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + + " seconds"); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java index 7931cf8..889c828 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java @@ -27,11 +27,12 @@ import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.common.utils.ThreadDumpHelper; +import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; -import org.apache.hyracks.control.common.work.ThreadDumpWork; -public class GetThreadDumpWork extends ThreadDumpWork { - private static final Logger LOGGER = Logger.getLogger(ThreadDumpWork.class.getName()); +public class GetThreadDumpWork extends AbstractWork { + private static final Logger LOGGER = Logger.getLogger(GetThreadDumpWork.class.getName()); public static final int TIMEOUT_SECS = 60; private final ClusterControllerService ccs; @@ -48,10 +49,15 @@ } @Override - protected void doRun() throws Exception { + public void run() { if (nodeId == null) { // null nodeId means the request is for the cluster controller - callback.setValue(takeDump(ManagementFactory.getThreadMXBean())); + try { + callback.setValue(ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean())); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception taking CC thread dump", e); + callback.setException(e); + } } else { final NodeControllerState ncState = ccs.getNodeMap().get(nodeId); if (ncState == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java similarity index 91% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java rename to 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java index bf1965d..eacb9e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.control.common.work; +package org.apache.hyracks.control.common.utils; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; @@ -29,10 +29,13 @@ import org.json.JSONException; import org.json.JSONObject; -public abstract class ThreadDumpWork extends SynchronizableWork { +public class ThreadDumpHelper { - protected String takeDump(ThreadMXBean threadMXBean) throws JSONException { - ThreadInfo [] threadInfos = threadMXBean.dumpAllThreads(true, true); + private ThreadDumpHelper() { + } + + public static String takeDumpJSON(ThreadMXBean threadMXBean) throws JSONException { + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); List<Map<String, Object>> threads = new ArrayList<>(); for (ThreadInfo thread : threadInfos) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java index 9376370..076dd66 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java @@ -31,6 +31,12 @@ return 
className.substring(className.lastIndexOf('.') + 1, endIndex); } + /** + * run is executed on a single thread that services the work queue. As a result run should never wait or block as + * this will delay processing for the whole queue. + */ + public abstract void run(); + @Override public String toString() { return getName(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java index fe0821f..f1b00ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java @@ -18,8 +18,10 @@ */ package org.apache.hyracks.control.common.work; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,11 +35,11 @@ private final LinkedBlockingQueue<AbstractWork> queue; private final WorkerThread thread; - private final Semaphore stopSemaphore; private boolean stopped; private AtomicInteger enqueueCount; private AtomicInteger dequeueCount; private int threadPriority = Thread.MAX_PRIORITY; + private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); public WorkQueue(String id, int threadPriority) { if (threadPriority != Thread.MAX_PRIORITY && threadPriority != Thread.NORM_PRIORITY @@ -47,21 +49,14 @@ this.threadPriority = threadPriority; queue = new LinkedBlockingQueue<>(); thread = new WorkerThread(id); - stopSemaphore = new Semaphore(1); 
stopped = true; - if(DEBUG) { + if (DEBUG) { enqueueCount = new AtomicInteger(0); dequeueCount = new AtomicInteger(0); } } public void start() throws HyracksException { - try { - stopSemaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new HyracksException(e); - } if (DEBUG) { enqueueCount.set(0); dequeueCount.set(0); @@ -76,7 +71,7 @@ } thread.interrupt(); try { - stopSemaphore.acquire(); + thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new HyracksException(e); @@ -107,35 +102,46 @@ @Override public void run() { - try { - AbstractWork r; - while (true) { - synchronized (WorkQueue.this) { - if (stopped) { - return; - } - } - try { - r = queue.take(); - } catch (InterruptedException e) { // NOSONAR: aborting the thread - break; - } - if (DEBUG) { - LOGGER.log(Level.FINEST, - "Dequeue (" + WorkQueue.this.hashCode() + "): " + dequeueCount.incrementAndGet() + "/" - + enqueueCount); - } - try { - if (LOGGER.isLoggable(r.logLevel())) { - LOGGER.log(r.logLevel(), "Executing: " + r); - } - r.run(); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception while executing " + r, e); + AbstractWork r; + while (true) { + synchronized (WorkQueue.this) { + if (stopped) { + return; } } - } finally { - stopSemaphore.release(); + try { + r = queue.take(); + } catch (InterruptedException e) { // NOSONAR: aborting the thread + break; + } + if (DEBUG) { + LOGGER.log(Level.FINEST, + "Dequeue (" + WorkQueue.this.hashCode() + "): " + dequeueCount.incrementAndGet() + "/" + + enqueueCount); + } + if (LOGGER.isLoggable(r.logLevel())) { + LOGGER.log(r.logLevel(), "Executing: " + r); + } + ThreadInfo before = threadMXBean.getThreadInfo(thread.getId()); + try { + r.run(); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception while executing " + r, e); + } finally { + auditWaitsAndBlocks(r, before); + } + } + } + + protected void auditWaitsAndBlocks(AbstractWork r, ThreadInfo 
before) { + ThreadInfo after = threadMXBean.getThreadInfo(thread.getId()); + final long waitedDelta = after.getWaitedCount() - before.getWaitedCount(); + final long blockedDelta = after.getBlockedCount() - before.getBlockedCount(); + if (waitedDelta > 0 || blockedDelta > 0) { + LOGGER.warning("Work " + r + " waited " + waitedDelta + " times (~" + + (after.getWaitedTime() - before.getWaitedTime()) + "ms), blocked " + blockedDelta + + " times (~" + (after.getBlockedTime() - before.getBlockedTime()) + "ms)" + ); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index d2d4811..ed46b53 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -85,10 +85,10 @@ import org.apache.hyracks.control.nc.work.CleanupJobletWork; import org.apache.hyracks.control.nc.work.DeployBinaryWork; import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; -import org.apache.hyracks.control.nc.work.ShutdownWork; +import org.apache.hyracks.control.nc.task.ShutdownTask; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; -import org.apache.hyracks.control.nc.work.NodeThreadDumpWork; +import org.apache.hyracks.control.nc.task.ThreadDumpTask; import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; @@ -573,12 +573,12 @@ case SHUTDOWN_REQUEST: final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn; - executor.submit(new 
ShutdownWork(NodeControllerService.this, sdrf.isTerminateNCService())); + executor.submit(new ShutdownTask(NodeControllerService.this, sdrf.isTerminateNCService())); return; case THREAD_DUMP_REQUEST: final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn; - executor.submit(new NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId())); + executor.submit(new ThreadDumpTask(NodeControllerService.this, tdrf.getRequestId())); return; default: diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java similarity index 89% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java index c195c98..cdbd4ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java @@ -17,21 +17,20 @@ * under the License. 
*/ -package org.apache.hyracks.control.nc.work; +package org.apache.hyracks.control.nc.task; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.hyracks.control.common.base.IClusterController; -import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; -public class ShutdownWork extends AbstractWork { - private static final Logger LOGGER = Logger.getLogger(ShutdownWork.class.getName()); +public class ShutdownTask implements Runnable { + private static final Logger LOGGER = Logger.getLogger(ShutdownTask.class.getName()); private final NodeControllerService ncs; private final boolean terminateNCService; - public ShutdownWork(NodeControllerService ncs, boolean terminateNCService) { + public ShutdownTask(NodeControllerService ncs, boolean terminateNCService) { this.ncs = ncs; this.terminateNCService = terminateNCService; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java new file mode 100644 index 0000000..68d9223 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.task; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.control.common.utils.ThreadDumpHelper; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ThreadDumpTask implements Runnable { + private static final Logger LOGGER = Logger.getLogger(ThreadDumpTask.class.getName()); + private final NodeControllerService ncs; + private final String requestId; + + public ThreadDumpTask(NodeControllerService ncs, String requestId) { + this.ncs = ncs; + this.requestId = requestId; + } + + @Override + public void run() { + String result; + try { + result = ThreadDumpHelper.takeDumpJSON(ncs.getThreadMXBean()); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception taking thread dump", e); + result = null; + } + try { + ncs.getClusterController().notifyThreadDump( + ncs.getApplicationContext().getNodeId(), requestId, result); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception sending thread dump to CC", e); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java deleted file mode 100644 index 85233b2..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.nc.work; - -import org.apache.hyracks.control.common.work.ThreadDumpWork; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class NodeThreadDumpWork extends ThreadDumpWork { - private final NodeControllerService ncs; - private final String requestId; - - public NodeThreadDumpWork(NodeControllerService ncs, String requestId) { - this.ncs = ncs; - this.requestId = requestId; - } - - @Override - protected void doRun() throws Exception { - final String result = takeDump(ncs.getThreadMXBean()); - ncs.getClusterController().notifyThreadDump( - ncs.getApplicationContext().getNodeId(), requestId, result); - } -} -- To view, visit https://asterix-gerrit.ics.uci.edu/1220 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I211e4a9e68ee3ac5fa8e02d79b661068734035c7 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
