Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1220
Change subject: Audit work queue blocks/waits, AbstractWork Not Runnable
......................................................................
Audit work queue blocks/waits, AbstractWork Not Runnable
1. Log a warning for work which blocks and/or waits
2. Make AbstractWork no longer extend Runnable, add javadoc explaining
importance of work completing quickly
3. Minor refactoring due to 2.
Change-Id: I211e4a9e68ee3ac5fa8e02d79b661068734035c7
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
R
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
R
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
D
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
9 files changed, 132 insertions(+), 99 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/20/1220/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index fda9cc3..f8c8512 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -36,6 +36,8 @@
import org.apache.hyracks.ipc.exceptions.IPCException;
public class HyracksClientInterfaceRemoteProxy implements
IHyracksClientInterface {
+ private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30;
+
private final IIPCHandle ipcHandle;
private final RPCInterface rpci;
@@ -127,15 +129,16 @@
HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
new
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
rpci.call(ipcHandle, csdf);
+ int i = 0;
// give the CC some time to do final settling after it returns our
request
- int seconds = 30;
- while (ipcHandle.isConnected() && --seconds > 0) {
+ while (ipcHandle.isConnected() && i++ <
SHUTDOWN_CONNECTION_TIMEOUT_SECS) {
synchronized (this) {
wait(TimeUnit.SECONDS.toMillis(1));
}
}
if (ipcHandle.isConnected()) {
- throw new IPCException("CC refused to release connection after 30
seconds");
+ throw new IPCException("CC refused to release connection after " +
SHUTDOWN_CONNECTION_TIMEOUT_SECS
+ + " seconds");
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index 7931cf8..889c828 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -27,11 +27,12 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.hyracks.control.common.work.ThreadDumpWork;
-public class GetThreadDumpWork extends ThreadDumpWork {
- private static final Logger LOGGER =
Logger.getLogger(ThreadDumpWork.class.getName());
+public class GetThreadDumpWork extends AbstractWork {
+ private static final Logger LOGGER =
Logger.getLogger(GetThreadDumpWork.class.getName());
public static final int TIMEOUT_SECS = 60;
private final ClusterControllerService ccs;
@@ -48,10 +49,15 @@
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
if (nodeId == null) {
// null nodeId means the request is for the cluster controller
- callback.setValue(takeDump(ManagementFactory.getThreadMXBean()));
+ try {
+
callback.setValue(ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean()));
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception taking CC thread dump",
e);
+ callback.setException(e);
+ }
} else {
final NodeControllerState ncState = ccs.getNodeMap().get(nodeId);
if (ncState == null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
similarity index 91%
rename from
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java
rename to
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
index bf1965d..0016ee2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/ThreadDumpWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.work;
+package org.apache.hyracks.control.common.utils;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
@@ -29,10 +29,9 @@
import org.json.JSONException;
import org.json.JSONObject;
-public abstract class ThreadDumpWork extends SynchronizableWork {
-
- protected String takeDump(ThreadMXBean threadMXBean) throws JSONException {
- ThreadInfo [] threadInfos = threadMXBean.dumpAllThreads(true, true);
+public class ThreadDumpHelper {
+ public static String takeDumpJSON(ThreadMXBean threadMXBean) throws
JSONException {
+ ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
List<Map<String, Object>> threads = new ArrayList<>();
for (ThreadInfo thread : threadInfos) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java
index 9376370..076dd66 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/AbstractWork.java
@@ -31,6 +31,12 @@
return className.substring(className.lastIndexOf('.') + 1, endIndex);
}
+ /**
+ * run is executed on a single thread that services the work queue. As a
result run should never wait or block as
+ * this will delay processing for the whole queue.
+ */
+ public abstract void run();
+
@Override
public String toString() {
return getName();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index fe0821f..5efe3dd 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -18,8 +18,10 @@
*/
package org.apache.hyracks.control.common.work;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -33,11 +35,11 @@
private final LinkedBlockingQueue<AbstractWork> queue;
private final WorkerThread thread;
- private final Semaphore stopSemaphore;
private boolean stopped;
private AtomicInteger enqueueCount;
private AtomicInteger dequeueCount;
private int threadPriority = Thread.MAX_PRIORITY;
+ private final ThreadMXBean threadMXBean =
ManagementFactory.getThreadMXBean();
public WorkQueue(String id, int threadPriority) {
if (threadPriority != Thread.MAX_PRIORITY && threadPriority !=
Thread.NORM_PRIORITY
@@ -47,21 +49,14 @@
this.threadPriority = threadPriority;
queue = new LinkedBlockingQueue<>();
thread = new WorkerThread(id);
- stopSemaphore = new Semaphore(1);
stopped = true;
- if(DEBUG) {
+ if (DEBUG) {
enqueueCount = new AtomicInteger(0);
dequeueCount = new AtomicInteger(0);
}
}
public void start() throws HyracksException {
- try {
- stopSemaphore.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new HyracksException(e);
- }
if (DEBUG) {
enqueueCount.set(0);
dequeueCount.set(0);
@@ -76,7 +71,7 @@
}
thread.interrupt();
try {
- stopSemaphore.acquire();
+ thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HyracksException(e);
@@ -107,35 +102,46 @@
@Override
public void run() {
- try {
- AbstractWork r;
- while (true) {
- synchronized (WorkQueue.this) {
- if (stopped) {
- return;
- }
- }
- try {
- r = queue.take();
- } catch (InterruptedException e) { // NOSONAR: aborting
the thread
- break;
- }
- if (DEBUG) {
- LOGGER.log(Level.FINEST,
- "Dequeue (" + WorkQueue.this.hashCode() + "):
" + dequeueCount.incrementAndGet() + "/"
- + enqueueCount);
- }
- try {
- if (LOGGER.isLoggable(r.logLevel())) {
- LOGGER.log(r.logLevel(), "Executing: " + r);
- }
- r.run();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception while executing "
+ r, e);
+ AbstractWork r;
+ while (true) {
+ synchronized (WorkQueue.this) {
+ if (stopped) {
+ return;
}
}
- } finally {
- stopSemaphore.release();
+ try {
+ r = queue.take();
+ } catch (InterruptedException e) { // NOSONAR: aborting the
thread
+ break;
+ }
+ if (DEBUG) {
+ LOGGER.log(Level.FINEST,
+ "Dequeue (" + WorkQueue.this.hashCode() + "): " +
dequeueCount.incrementAndGet() + "/"
+ + enqueueCount);
+ }
+ if (LOGGER.isLoggable(r.logLevel())) {
+ LOGGER.log(r.logLevel(), "Executing: " + r);
+ }
+ ThreadInfo before = threadMXBean.getThreadInfo(thread.getId());
+ try {
+ r.run();
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception while executing " +
r, e);
+ } finally {
+ auditWaitsAndBlocks(r, before);
+ }
+ }
+ }
+
+ protected void auditWaitsAndBlocks(AbstractWork r, ThreadInfo before) {
+ ThreadInfo after = threadMXBean.getThreadInfo(thread.getId());
+ final long waitedDelta = after.getWaitedCount() -
before.getWaitedCount();
+ final long blockedDelta = after.getBlockedCount() -
before.getBlockedCount();
+ if (waitedDelta > 0 || blockedDelta > 0) {
+ LOGGER.warning("Work " + r + " waited " + waitedDelta + "
times (~"
+ + (after.getWaitedTime() - before.getWaitedTime()) +
"ms), blocked " + blockedDelta
+ + " times (~"+ (after.getBlockedTime() -
before.getBlockedTime()) + "ms)"
+ );
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d2d4811..ed46b53 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -85,10 +85,10 @@
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
import org.apache.hyracks.control.nc.work.DeployBinaryWork;
import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
-import org.apache.hyracks.control.nc.work.ShutdownWork;
+import org.apache.hyracks.control.nc.task.ShutdownTask;
import org.apache.hyracks.control.nc.work.StartTasksWork;
import org.apache.hyracks.control.nc.work.StateDumpWork;
-import org.apache.hyracks.control.nc.work.NodeThreadDumpWork;
+import org.apache.hyracks.control.nc.task.ThreadDumpTask;
import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
@@ -573,12 +573,12 @@
case SHUTDOWN_REQUEST:
final CCNCFunctions.ShutdownRequestFunction sdrf =
(CCNCFunctions.ShutdownRequestFunction) fn;
- executor.submit(new
ShutdownWork(NodeControllerService.this, sdrf.isTerminateNCService()));
+ executor.submit(new
ShutdownTask(NodeControllerService.this, sdrf.isTerminateNCService()));
return;
case THREAD_DUMP_REQUEST:
final CCNCFunctions.ThreadDumpRequestFunction tdrf =
(CCNCFunctions.ThreadDumpRequestFunction) fn;
- executor.submit(new
NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
+ executor.submit(new
ThreadDumpTask(NodeControllerService.this, tdrf.getRequestId()));
return;
default:
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
similarity index 89%
rename from
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
rename to
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
index c195c98..cdbd4ad 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
@@ -17,21 +17,20 @@
* under the License.
*/
-package org.apache.hyracks.control.nc.work;
+package org.apache.hyracks.control.nc.task;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.control.common.base.IClusterController;
-import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ShutdownWork extends AbstractWork {
- private static final Logger LOGGER =
Logger.getLogger(ShutdownWork.class.getName());
+public class ShutdownTask implements Runnable {
+ private static final Logger LOGGER =
Logger.getLogger(ShutdownTask.class.getName());
private final NodeControllerService ncs;
private final boolean terminateNCService;
- public ShutdownWork(NodeControllerService ncs, boolean terminateNCService)
{
+ public ShutdownTask(NodeControllerService ncs, boolean terminateNCService)
{
this.ncs = ncs;
this.terminateNCService = terminateNCService;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
new file mode 100644
index 0000000..68d9223
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ThreadDumpTask implements Runnable {
+ private static final Logger LOGGER =
Logger.getLogger(ThreadDumpTask.class.getName());
+ private final NodeControllerService ncs;
+ private final String requestId;
+
+ public ThreadDumpTask(NodeControllerService ncs, String requestId) {
+ this.ncs = ncs;
+ this.requestId = requestId;
+ }
+
+ @Override
+ public void run() {
+ String result;
+ try {
+ result = ThreadDumpHelper.takeDumpJSON(ncs.getThreadMXBean());
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
+ result = null;
+ }
+ try {
+ ncs.getClusterController().notifyThreadDump(
+ ncs.getApplicationContext().getNodeId(), requestId,
result);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception sending thread dump to CC",
e);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
deleted file mode 100644
index 85233b2..0000000
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.work;
-
-import org.apache.hyracks.control.common.work.ThreadDumpWork;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class NodeThreadDumpWork extends ThreadDumpWork {
- private final NodeControllerService ncs;
- private final String requestId;
-
- public NodeThreadDumpWork(NodeControllerService ncs, String requestId) {
- this.ncs = ncs;
- this.requestId = requestId;
- }
-
- @Override
- protected void doRun() throws Exception {
- final String result = takeDump(ncs.getThreadMXBean());
- ncs.getClusterController().notifyThreadDump(
- ncs.getApplicationContext().getNodeId(), requestId, result);
- }
-}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1220
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I211e4a9e68ee3ac5fa8e02d79b661068734035c7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>