Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1969
Change subject: [NO ISSUE] Refactoring / cleanup of HTTP cluster state /
diagnostics APIs
......................................................................
[NO ISSUE] Refactoring / cleanup of HTTP cluster state / diagnostics APIs
Change-Id: If47ec45bf88a39d63421903080fee3ddc0f1e42b
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
7 files changed, 69 insertions(+), 51 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/69/1969/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index 9d2415d..8bffd34 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -34,25 +34,32 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import io.netty.handler.codec.http.HttpResponseStatus;
public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
private static final Logger LOGGER =
Logger.getLogger(DiagnosticsApiServlet.class.getName());
- private final ICcApplicationContext appCtx;
+ protected final ObjectMapper om;
+ protected final IHyracksClientConnection hcc;
+ protected final ExecutorService executor;
- public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
+ public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths) {
super(ctx, paths);
- this.appCtx = appCtx;
+ this.om = new ObjectMapper();
+ this.hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ this.executor = (ExecutorService)
ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
}
@Override
@@ -60,15 +67,13 @@
HttpUtil.setContentType(response,
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
PrintWriter responseWriter = response.writer();
ObjectNode json;
- ObjectMapper om = new ObjectMapper();
response.setStatus(HttpResponseStatus.OK);
om.enable(SerializationFeature.INDENT_OUTPUT);
try {
if (!"".equals(localPath(request))) {
throw new IllegalArgumentException();
}
- json = getClusterDiagnosticsJSON();
-
responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+
responseWriter.write(JSONUtil.convertNode(getClusterDiagnosticsJSON()));
} catch (IllegalStateException e) { // NOSONAR - exception not logged
or rethrown
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
} catch (IllegalArgumentException e) { // NOSONAR - exception not
logged or rethrown
@@ -81,33 +86,20 @@
responseWriter.flush();
}
- private ObjectNode getClusterDiagnosticsJSON() throws Exception {
- ObjectMapper om = new ObjectMapper();
- IHyracksClientConnection hcc = (IHyracksClientConnection)
ctx.get(HYRACKS_CONNECTION_ATTR);
- ExecutorService executor = (ExecutorService)
ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
- Map<String, Future<ObjectNode>> ccFutureData = new HashMap<>();
- ccFutureData.put("threaddump",
- executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getThreadDump(null)))));
- ccFutureData.put("config",
- executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
- ccFutureData.put("stats",
- executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
+ protected ObjectNode getClusterDiagnosticsJSON() throws Exception {
+ Map<String, Future<JsonNode>> ccFutureData;
+ ccFutureData = getCcDiagosticsFutures();
- Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new
HashMap<>();
- for (String nc : appCtx.getMetadataProperties().getNodeNames()) {
- Map<String, Future<ObjectNode>> ncData = new HashMap<>();
- ncData.put("threaddump", executor.submit(() ->
fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
- ncData.put("config", executor
- .submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
- ncData.put("stats", executor.submit(() ->
fixupKeys(processNodeStats(hcc, nc))));
- ncDataMap.put(nc, ncData);
+ Map<String, Map<String, Future<JsonNode>>> ncDataMap = new HashMap<>();
+ for (String nc : ClusterStateManager.INSTANCE.getParticipantNodes()) {
+ ncDataMap.put(nc, getNcDiagnosticFutures(nc));
}
ObjectNode result = om.createObjectNode();
result.putPOJO("cc", resolveFutures(ccFutureData));
List<Map<String, ?>> ncList = new ArrayList<>();
- for (Map.Entry<String, Map<String, Future<ObjectNode>>> entry :
ncDataMap.entrySet()) {
- final Map<String, Object> ncMap = resolveFutures(entry.getValue());
- ncMap.put("node_id", entry.getKey());
+ for (Map.Entry<String, Map<String, Future<JsonNode>>> entry :
ncDataMap.entrySet()) {
+ final Map<String, JsonNode> ncMap =
resolveFutures(entry.getValue());
+ ncMap.put("node_id", new TextNode(entry.getKey()));
ncList.add(ncMap);
}
result.putPOJO("ncs", ncList);
@@ -115,10 +107,32 @@
return result;
}
- private Map<String, Object> resolveFutures(Map<String, Future<ObjectNode>>
futureMap)
+ protected Map<String, Future<JsonNode>> getNcDiagnosticFutures(String nc) {
+ Map<String, Future<JsonNode>> ncData;
+ ncData = new HashMap<>();
+ ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getThreadDump(nc)))));
+ ncData.put("config",
+ executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
+ ncData.put("stats", executor.submit(() ->
fixupKeys(processNodeStats(hcc, nc))));
+ return ncData;
+ }
+
+ protected Map<String, Future<JsonNode>> getCcDiagosticsFutures() {
+ Map<String, Future<JsonNode>> ccFutureData;
+ ccFutureData = new HashMap<>();
+ ccFutureData.put("threaddump",
+ executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getThreadDump(null)))));
+ ccFutureData.put("config",
+ executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
+ ccFutureData.put("stats",
+ executor.submit(() -> fixupKeys((ObjectNode)
om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
+ return ccFutureData;
+ }
+
+ protected Map<String, JsonNode> resolveFutures(Map<String,
Future<JsonNode>> futureMap)
throws ExecutionException, InterruptedException {
- Map<String, Object> result = new HashMap<>();
- for (Map.Entry<String, Future<ObjectNode>> entry :
futureMap.entrySet()) {
+ Map<String, JsonNode> result = new HashMap<>();
+ for (Map.Entry<String, Future<JsonNode>> entry : futureMap.entrySet())
{
result.put(entry.getKey(), entry.getValue().get());
}
return result;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 6291869..01c59f3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -180,8 +180,9 @@
for (int i = 0; i < gcNames.size(); i++) {
ObjectNode gc = om.createObjectNode();
gc.set("name", gcNames.get(i));
- gc.set("collection-time", ((ArrayNode)
gcCollectionTimes.get(i)).get(index));
- gc.set("collection-count", ((ArrayNode)
gcCollectionCounts.get(i)).get(index));
+ gc.set("collection-time", gcCollectionTimes.get(i).get(index));
+ gc.set("collection-count", gcCollectionCounts.get(i).get(index));
+ fixupKeys(gc);
gcs.add(gc);
}
json.set("gcs", gcs);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9040ad1..b08c1e2 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -313,7 +313,7 @@
case Servlets.CLUSTER_STATE_CC_DETAIL:
return new ClusterControllerDetailsApiServlet(ctx, paths);
case Servlets.DIAGNOSTICS:
- return new DiagnosticsApiServlet(ctx, paths, appCtx);
+ return new DiagnosticsApiServlet(ctx, paths);
case Servlets.ACTIVE_STATS:
return new ActiveStatsApiServlet(ctx, paths, appCtx);
default:
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index be84bc3..c5b9d11 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
package org.apache.asterix.test.runtime;
-import static
org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSON;
+import static
org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
import java.io.BufferedReader;
import java.io.File;
@@ -190,7 +190,7 @@
}
private static void checkThreadLeaks() throws IOException {
- String threadDump =
ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean());
+ String threadDump =
ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
// Currently we only do sanity check for threads used in the execution
engine.
// Later we should check if there are leaked storage threads as well.
if (threadDump.contains("Operator") ||
threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@
try (BufferedReader reader = new BufferedReader(new
InputStreamReader(process.getInputStream()))) {
int runFileCount = Integer.parseInt(reader.readLine().trim());
if (runFileCount != 0) {
-
System.out.print(takeDumpJSON(ManagementFactory.getThreadMXBean()));
+
System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
outputLeakedOpenFiles(processId);
throw new AssertionError("There are " + runFileCount + "
leaked run files.");
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index d7c7be1..b5388c2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -55,7 +55,7 @@
if (nodeId == null) {
// null nodeId means the request is for the cluster controller
try {
-
callback.setValue(ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean()));
+
callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking CC thread dump",
e);
callback.setException(e);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
index 1d6dbcd..62c6586 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
@@ -18,12 +18,6 @@
*/
package org.apache.hyracks.control.common.utils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import java.io.IOException;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
@@ -33,12 +27,23 @@
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class ThreadDumpHelper {
+ private static final ObjectMapper om = new ObjectMapper();
private ThreadDumpHelper() {
+ om.enable(SerializationFeature.INDENT_OUTPUT);
}
- public static String takeDumpJSON(ThreadMXBean threadMXBean) throws
IOException {
+ public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws
IOException {
+ ObjectNode json = takeDumpJSON(threadMXBean);
+ return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+ }
+
+ public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
List<Map<String, Object>> threads = new ArrayList<>();
@@ -73,9 +78,8 @@
}
threads.add(threadMap);
}
- ObjectMapper om = new ObjectMapper();
ObjectNode json = om.createObjectNode();
- json.put("date", new Date().toString());
+ json.put("date", String.valueOf(new Date()));
json.putPOJO("threads", threads);
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
@@ -86,7 +90,6 @@
if (monitorDeadlockedThreads != null &&
monitorDeadlockedThreads.length > 0) {
json.putPOJO("monitor_deadlocked_thread_ids",
monitorDeadlockedThreads);
}
- om.enable(SerializationFeature.INDENT_OUTPUT);
- return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+ return json;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index 5ebb99a..abde87f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -38,7 +38,7 @@
public void run() {
String result;
try {
- result = ThreadDumpHelper.takeDumpJSON(ncs.getThreadMXBean());
+ result =
ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
result = null;
--
To view, visit https://asterix-gerrit.ics.uci.edu/1969
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If47ec45bf88a39d63421903080fee3ddc0f1e42b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>