Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2479][API] Add Network Diagnostics API ......................................................................
[ASTERIXDB-2479][API] Add Network Diagnostics API - user model changes: no - storage format changes: no - interface changes: no Details: - Add a new API that shows the state of node to node connections and their logical channels to help diagnose networking issues. - Add channel details to the waiting thread name in NetworkOutputChannel. - Add test case. Change-Id: Id6fd5a96c56e7078d1404bebcbab8afe93ba8f64 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3025 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Michael Blow <mb...@apache.org> --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java M asterixdb/asterix-app/src/test/resources/runtimets/api.xml A asterixdb/asterix-app/src/test/resources/runtimets/queries/api/net-diagnostics/net-diagnostics.1.get.http A asterixdb/asterix-app/src/test/resources/runtimets/results/api/net-diagnostics/net-diagnostics.1.regexadm M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java M hyracks-fullstack/hyracks/hyracks-net/pom.xml M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java 15 files changed, 215 insertions(+), 11 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved Murtadha Hubail: Looks good to me, but someone else must approve diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java new file mode 100644 index 0000000..badb568 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.application.NCServiceContext; +import org.apache.hyracks.control.nc.net.NetworkManager; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.JsonNode; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public class NetDiagnosticsApiServlet extends AbstractServlet { + + private final INcApplicationContext appCtx; + + public NetDiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, INcApplicationContext appCtx, String... paths) { + super(ctx, paths); + this.appCtx = appCtx; + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws IOException { + HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); + response.setStatus(HttpResponseStatus.OK); + final JsonNode netDiagnostics = getNetDiagnostics(); + final PrintWriter responseWriter = response.writer(); + JSONUtil.writeNode(responseWriter, netDiagnostics); + } + + private JsonNode getNetDiagnostics() { + final NCServiceContext serviceContext = (NCServiceContext) appCtx.getServiceContext(); + final NodeControllerService controllerService = (NodeControllerService) serviceContext.getControllerService(); + final NetworkManager networkManager = controllerService.getNetworkManager(); + return networkManager.getMuxDemux().getState(); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index fbafc2e..4fa86ae 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; +import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet; import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.StorageApiServlet; import org.apache.asterix.app.io.PersistedResourceRegistry; @@ -170,6 +171,8 @@ externalProperties.getNcApiPort(), config); apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx); apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.STORAGE)); + apiServer.addServlet( + new NetDiagnosticsApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.NET_DIAGNOSTICS)); webManager.add(apiServer); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java index cf62e42..02f17e7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java @@ -19,10 +19,16 @@ package org.apache.asterix.test.runtime; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.control.nc.NodeControllerService; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -36,10 +42,21 @@ @RunWith(Parameterized.class) public class APIExecutionTest { protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; + private static final TestExecutor testExecutor = new TestExecutor(); @BeforeClass public static void setUp() throws Exception { - LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor); + final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs; + final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>(); + final String ip = InetAddress.getLoopbackAddress().getHostAddress(); + for (NodeControllerService nc : ncs) { + final String nodeId = nc.getId(); + final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext(); + int apiPort = appCtx.getExternalProperties().getNcApiPort(); + ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort)); + } + testExecutor.setNcEndPoints(ncEndPoints); } @AfterClass diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/api.xml b/asterixdb/asterix-app/src/test/resources/runtimets/api.xml index b686482..8b16889 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/api.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/api.xml @@ -118,5 +118,10 @@ <output-dir compare="Text">feed-stats</output-dir> </compilation-unit> </test-case> + <test-case FilePath="api"> + <compilation-unit name="net-diagnostics"> + <output-dir compare="Text">net-diagnostics</output-dir> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/api/net-diagnostics/net-diagnostics.1.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries/api/net-diagnostics/net-diagnostics.1.get.http new file mode 100644 index 0000000..12288a4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/api/net-diagnostics/net-diagnostics.1.get.http @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +nc:asterix_nc1 /admin/net \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/net-diagnostics/net-diagnostics.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/net-diagnostics/net-diagnostics.1.regexadm new file mode 100644 index 0000000..be8d03a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/net-diagnostics/net-diagnostics.1.regexadm @@ -0,0 +1,5 @@ +\{ + .*"incomingConnections" : \[.*\], + .*"localAddress" : .*,.* + .*"outgoingConnections" : \[.*\] +\} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index b135c7f..d5aa5d1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -35,6 +35,7 @@ public static final String DIAGNOSTICS = "/admin/diagnostics"; public static final String ACTIVE_STATS = "/admin/active/*"; public static final String STORAGE = "/admin/storage/*"; + public static final String NET_DIAGNOSTICS = "/admin/net/*"; private Servlets() { } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java index 261e7c4..17cdc3e 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java @@ -80,8 +80,8 @@ InetSocketAddress remoteAddress = ccb.getRemoteAddress(); String nameBefore = Thread.currentThread().getName(); try { - Thread.currentThread() - .setName(nameBefore + ":SendingTo(" + Objects.toString(remoteAddress) + ")"); + Thread.currentThread().setName( + nameBefore + ":SendingTo(" + Objects.toString(remoteAddress) + ") over " + ccb); wait(); } finally { Thread.currentThread().setName(nameBefore); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java index 8b02f9c..3298b78 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java @@ -104,6 +104,10 @@ return mConn.openChannel(); } + public MuxDemux getMuxDemux() { + return md; + } + private class ChannelOpenListener implements IChannelOpenListener { @Override public void channelOpened(ChannelControlBlock channel) { diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml index 192a80a..525543f 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml @@ -60,5 +60,9 @@ <artifactId>hyracks-util</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java index f7ef2aa..3a35212 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java @@ -29,6 +29,10 @@ import org.apache.hyracks.api.comm.IChannelWriteInterface; import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * Handle to a channel that represents a logical full-duplex communication end-point. @@ -168,4 +172,17 @@ public InetSocketAddress getRemoteAddress() { return cSet.getMultiplexedConnection().getRemoteAddress(); } + + public JsonNode getState() { + final ObjectNode state = JSONUtil.createObject(); + state.put("id", channelId); + state.put("localClose", localClose.get()); + state.put("localCloseAck", localCloseAck.get()); + state.put("remoteClose", remoteClose.get()); + state.put("remoteCloseAck", remoteCloseAck.get()); + state.put("readCredits", ri.getCredits()); + state.put("writeCredits", wi.getCredits()); + state.put("completelyClosed", completelyClosed()); + return state; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java index f5cdf2c..179f42c 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java @@ -24,8 +24,11 @@ import org.apache.hyracks.api.comm.IChannelInterfaceFactory; import org.apache.hyracks.api.comm.MuxDemuxCommand; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.util.JSONUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ArrayNode; public class ChannelSet { private static final Logger LOGGER = LogManager.getLogger(); @@ -243,4 +246,14 @@ public MultiplexedConnection getMultiplexedConnection() { return mConn; } + + public synchronized ArrayNode getState() { + final ArrayNode state = JSONUtil.createArray(); + for (ChannelControlBlock ccb : ccbArray) { + if (ccb != null) { + state.add(ccb.getState()); + } + } + return state; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java index 4c3836a..96ccafb 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java @@ -24,6 +24,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.BitSet; +import java.util.Optional; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.comm.IChannelInterfaceFactory; @@ -32,9 +33,14 @@ import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener; import org.apache.hyracks.net.protocols.tcp.TCPConnection; +import org.apache.hyracks.util.JSONUtil; import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * A {@link MultiplexedConnection} can be used by clients to create multiple "channels" @@ -442,4 +448,15 @@ public InetSocketAddress getRemoteAddress() { return tcpConnection == null ? null : tcpConnection.getRemoteAddress(); } + + public synchronized Optional<JsonNode> getState() { + if (tcpConnection == null) { + return Optional.empty(); + } + final ObjectNode state = JSONUtil.createObject(); + state.put("remoteAddress", getRemoteAddress().toString()); + final ArrayNode channels = cSet.getState(); + state.set("channels", channels); + return Optional.of(state); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java index c58cb86..4ee7e83 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java @@ -28,6 +28,11 @@ import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener; import org.apache.hyracks.net.protocols.tcp.TCPConnection; import org.apache.hyracks.net.protocols.tcp.TCPEndpoint; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * Multiplexed Connection Manager. @@ -43,7 +48,8 @@ private final int maxConnectionAttempts; - private final Map<InetSocketAddress, MultiplexedConnection> connectionMap; + private final Map<InetSocketAddress, MultiplexedConnection> outgoingConnectionMap; + private final Map<InetSocketAddress, MultiplexedConnection> incomingConnectionMap; private final TCPEndpoint tcpEndpoint; @@ -69,13 +75,14 @@ this.channelOpenListener = listener; this.maxConnectionAttempts = maxConnectionAttempts; this.channelInterfaceFatory = channelInterfaceFatory; - connectionMap = new HashMap<>(); + outgoingConnectionMap = new HashMap<>(); + incomingConnectionMap = new HashMap<>(); this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() { @Override public void connectionEstablished(TCPConnection connection) { MultiplexedConnection mConn; synchronized (MuxDemux.this) { - mConn = connectionMap.get(connection.getRemoteAddress()); + mConn = outgoingConnectionMap.get(connection.getRemoteAddress()); } assert mConn != null; mConn.setTCPConnection(connection); @@ -89,17 +96,18 @@ mConn.setTCPConnection(connection); connection.setEventListener(mConn); connection.setAttachment(mConn); + incomingConnectionMap.put(connection.getRemoteAddress(), mConn); } @Override public void connectionFailure(InetSocketAddress remoteAddress, IOException error) { MultiplexedConnection mConn; synchronized (MuxDemux.this) { - mConn = connectionMap.get(remoteAddress); + mConn = outgoingConnectionMap.get(remoteAddress); assert mConn != null; int nConnectionAttempts = mConn.getConnectionAttempts(); if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) { - connectionMap.remove(remoteAddress); + outgoingConnectionMap.remove(remoteAddress); mConn.setConnectionFailure(new IOException(remoteAddress.toString() + ": " + error, error)); } else { mConn.setConnectionAttempts(nConnectionAttempts + 1); @@ -112,7 +120,9 @@ public void connectionClosed(TCPConnection connection) { synchronized (MuxDemux.this) { if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) { - connectionMap.remove(connection.getRemoteAddress()); + outgoingConnectionMap.remove(connection.getRemoteAddress()); + } else if (connection.getType() == TCPConnection.ConnectionType.INCOMING) { + incomingConnectionMap.remove(connection.getRemoteAddress()); } } } @@ -144,10 +154,10 @@ public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException { MultiplexedConnection mConn; synchronized (this) { - mConn = connectionMap.get(remoteAddress); + mConn = outgoingConnectionMap.get(remoteAddress); if (mConn == null) { mConn = new MultiplexedConnection(this); - connectionMap.put(remoteAddress, mConn); + outgoingConnectionMap.put(remoteAddress, mConn); tcpEndpoint.initiateConnection(remoteAddress); } } @@ -186,4 +196,20 @@ public IChannelInterfaceFactory getChannelInterfaceFactory() { return channelInterfaceFatory; } + + public synchronized JsonNode getState() { + final ObjectNode state = JSONUtil.createObject(); + state.put("localAddress", tcpEndpoint.getLocalAddress().toString()); + final ArrayNode outgoingConnections = JSONUtil.createArray(); + state.set("outgoingConnections", outgoingConnections); + for (MultiplexedConnection connection : outgoingConnectionMap.values()) { + connection.getState().ifPresent(outgoingConnections::add); + } + final ArrayNode incomingConnections = JSONUtil.createArray(); + state.set("incomingConnections", incomingConnections); + for (MultiplexedConnection connection : incomingConnectionMap.values()) { + connection.getState().ifPresent(incomingConnections::add); + } + return state; + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java index baa3174..006659b 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java @@ -37,11 +37,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; public class JSONUtil { private static final Logger LOGGER = LogManager.getLogger(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String INDENT = "\t"; @@ -276,4 +278,12 @@ public static void put(ObjectNode o, String name, List<String> elements) { elements.forEach(o.putArray(name)::add); } + + public static ObjectNode createObject() { + return OBJECT_MAPPER.createObjectNode(); + } + + public static ArrayNode createArray() { + return OBJECT_MAPPER.createArrayNode(); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3025 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id6fd5a96c56e7078d1404bebcbab8afe93ba8f64 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>