Repository: cassandra Updated Branches: refs/heads/trunk 71445706c -> d30bfcaf3
Add nodetool clientlist patch by Chris Lohfink; reviewed by jasobrown for CASSANDRA-13665 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d30bfcaf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d30bfcaf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d30bfcaf Branch: refs/heads/trunk Commit: d30bfcaf3430b8eb11cca913105b01364502348e Parents: 7144570 Author: Chris Lohfink <clohf...@apple.com> Authored: Wed Jan 31 23:50:53 2018 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Fri Feb 2 14:24:08 2018 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/source/operating/metrics.rst | 12 ++-- .../apache/cassandra/metrics/ClientMetrics.java | 4 +- .../service/NativeTransportService.java | 34 +++++++++- .../apache/cassandra/service/StorageProxy.java | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 26 ++++++++ .../org/apache/cassandra/tools/NodeTool.java | 1 + .../cassandra/tools/nodetool/ClientStats.java | 69 ++++++++++++++++++++ .../org/apache/cassandra/transport/Message.java | 1 + .../org/apache/cassandra/transport/Server.java | 45 +++++++++++++ .../cassandra/transport/ServerConnection.java | 8 +++ 11 files changed, 194 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ead79e..a10b6eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add nodetool clientlist (CASSANDRA-13665) * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211) * Non-disruptive seed node list reload (CASSANDRA-14190) * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/doc/source/operating/metrics.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst index 2df1cf8..345fc3e 100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@ -631,11 +631,13 @@ Reported name format: **JMX MBean** ``org.apache.cassandra.metrics:type=Client name=<MetricName>`` -=========================== ============== =========== -Name Type Description -=========================== ============== =========== -connectedNativeClients Counter Number of clients connected to this nodes native protocol server -=========================== ============== =========== +============================== ================================ =========== +Name Type Description +============================== ================================ =========== +connectedNativeClients Gauge<Integer> Number of clients connected to this node's native protocol server +connections Gauge<List<Map<String, String>>> List of all connections and their state information +connectedNativeClientsByUser Gauge<Map<String, Integer>> Number of connected native clients by username +============================== ================================ =========== Batch Metrics http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/metrics/ClientMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java index db6422c..8ca3480 100644 --- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java @@ -36,9 +36,9 @@ public class ClientMetrics { } - public void addCounter(String name, final Callable<Integer> provider) + public <T> void addGauge(String name, final Callable<T> provider) { 
- Metrics.register(factory.createMetricName(name), (Gauge<Integer>) () -> { + Metrics.register(factory.createMetricName(name), (Gauge<T>) () -> { try { return provider.call(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/service/NativeTransportService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java index b184442..42764e1 100644 --- a/src/java/org/apache/cassandra/service/NativeTransportService.java +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@ -18,12 +18,19 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,14 +116,39 @@ public class NativeTransportService } // register metrics - ClientMetrics.instance.addCounter("connectedNativeClients", () -> + ClientMetrics.instance.addGauge("connectedNativeClients", () -> { int ret = 0; for (Server server : servers) ret += server.getConnectedClients(); return ret; }); + ClientMetrics.instance.addGauge("connectedNativeClientsByUser", () -> + { + Map<String, Integer> result = new HashMap<>(); + for (Server server : servers) + { + for (Entry<String, Integer> e : server.getConnectedClientsByUser().entrySet()) + { + String user = e.getKey(); + result.put(user, result.getOrDefault(user, 0) + e.getValue()); + } + } + return result; + }); + ClientMetrics.instance.addGauge("connections", () -> + { + List<Map<String, String>> result = new ArrayList<>(); + for 
(Server server : servers) + { + for (Map<String, String> e : server.getConnectionStates()) + { + result.add(e); + } + } + return result; + }); AuthMetrics.init(); initialized = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index dcf0cab..e2125d4 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -74,6 +74,7 @@ import org.apache.cassandra.service.paxos.ProposeCallback; import org.apache.cassandra.service.paxos.ProposeVerbHandler; import org.apache.cassandra.net.MessagingService.Verb; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index ec8f7ba..69b64ab 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1493,6 +1493,32 @@ public class NodeProbe implements AutoCloseable /** * Retrieve Proxy metrics + * @param metricName connections, connectedNativeClients or connectedNativeClientsByUser + */ + public Object getClientMetric(String metricName) + { + try + { + switch(metricName) + { + case "connections": // List<Map<String,String>> - list of all native connections and their properties + case "connectedNativeClients": // number of connected native clients + case 
"connectedNativeClientsByUser": // number of native clients by username + return JMX.newMBeanProxy(mbeanServerConn, + new ObjectName("org.apache.cassandra.metrics:type=Client,name=" + metricName), + CassandraMetricsRegistry.JmxGaugeMBean.class).getValue(); + default: + throw new RuntimeException("Unknown client metric " + metricName); + } + } + catch (MalformedObjectNameException e) + { + throw new RuntimeException(e); + } + } + + /** + * Retrieve Proxy metrics * @param metricName Exceptions, Load, TotalHints or TotalHintsInProgress. */ public long getStorageMetric(String metricName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index f7b7f76..81f2023 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -59,6 +59,7 @@ public class NodeTool TableHistograms.class, Cleanup.class, ClearSnapshot.class, + ClientStats.class, Compact.class, Scrub.class, Verify.class, http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java new file mode 100644 index 0000000..21915cb --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; + +@Command(name = "clientstats", description = "Print information about connected clients") +public class ClientStats extends NodeToolCmd +{ + @Option(title = "list_connections", name = "--all", description = "Lists all connections") + private boolean listConnections = false; + + @Override + public void execute(NodeProbe probe) + { + if (listConnections) + { + List<Map<String, String>> clients = (List<Map<String, String>>) probe.getClientMetric("connections"); + if (!clients.isEmpty()) + { + TableBuilder table = new TableBuilder(); + table.add("Address", "SSL", "Version", "User", "Keyspace", "Requests"); + for (Map<String, String> conn : clients) + { + table.add(conn.get("address"), conn.get("ssl"), conn.get("version"), + conn.get("user"), conn.get("keyspace"), conn.get("requests")); + } + table.printTo(System.out); + System.out.println(); + } + } + + Map<String, Integer> connectionsByUser = (Map<String, Integer>) probe.getClientMetric("connectedNativeClientsByUser"); + int total = 
connectionsByUser.values().stream().reduce(0, Integer::sum); + System.out.println("Total connected clients: " + total); + System.out.println(); + TableBuilder table = new TableBuilder(); + table.add("User", "Connections"); + for (Entry<String, Integer> entry : connectionsByUser.entrySet()) + { + table.add(entry.getKey(), entry.getValue().toString()); + } + table.printTo(System.out); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 2da2ca7..f71d640 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -514,6 +514,7 @@ public abstract class Message QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); logger.trace("Received: {}, v={}", request, connection.getVersion()); + connection.requests.inc(); response = request.execute(qstate, queryStartNanoTime); response.setStreamId(request.getStreamId()); response.setWarnings(ClientWarn.instance.getWarnings()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index cd04edc..2c5e28a 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -30,6 +30,8 @@ import javax.net.ssl.SSLEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableMap; + import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import 
io.netty.buffer.ByteBufAllocator; @@ -173,6 +175,33 @@ public class Server implements CassandraDaemon.Server return connectionTracker.getConnectedClients(); } + public Map<String, Integer> getConnectedClientsByUser() + { + return connectionTracker.getConnectedClientsByUser(); + } + + public List<Map<String, String>> getConnectionStates() + { + List<Map<String, String>> result = new ArrayList<>(); + for(Channel c : connectionTracker.allChannels) + { + Connection connection = c.attr(Connection.attributeKey).get(); + if (connection instanceof ServerConnection) + { + ServerConnection conn = (ServerConnection) connection; + result.add(new ImmutableMap.Builder<String, String>() + .put("user", conn.getClientState().getUser().getName()) + .put("keyspace", conn.getClientState().getRawKeyspace() == null ? "" : conn.getClientState().getRawKeyspace()) + .put("address", conn.getClientState().getRemoteAddress().toString()) + .put("version", String.valueOf(conn.getVersion().asInt())) + .put("requests", String.valueOf(conn.requests.getCount())) + .put("ssl", conn.channel().pipeline().get(SslHandler.class) == null ? "false" : "true") + .build()); + } + } + return result; + } + private void close() { // Close opened connections @@ -285,6 +314,22 @@ public class Server implements CassandraDaemon.Server */ return allChannels.size() != 0 ? 
allChannels.size() - 1 : 0; } + + public Map<String, Integer> getConnectedClientsByUser() + { + Map<String, Integer> result = new HashMap<>(); + for(Channel c : allChannels) + { + Connection connection = c.attr(Connection.attributeKey).get(); + if (connection instanceof ServerConnection) + { + ServerConnection conn = (ServerConnection) connection; + String name = conn.getClientState().getUser().getName(); + result.put(name, result.getOrDefault(name, 0) + 1); + } + } + return result; + } } private static class Initializer extends ChannelInitializer<Channel> http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index 9374ca0..1ebf81c 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -26,6 +26,8 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import com.codahale.metrics.Counter; + public class ServerConnection extends Connection { private enum State { UNINITIALIZED, AUTHENTICATION, READY } @@ -33,6 +35,7 @@ public class ServerConnection extends Connection private volatile IAuthenticator.SaslNegotiator saslNegotiator; private final ClientState clientState; private volatile State state; + public final Counter requests = new Counter(); private final ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>(); @@ -56,6 +59,11 @@ public class ServerConnection extends Connection return qState; } + public ClientState getClientState() + { + return clientState; + } + public QueryState validateNewMessage(Message.Type type, ProtocolVersion version, int streamId) { switch (state) 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org