Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1969

Change subject: [NO ISSUE] Refactoring / cleanup of HTTP cluster state / 
diagnostics APIs
......................................................................

[NO ISSUE] Refactoring / cleanup of HTTP cluster state / diagnostics APIs

Change-Id: If47ec45bf88a39d63421903080fee3ddc0f1e42b
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
7 files changed, 69 insertions(+), 51 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/69/1969/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index 9d2415d..8bffd34 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -34,25 +34,32 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
     private static final Logger LOGGER = 
Logger.getLogger(DiagnosticsApiServlet.class.getName());
-    private final ICcApplicationContext appCtx;
+    protected final ObjectMapper om;
+    protected final IHyracksClientConnection hcc;
+    protected final ExecutorService executor;
 
-    public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
+    public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths) {
         super(ctx, paths);
-        this.appCtx = appCtx;
+        this.om = new ObjectMapper();
+        this.hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        this.executor = (ExecutorService) 
ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
     }
 
     @Override
@@ -60,15 +67,13 @@
         HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
         PrintWriter responseWriter = response.writer();
         ObjectNode json;
-        ObjectMapper om = new ObjectMapper();
         response.setStatus(HttpResponseStatus.OK);
         om.enable(SerializationFeature.INDENT_OUTPUT);
         try {
             if (!"".equals(localPath(request))) {
                 throw new IllegalArgumentException();
             }
-            json = getClusterDiagnosticsJSON();
-            
responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+            
responseWriter.write(JSONUtil.convertNode(getClusterDiagnosticsJSON()));
         } catch (IllegalStateException e) { // NOSONAR - exception not logged 
or rethrown
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
         } catch (IllegalArgumentException e) { // NOSONAR - exception not 
logged or rethrown
@@ -81,33 +86,20 @@
         responseWriter.flush();
     }
 
-    private ObjectNode getClusterDiagnosticsJSON() throws Exception {
-        ObjectMapper om = new ObjectMapper();
-        IHyracksClientConnection hcc = (IHyracksClientConnection) 
ctx.get(HYRACKS_CONNECTION_ATTR);
-        ExecutorService executor = (ExecutorService) 
ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
-        Map<String, Future<ObjectNode>> ccFutureData = new HashMap<>();
-        ccFutureData.put("threaddump",
-                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getThreadDump(null)))));
-        ccFutureData.put("config",
-                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
-        ccFutureData.put("stats",
-                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
+    protected ObjectNode getClusterDiagnosticsJSON() throws Exception {
+        Map<String, Future<JsonNode>> ccFutureData;
+        ccFutureData = getCcDiagosticsFutures();
 
-        Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new 
HashMap<>();
-        for (String nc : appCtx.getMetadataProperties().getNodeNames()) {
-            Map<String, Future<ObjectNode>> ncData = new HashMap<>();
-            ncData.put("threaddump", executor.submit(() -> 
fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
-            ncData.put("config", executor
-                    .submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
-            ncData.put("stats", executor.submit(() -> 
fixupKeys(processNodeStats(hcc, nc))));
-            ncDataMap.put(nc, ncData);
+        Map<String, Map<String, Future<JsonNode>>> ncDataMap = new HashMap<>();
+        for (String nc : ClusterStateManager.INSTANCE.getParticipantNodes()) {
+            ncDataMap.put(nc, getNcDiagnosticFutures(nc));
         }
         ObjectNode result = om.createObjectNode();
         result.putPOJO("cc", resolveFutures(ccFutureData));
         List<Map<String, ?>> ncList = new ArrayList<>();
-        for (Map.Entry<String, Map<String, Future<ObjectNode>>> entry : 
ncDataMap.entrySet()) {
-            final Map<String, Object> ncMap = resolveFutures(entry.getValue());
-            ncMap.put("node_id", entry.getKey());
+        for (Map.Entry<String, Map<String, Future<JsonNode>>> entry : 
ncDataMap.entrySet()) {
+            final Map<String, JsonNode> ncMap = 
resolveFutures(entry.getValue());
+            ncMap.put("node_id", new TextNode(entry.getKey()));
             ncList.add(ncMap);
         }
         result.putPOJO("ncs", ncList);
@@ -115,10 +107,32 @@
         return result;
     }
 
-    private Map<String, Object> resolveFutures(Map<String, Future<ObjectNode>> 
futureMap)
+    protected Map<String, Future<JsonNode>> getNcDiagnosticFutures(String nc) {
+        Map<String, Future<JsonNode>> ncData;
+        ncData = new HashMap<>();
+        ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getThreadDump(nc)))));
+        ncData.put("config",
+                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
+        ncData.put("stats", executor.submit(() -> 
fixupKeys(processNodeStats(hcc, nc))));
+        return ncData;
+    }
+
+    protected Map<String, Future<JsonNode>> getCcDiagosticsFutures() {
+        Map<String, Future<JsonNode>> ccFutureData;
+        ccFutureData = new HashMap<>();
+        ccFutureData.put("threaddump",
+                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getThreadDump(null)))));
+        ccFutureData.put("config",
+                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
+        ccFutureData.put("stats",
+                executor.submit(() -> fixupKeys((ObjectNode) 
om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
+        return ccFutureData;
+    }
+
+    protected Map<String, JsonNode> resolveFutures(Map<String, 
Future<JsonNode>> futureMap)
             throws ExecutionException, InterruptedException {
-        Map<String, Object> result = new HashMap<>();
-        for (Map.Entry<String, Future<ObjectNode>> entry : 
futureMap.entrySet()) {
+        Map<String, JsonNode> result = new HashMap<>();
+        for (Map.Entry<String, Future<JsonNode>> entry : futureMap.entrySet()) 
{
             result.put(entry.getKey(), entry.getValue().get());
         }
         return result;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 6291869..01c59f3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -180,8 +180,9 @@
         for (int i = 0; i < gcNames.size(); i++) {
             ObjectNode gc = om.createObjectNode();
             gc.set("name", gcNames.get(i));
-            gc.set("collection-time", ((ArrayNode) 
gcCollectionTimes.get(i)).get(index));
-            gc.set("collection-count", ((ArrayNode) 
gcCollectionCounts.get(i)).get(index));
+            gc.set("collection-time", gcCollectionTimes.get(i).get(index));
+            gc.set("collection-count", gcCollectionCounts.get(i).get(index));
+            fixupKeys(gc);
             gcs.add(gc);
         }
         json.set("gcs", gcs);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9040ad1..b08c1e2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -313,7 +313,7 @@
             case Servlets.CLUSTER_STATE_CC_DETAIL:
                 return new ClusterControllerDetailsApiServlet(ctx, paths);
             case Servlets.DIAGNOSTICS:
-                return new DiagnosticsApiServlet(ctx, paths, appCtx);
+                return new DiagnosticsApiServlet(ctx, paths);
             case Servlets.ACTIVE_STATS:
                 return new ActiveStatsApiServlet(ctx, paths, appCtx);
             default:
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index be84bc3..c5b9d11 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
 
 package org.apache.asterix.test.runtime;
 
-import static 
org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSON;
+import static 
org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -190,7 +190,7 @@
     }
 
     private static void checkThreadLeaks() throws IOException {
-        String threadDump = 
ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean());
+        String threadDump = 
ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
         // Currently we only do sanity check for threads used in the execution 
engine.
         // Later we should check if there are leaked storage threads as well.
         if (threadDump.contains("Operator") || 
threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@
         try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
             int runFileCount = Integer.parseInt(reader.readLine().trim());
             if (runFileCount != 0) {
-                
System.out.print(takeDumpJSON(ManagementFactory.getThreadMXBean()));
+                
System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
                 outputLeakedOpenFiles(processId);
                 throw new AssertionError("There are " + runFileCount + " 
leaked run files.");
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index d7c7be1..b5388c2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -55,7 +55,7 @@
         if (nodeId == null) {
             // null nodeId means the request is for the cluster controller
             try {
-                
callback.setValue(ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean()));
+                
callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
             } catch (Exception e) {
                 LOGGER.log(Level.WARNING, "Exception taking CC thread dump", 
e);
                 callback.setException(e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
index 1d6dbcd..62c6586 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hyracks.control.common.utils;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import java.io.IOException;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
@@ -33,12 +27,23 @@
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class ThreadDumpHelper {
+    private static final ObjectMapper om = new ObjectMapper();
 
     private ThreadDumpHelper() {
+        om.enable(SerializationFeature.INDENT_OUTPUT);
     }
 
-    public static String takeDumpJSON(ThreadMXBean threadMXBean) throws 
IOException {
+    public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws 
IOException {
+        ObjectNode json = takeDumpJSON(threadMXBean);
+        return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+    }
+
+    public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
         ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
         List<Map<String, Object>> threads = new ArrayList<>();
 
@@ -73,9 +78,8 @@
             }
             threads.add(threadMap);
         }
-        ObjectMapper om = new ObjectMapper();
         ObjectNode json = om.createObjectNode();
-        json.put("date", new Date().toString());
+        json.put("date", String.valueOf(new Date()));
         json.putPOJO("threads", threads);
 
         long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
@@ -86,7 +90,6 @@
         if (monitorDeadlockedThreads != null && 
monitorDeadlockedThreads.length > 0) {
             json.putPOJO("monitor_deadlocked_thread_ids", 
monitorDeadlockedThreads);
         }
-        om.enable(SerializationFeature.INDENT_OUTPUT);
-        return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+        return json;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index 5ebb99a..abde87f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -38,7 +38,7 @@
     public void run() {
         String result;
         try {
-            result = ThreadDumpHelper.takeDumpJSON(ncs.getThreadMXBean());
+            result = 
ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
             result = null;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1969
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If47ec45bf88a39d63421903080fee3ddc0f1e42b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to