This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47cac5c49b Provide JMX endpoint to allow transient logging of blocking read repairs
47cac5c49b is described below

commit 47cac5c49b93d205fa9b3a57ce55470887c5be45
Author: Josh McKenzie <jmcken...@apache.org>
AuthorDate: Tue Mar 22 11:35:36 2022 -0400

    Provide JMX endpoint to allow transient logging of blocking read repairs
    
    Patch by Josh McKenzie; reviewed by David Capwell for CASSANDRA-17471
    
    Co-authored-by: Aleksey Yeschenko <alek...@apache.org>
    Co-authored-by: Josh McKenzie <jmcken...@apache.org>
---
 CHANGES.txt                                            |  1 +
 .../org/apache/cassandra/service/StorageProxy.java     | 18 +++++++++++++++++-
 .../apache/cassandra/service/StorageProxyMBean.java    |  3 +++
 .../cassandra/service/reads/AbstractReadExecutor.java  | 14 +++++++++++++-
 .../org/apache/cassandra/service/StorageProxyTest.java | 18 +++++++++++++++++-
 5 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 377e91b75c..196d2471e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Provide JMX endpoint to allow transient logging of blocking read repairs (CASSANDRA-17471)
  * Add guardrail for GROUP BY queries (CASSANDRA-17509)
  * make pylib PEP and pylint compliant (CASSANDRA-17546)
  * Add support for vnodes in jvm-dtest (CASSANDRA-17332)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 15e9e2d467..85c2699cb7 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Iterables;
@@ -194,6 +195,8 @@ public class StorageProxy implements StorageProxyMBean
 
     private static final PartitionDenylist partitionDenylist = new 
PartitionDenylist();
 
+    private volatile long logBlockingReadRepairAttemptsUntilNanos = 
Long.MIN_VALUE;
+
     private StorageProxy()
     {
     }
@@ -2048,9 +2051,10 @@ public class StorageProxy implements StorageProxyMBean
 
         // wait for enough responses to meet the consistency level. If there's 
a digest mismatch, begin the read
         // repair process by sending full data reads to all replicas we 
received responses from.
+        boolean logBlockingRepairAttempts = instance.isLoggingReadRepairs();
         for (int i=0; i<cmdCount; i++)
         {
-            reads[i].awaitResponses();
+            reads[i].awaitResponses(logBlockingRepairAttempts);
         }
 
         // read repair - if it looks like we may not receive enough full data 
responses to meet CL, send
@@ -3019,6 +3023,18 @@ public class StorageProxy implements StorageProxyMBean
         return !partitionDenylist.isKeyPermitted(keyspace, table, bytes);
     }
 
+    @Override
+    public void logBlockingReadRepairAttemptsForNSeconds(int seconds)
+    {
+        logBlockingReadRepairAttemptsUntilNanos = nanoTime() + 
TimeUnit.SECONDS.toNanos(seconds);
+    }
+
+    @Override
+    public boolean isLoggingReadRepairs()
+    {
+        return nanoTime() <= 
StorageProxy.instance.logBlockingReadRepairAttemptsUntilNanos;
+    }
+
     @Override
     public void setPaxosVariant(String variant)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 416a31284a..546143d515 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -91,6 +91,9 @@ public interface StorageProxyMBean
     public String getIdealConsistencyLevel();
     public String setIdealConsistencyLevel(String cl);
 
+    public void logBlockingReadRepairAttemptsForNSeconds(int seconds);
+    public boolean isLoggingReadRepairs();
+
     /**
      * Tracking and reporting of variances in the repaired data set across 
replicas at read time
      */
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 5f73bc696d..b5a759c3dc 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -366,10 +366,15 @@ public abstract class AbstractReadExecutor
         this.result = DuplicateRowChecker.duringRead(result, 
this.replicaPlan.get().readCandidates().endpointList());
     }
 
+    public void awaitResponses() throws ReadTimeoutException
+    {
+        awaitResponses(false);
+    }
+
     /**
      * Wait for the CL to be satisfied by responses
      */
-    public void awaitResponses() throws ReadTimeoutException
+    public void awaitResponses(boolean logBlockingReadRepairAttempt) throws 
ReadTimeoutException
     {
         try
         {
@@ -397,6 +402,13 @@ public abstract class AbstractReadExecutor
         {
             Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
             readRepair.startRepair(digestResolver, this::setResult);
+            if (logBlockingReadRepairAttempt)
+            {
+                logger.info("Blocking Read Repair triggered for query [{}] at 
CL.{} with endpoints {}",
+                            command.toCQLString(),
+                            replicaPlan().consistencyLevel(),
+                            replicaPlan().contacts());
+            }
         }
     }
 
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java 
b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
index 1338cd675f..41742f0c08 100644
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.cassandra.service;
 
+import java.net.UnknownHostException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -101,7 +103,21 @@ public class StorageProxyTest
         });
     }
 
-    private void shouldHintTest(Consumer<Replica> test) throws Exception
+
+    /**
+     * Ensure that the timer backing the JMX endpoint to transiently enable logging of blocking read repairs
+     * both enables and disables the way we'd expect.
+     */
+    @Test
+    public void testTransientLoggingTimer()
+    {
+        StorageProxy.instance.logBlockingReadRepairAttemptsForNSeconds(2);
+        Assert.assertTrue(StorageProxy.instance.isLoggingReadRepairs());
+        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+        Assert.assertFalse(StorageProxy.instance.isLoggingReadRepairs());
+    }
+
+    private void shouldHintTest(Consumer<Replica> test) throws 
UnknownHostException
     {
         InetAddressAndPort testEp = 
InetAddressAndPort.getByName("192.168.1.1");
         Replica replica = full(testEp);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to