Repository: cassandra
Updated Branches:
  refs/heads/trunk 685140267 -> a22dd291b


Add metrics to track usage of PreparedStatments

patch by tjake; reviewed by carl for (CASSANDRA-7719)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/42b91d44
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/42b91d44
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/42b91d44

Branch: refs/heads/trunk
Commit: 42b91d4453a3f7ac698c9a871c5d75fcbf8a578f
Parents: dbcc4ea
Author: Jake Luciani <j...@apache.org>
Authored: Wed Sep 10 16:18:32 2014 -0400
Committer: Jake Luciani <j...@apache.org>
Committed: Wed Sep 10 16:18:32 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cql3/QueryProcessor.java   |  67 +++++++----
 .../cassandra/metrics/CqlStatementMetrics.java  |  53 +++++++++
 .../apache/cassandra/service/ClientState.java   |   2 +-
 .../apache/cassandra/cql3/CqlMetricsTest.java   | 110 +++++++++++++++++++
 5 files changed, 213 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60fd4c9..177ac76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Add metrics for tracking PreparedStatement use (CASSANDRA-7719)
  * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
  * (cqlsh): Support for query paging (CASSANDRA-7514)
  * (cqlsh): Show progress of COPY operations (CASSANDRA-7789)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f2bf305..99972a2 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -22,11 +22,14 @@ import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import org.antlr.runtime.*;
+import org.apache.cassandra.metrics.CqlStatementMetrics;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +59,6 @@ public class QueryProcessor implements QueryHandler
     private static final Logger logger = 
LoggerFactory.getLogger(QueryProcessor.class);
     private static final MemoryMeter meter = new 
MemoryMeter().withGuessing(MemoryMeter.Guess.FALLBACK_BEST);
     private static final long MAX_CACHE_PREPARED_MEMORY = 
Runtime.getRuntime().maxMemory() / 256;
-    private static final int MAX_CACHE_PREPARED_COUNT = 10000;
 
     private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> 
cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
     {
@@ -83,15 +85,34 @@ public class QueryProcessor implements QueryHandler
     // bother with expiration on those.
     private static final ConcurrentMap<String, ParsedStatement.Prepared> 
internalStatements = new ConcurrentHashMap<>();
 
+    @VisibleForTesting
+    public static final CqlStatementMetrics metrics = new 
CqlStatementMetrics();
+
     static
     {
         preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, 
ParsedStatement.Prepared>()
                              
.maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
                              .weigher(cqlMemoryUsageWeigher)
-                             .build();
+                             .listener(new EvictionListener<MD5Digest, 
ParsedStatement.Prepared>()
+                             {
+                                 @Override
+                                 public void onEviction(MD5Digest md5Digest, 
ParsedStatement.Prepared prepared)
+                                 {
+                                     metrics.activePreparedStatements.dec();
+                                 }
+                             }).build();
+
         thriftPreparedStatements = new 
ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>()
                                    
.maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
                                    .weigher(thriftMemoryUsageWeigher)
+                                   .listener(new EvictionListener<Integer, 
CQLStatement>()
+                                   {
+                                       @Override
+                                       public void onEviction(Integer integer, 
CQLStatement cqlStatement)
+                                       {
+                                           
metrics.activePreparedStatements.dec();
+                                       }
+                                   })
                                    .build();
 
     }
@@ -174,7 +195,7 @@ public class QueryProcessor implements QueryHandler
                                                             
Cell.MAX_NAME_LENGTH));
     }
 
-    public static ResultMessage processStatement(CQLStatement statement,
+    private static ResultMessage processStatement(CQLStatement statement,
                                                   QueryState queryState,
                                                   QueryOptions options)
     throws RequestExecutionException, RequestValidationException
@@ -203,6 +224,9 @@ public class QueryProcessor implements QueryHandler
         if (prepared.getBoundTerms() != options.getValues().size())
             throw new InvalidRequestException("Invalid amount of bind 
variables");
 
+        if (!queryState.getClientState().isInternal)
+            metrics.executedUnprepared.inc();
+
         return processStatement(prepared, queryState, options);
     }
 
@@ -370,24 +394,28 @@ public class QueryProcessor implements QueryHandler
             throw new InvalidRequestException(String.format("Prepared 
statement of size %d bytes is larger than allowed maximum of %d bytes.",
                                                             statementSize,
                                                             
MAX_CACHE_PREPARED_MEMORY));
-
-        if (forThrift)
+        try
         {
-            int statementId = toHash.hashCode();
-            thriftPreparedStatements.put(statementId, prepared.statement);
-            logger.trace(String.format("Stored prepared statement #%d with %d 
bind markers",
-                                       statementId,
-                                       prepared.statement.getBoundTerms()));
-            return ResultMessage.Prepared.forThrift(statementId, 
prepared.boundNames);
-        }
-        else
+            if (forThrift)
+            {
+                int statementId = toHash.hashCode();
+                thriftPreparedStatements.put(statementId, prepared.statement);
+                logger.trace("Stored prepared statement #{} with {} bind 
markers",
+                        statementId,
+                        prepared.statement.getBoundTerms());
+                return ResultMessage.Prepared.forThrift(statementId, 
prepared.boundNames);
+            } else
+            {
+                MD5Digest statementId = MD5Digest.compute(toHash);
+                preparedStatements.put(statementId, prepared);
+                logger.trace("Stored prepared statement #{} with {} bind 
markers",
+                        statementId,
+                        prepared.statement.getBoundTerms());
+                return new ResultMessage.Prepared(statementId, prepared);
+            }
+        } finally
         {
-            MD5Digest statementId = MD5Digest.compute(toHash);
-            preparedStatements.put(statementId, prepared);
-            logger.trace(String.format("Stored prepared statement %s with %d 
bind markers",
-                                       statementId,
-                                       prepared.statement.getBoundTerms()));
-            return new ResultMessage.Prepared(statementId, prepared);
+            metrics.activePreparedStatements.inc();
         }
     }
 
@@ -410,6 +438,7 @@ public class QueryProcessor implements QueryHandler
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
+        metrics.executedPrepared.inc();
         return processStatement(statement, queryState, options);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java 
b/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java
new file mode 100644
index 0000000..02b4ad0
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.RatioGauge;
+
+
+public class CqlStatementMetrics
+{
+    private final MetricNameFactory factory = new 
DefaultNameFactory("CqlStatement");
+    public final Counter activePreparedStatements = 
Metrics.newCounter(factory.createMetricName("ActivePreparedStatements"));
+    public final Counter executedPrepared = 
Metrics.newCounter(factory.createMetricName("ExecutedPrepared"));
+    public final Counter executedUnprepared = 
Metrics.newCounter(factory.createMetricName("ExecutedUnPrepared"));
+
+    public final Gauge<Double> preparedRatio = 
Metrics.newGauge(factory.createMetricName("PreparedUnpreparedRatio"), new 
RatioGauge()
+    {
+        protected double getNumerator()
+        {
+            long num = executedPrepared.count();
+            return num == 0 ? 1 : num;
+        }
+
+        protected double getDenominator()
+        {
+            long den = executedUnprepared.count();
+            return den == 0 ? 1 : den;
+        }
+    });
+
+    public void reset()
+    {
+        executedPrepared.clear();
+        executedUnprepared.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java 
b/src/java/org/apache/cassandra/service/ClientState.java
index c0396cb..725690e 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -103,7 +103,7 @@ public class ClientState
 
     // isInternal is used to mark ClientState as used by some internal 
component
     // that should have an ability to modify system keyspace.
-    private final boolean isInternal;
+    public final boolean isInternal;
 
     // The remote address of the client - null for internal clients.
     private final SocketAddress remoteAddress;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java 
b/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java
new file mode 100644
index 0000000..dc9c6a4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class CqlMetricsTest
+{
+
+    private static EmbeddedCassandraService cassandra;
+
+    private static Cluster cluster;
+    private static Session session;
+    private static PreparedStatement metricsStatement;
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = 
Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("drop keyspace if exists junit;");
+        session.execute("create keyspace junit WITH REPLICATION = { 'class' : 
'SimpleStrategy', 'replication_factor' : 1 };");
+        session.execute("CREATE TABLE junit.metricstest (\n" +
+                "  id int PRIMARY KEY,\n" +
+                "  val text\n" +
+                ");");
+    }
+
+    @Test
+    public void testActivePreparedStatements()
+    {
+        assert QueryProcessor.metrics.activePreparedStatements.count() == 0;
+
+        metricsStatement = session.prepare("insert into junit.metricstest(id, 
val)values(?,?)");
+
+        assert QueryProcessor.metrics.activePreparedStatements.count() == 1;
+    }
+
+    @Test
+    public void testExecutedPrepared()
+    {
+        QueryProcessor.metrics.reset();
+
+        assert QueryProcessor.metrics.activePreparedStatements.count() == 1;
+        assert QueryProcessor.metrics.executedPrepared.count() == 0;
+        assert QueryProcessor.metrics.executedUnprepared.count() == 0;
+        assert QueryProcessor.metrics.preparedRatio.value() == 1.0;
+
+        for (int i = 0; i < 10; i++)
+        {
+            session.execute(metricsStatement.bind(i, "val"+i));
+        }
+
+        assert QueryProcessor.metrics.executedPrepared.count() == 10;
+        assert QueryProcessor.metrics.executedUnprepared.count() == 0;
+        assert QueryProcessor.metrics.preparedRatio.value() == 10d/1d;
+
+    }
+
+    @Test
+    public void testExecutedUnPrepared()
+    {
+        QueryProcessor.metrics.reset();
+
+        assert QueryProcessor.metrics.activePreparedStatements.count() == 1;
+        assert QueryProcessor.metrics.executedPrepared.count() == 0;
+        assert QueryProcessor.metrics.executedUnprepared.count() == 0;
+
+        for (int i = 0; i < 10; i++)
+        {
+            session.execute(String.format("insert into junit.metricstest(id, 
val)values(%d,'%s')",i, "val"+1));
+        }
+
+        assert QueryProcessor.metrics.executedPrepared.count() == 0;
+        assert QueryProcessor.metrics.executedUnprepared.count() == 10;
+        assert QueryProcessor.metrics.preparedRatio.value() == 1d/10d;
+    }
+}

Reply via email to