Repository: cassandra
Updated Branches:
  refs/heads/trunk 71445706c -> d30bfcaf3


Add nodetool clientlist

patch by Chris Lohfink; reviewed by jasobrown for CASSANDRA-13665


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d30bfcaf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d30bfcaf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d30bfcaf

Branch: refs/heads/trunk
Commit: d30bfcaf3430b8eb11cca913105b01364502348e
Parents: 7144570
Author: Chris Lohfink <clohf...@apple.com>
Authored: Wed Jan 31 23:50:53 2018 -0800
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Fri Feb 2 14:24:08 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/source/operating/metrics.rst                | 12 ++--
 .../apache/cassandra/metrics/ClientMetrics.java |  4 +-
 .../service/NativeTransportService.java         | 34 +++++++++-
 .../apache/cassandra/service/StorageProxy.java  |  1 +
 .../org/apache/cassandra/tools/NodeProbe.java   | 26 ++++++++
 .../org/apache/cassandra/tools/NodeTool.java    |  1 +
 .../cassandra/tools/nodetool/ClientStats.java   | 69 ++++++++++++++++++++
 .../org/apache/cassandra/transport/Message.java |  1 +
 .../org/apache/cassandra/transport/Server.java  | 45 +++++++++++++
 .../cassandra/transport/ServerConnection.java   |  8 +++
 11 files changed, 194 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ead79e..a10b6eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add nodetool clientstats (CASSANDRA-13665)
  * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
  * Non-disruptive seed node list reload (CASSANDRA-14190)
  * Nodetool tablehistograms to print statics for all the tables 
(CASSANDRA-14185)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 2df1cf8..345fc3e 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -631,11 +631,13 @@ Reported name format:
 **JMX MBean**
     ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
 
-=========================== ============== ===========
-Name                        Type           Description
-=========================== ============== ===========
-connectedNativeClients      Counter        Number of clients connected to this 
nodes native protocol server
-=========================== ============== ===========
+============================== ================================ ===========
+Name                           Type                             Description
+============================== ================================ ===========
+connectedNativeClients         Gauge<Integer>                   Number of 
clients connected to this node's native protocol server
+connections                    Gauge<List<Map<String, String>>> List of all 
connections and their state information
+connectedNativeClientsByUser   Gauge<Map<String, Integer>>      Number of 
connected native clients by username
+============================== ================================ ===========
 
 
 Batch Metrics

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/metrics/ClientMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java 
b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index db6422c..8ca3480 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -36,9 +36,9 @@ public class ClientMetrics
     {
     }
 
-    public void addCounter(String name, final Callable<Integer> provider)
+    public <T> void addGauge(String name, final Callable<T> provider)
     {
-        Metrics.register(factory.createMetricName(name), (Gauge<Integer>) () 
-> {
+        Metrics.register(factory.createMetricName(name), (Gauge<T>) () -> {
             try
             {
                 return provider.call();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/service/NativeTransportService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java 
b/src/java/org/apache/cassandra/service/NativeTransportService.java
index b184442..42764e1 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -18,12 +18,19 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,14 +116,39 @@ public class NativeTransportService
         }
 
         // register metrics
-        ClientMetrics.instance.addCounter("connectedNativeClients", () ->
+        ClientMetrics.instance.addGauge("connectedNativeClients", () ->
         {
             int ret = 0;
             for (Server server : servers)
                 ret += server.getConnectedClients();
             return ret;
         });
+        ClientMetrics.instance.addGauge("connectedNativeClientsByUser", () ->
+        {
+            Map<String, Integer> result = new HashMap<>();
+            for (Server server : servers)
+            {
+                for (Entry<String, Integer> e : 
server.getConnectedClientsByUser().entrySet())
+                {
+                    String user = e.getKey();
+                    result.put(user, result.getOrDefault(user, 0) + 
e.getValue());
+                }
+            }
+            return result;
+        });
 
+        ClientMetrics.instance.addGauge("connections", () ->
+        {
+            List<Map<String, String>> result = new ArrayList<>();
+            for (Server server : servers)
+            {
+                for (Map<String, String> e : server.getConnectionStates())
+                {
+                    result.add(e);
+                }
+            }
+            return result;
+        });
         AuthMetrics.init();
 
         initialized = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index dcf0cab..e2125d4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index ec8f7ba..69b64ab 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1493,6 +1493,32 @@ public class NodeProbe implements AutoCloseable
 
     /**
      * Retrieve Proxy metrics
+     * @param metricName connections, connectedNativeClients or connectedNativeClientsByUser
+     */
+    public Object getClientMetric(String metricName)
+    {
+        try
+        {
+            switch(metricName)
+            {
+                case "connections": // List<Map<String,String>> - list of all 
native connections and their properties
+                case "connectedNativeClients": // number of connected native 
clients
+                case "connectedNativeClientsByUser": // number of native 
clients by username
+                    return JMX.newMBeanProxy(mbeanServerConn,
+                            new 
ObjectName("org.apache.cassandra.metrics:type=Client,name=" + metricName),
+                            
CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();
+                default:
+                    throw new RuntimeException("Unknown client metric " + 
metricName);
+            }
+        }
+        catch (MalformedObjectNameException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Retrieve Proxy metrics
      * @param metricName Exceptions, Load, TotalHints or TotalHintsInProgress.
      */
     public long getStorageMetric(String metricName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index f7b7f76..81f2023 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -59,6 +59,7 @@ public class NodeTool
                 TableHistograms.class,
                 Cleanup.class,
                 ClearSnapshot.class,
+                ClientStats.class,
                 Compact.class,
                 Scrub.class,
                 Verify.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java 
b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
new file mode 100644
index 0000000..21915cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ClientStats.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+
+@Command(name = "clientstats", description = "Print information about 
connected clients")
+public class ClientStats extends NodeToolCmd
+{
+    @Option(title = "list_connections", name = "--all", description = "Lists 
all connections")
+    private boolean listConnections = false;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        if (listConnections)
+        {
+            List<Map<String, String>> clients = (List<Map<String, String>>) 
probe.getClientMetric("connections");
+            if (!clients.isEmpty())
+            {
+                TableBuilder table = new TableBuilder();
+                table.add("Address", "SSL", "Version", "User", "Keyspace", 
"Requests");
+                for (Map<String, String> conn : clients)
+                {
+                    table.add(conn.get("address"), conn.get("ssl"), 
conn.get("version"), 
+                              conn.get("user"), conn.get("keyspace"), 
conn.get("requests"));
+                }
+                table.printTo(System.out);
+                System.out.println();
+            }
+        }
+
+        Map<String, Integer> connectionsByUser = (Map<String, Integer>) 
probe.getClientMetric("connectedNativeClientsByUser");
+        int total = connectionsByUser.values().stream().reduce(0, 
Integer::sum);
+        System.out.println("Total connected clients: " + total);
+        System.out.println();
+        TableBuilder table = new TableBuilder();
+        table.add("User", "Connections");
+        for (Entry<String, Integer> entry : connectionsByUser.entrySet())
+        {
+            table.add(entry.getKey(), entry.getValue().toString());
+        }
+        table.printTo(System.out);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 2da2ca7..f71d640 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -514,6 +514,7 @@ public abstract class Message
                 QueryState qstate = 
connection.validateNewMessage(request.type, connection.getVersion(), 
request.getStreamId());
 
                 logger.trace("Received: {}, v={}", request, 
connection.getVersion());
+                connection.requests.inc();
                 response = request.execute(qstate, queryStartNanoTime);
                 response.setStreamId(request.getStreamId());
                 response.setWarnings(ClientWarn.instance.getWarnings());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index cd04edc..2c5e28a 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -30,6 +30,8 @@ import javax.net.ssl.SSLEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -173,6 +175,33 @@ public class Server implements CassandraDaemon.Server
         return connectionTracker.getConnectedClients();
     }
 
+    public Map<String, Integer> getConnectedClientsByUser()
+    {
+        return connectionTracker.getConnectedClientsByUser();
+    }
+
+    public List<Map<String, String>> getConnectionStates()
+    {
+        List<Map<String, String>> result = new ArrayList<>();
+        for(Channel c : connectionTracker.allChannels)
+        {
+            Connection connection = c.attr(Connection.attributeKey).get();
+            if (connection instanceof ServerConnection)
+            {
+                ServerConnection conn = (ServerConnection) connection;
+                result.add(new ImmutableMap.Builder<String, String>()
+                        .put("user", conn.getClientState().getUser().getName())
+                        .put("keyspace", 
conn.getClientState().getRawKeyspace() == null ? "" : 
conn.getClientState().getRawKeyspace())
+                        .put("address", 
conn.getClientState().getRemoteAddress().toString())
+                        .put("version", 
String.valueOf(conn.getVersion().asInt()))
+                        .put("requests", 
String.valueOf(conn.requests.getCount()))
+                        .put("ssl", 
conn.channel().pipeline().get(SslHandler.class) == null ? "false" : "true")
+                        .build());
+            }
+        }
+        return result;
+    }
+
     private void close()
     {
         // Close opened connections
@@ -285,6 +314,22 @@ public class Server implements CassandraDaemon.Server
             */
             return allChannels.size() != 0 ? allChannels.size() - 1 : 0;
         }
+
+        public Map<String, Integer> getConnectedClientsByUser()
+        {
+            Map<String, Integer> result = new HashMap<>();
+            for(Channel c : allChannels)
+            {
+                Connection connection = c.attr(Connection.attributeKey).get();
+                if (connection instanceof ServerConnection)
+                {
+                    ServerConnection conn = (ServerConnection) connection;
+                    String name = conn.getClientState().getUser().getName();
+                    result.put(name, result.getOrDefault(name, 0) + 1);
+                }
+            }
+            return result;
+        }
     }
 
     private static class Initializer extends ChannelInitializer<Channel>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d30bfcaf/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java 
b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 9374ca0..1ebf81c 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 
+import com.codahale.metrics.Counter;
+
 public class ServerConnection extends Connection
 {
     private enum State { UNINITIALIZED, AUTHENTICATION, READY }
@@ -33,6 +35,7 @@ public class ServerConnection extends Connection
     private volatile IAuthenticator.SaslNegotiator saslNegotiator;
     private final ClientState clientState;
     private volatile State state;
+    public final Counter requests = new Counter();
 
     private final ConcurrentMap<Integer, QueryState> queryStates = new 
ConcurrentHashMap<>();
 
@@ -56,6 +59,11 @@ public class ServerConnection extends Connection
         return qState;
     }
 
+    public ClientState getClientState()
+    {
+        return clientState;
+    }
+
     public QueryState validateNewMessage(Message.Type type, ProtocolVersion 
version, int streamId)
     {
         switch (state)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to