This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 311891f  Shuffle forwarding replica for messages to non-local DC.
311891f is described below

commit 311891f35e538f4a8be0309f6d7045fef59dee71
Author: Jon Meredith <jmeredit...@gmail.com>
AuthorDate: Mon Sep 9 10:06:20 2019 -0600

    Shuffle forwarding replica for messages to non-local DC.
    
    Patch by Jon Meredith, reviewed by Dinesh Joshi for CASSANDRA-15318
---
 CHANGES.txt                                               |  1 +
 src/java/org/apache/cassandra/service/StorageProxy.java   | 15 ++++++++++-----
 .../cassandra/distributed/test/MessageForwardingTest.java | 11 +++++++++++
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 53788a7..da57886 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Shuffle forwarding replica for messages to non-local DC (CASSANDRA-15318)
  * Optimise native protocol ASCII string encoding (CASSANDRA-15410)
  * Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332)
  * Make it possible to resize concurrent read / write thread pools at runtime (CASSANDRA-15277)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ded48bf..11c72ec 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1283,16 +1283,17 @@ public class StorageProxy implements StorageProxyMBean
 
     /*
      * Send the message to the first replica of targets, and have it forward the message to others in its DC
-     *
-     * TODO: are targets shuffled? do we want them to be to spread out forwarding burden?
      */
     private static void sendMessagesToNonlocalDC(Message<? extends IMutation> message,
                                                  EndpointsForToken targets,
                                                  AbstractWriteResponseHandler<IMutation> handler)
     {
+        final Replica target;
+
         if (targets.size() > 1)
         {
-            EndpointsForToken forwardToReplicas = targets.subList(1, targets.size());
+            target = targets.get(ThreadLocalRandom.current().nextInt(0, targets.size()));
+            EndpointsForToken forwardToReplicas = targets.filter(r -> r != target, targets.size());
 
             for (Replica replica : forwardToReplicas)
             {
@@ -1306,9 +1307,13 @@ public class StorageProxy implements StorageProxyMBean
 
             message = message.withForwardTo(new ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
         }
+        else
+        {
+            target = targets.get(0);
+        }
 
-        MessagingService.instance().sendWriteWithCallback(message, targets.get(0), handler, true);
-        logger.trace("Sending message to {}@{}", message.id(), targets.get(0));
+        MessagingService.instance().sendWriteWithCallback(message, target, handler, true);
+        logger.trace("Sending message to {}@{}", message.id(), target);
     }
 
     private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
index 72928e4..7c3d7a2 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
 import org.apache.cassandra.distributed.impl.TracingUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.UUIDGen;
 
 public class MessageForwardingTest extends DistributedTestBase
@@ -44,6 +45,7 @@ public class MessageForwardingTest extends DistributedTestBase
     {
         String originalTraceTimeout = TracingUtil.setWaitForTracingEventTimeoutSecs("1");
         final int numInserts = 100;
+        Map<InetAddress,Integer> forwardFromCounts = new HashMap<>();
         Map<InetAddress,Integer> commitCounts = new HashMap<>();
 
         try (Cluster cluster = init(Cluster.build()
@@ -66,6 +68,7 @@ public class MessageForwardingTest extends DistributedTestBase
             //noinspection ResultOfMethodCallIgnored
             inserts.map(IsolatedExecutor::waitOn).count();
 
+            cluster.stream("dc1").forEach(instance -> forwardFromCounts.put(instance.broadcastAddressAndPort().address, 0));
             cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
             List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL);
             traces.forEach(traceEntry -> {
@@ -73,8 +76,16 @@ public class MessageForwardingTest extends DistributedTestBase
                 {
                     commitCounts.compute(traceEntry.source, (k, v) -> (v != null ? v : 0) + 1);
                 }
+                else if (traceEntry.activity.contains("Enqueuing forwarded write to "))
+                {
+                    forwardFromCounts.compute(traceEntry.source, (k, v) -> (v != null ? v : 0) + 1);
+                }
+                }
             });
 
+            // Check that each node in dc1 was the forwarder at least once.  There is a (1/3)^numInserts chance
+            // that the same node will be picked, but the odds of that are ~2e-48.
+            forwardFromCounts.forEach((source, count) -> Assert.assertTrue(source + " should have been randomized to forward messages", count > 0));
+
             // Check that each node received the forwarded messages once (and only once)
             commitCounts.forEach((source, count) ->
                                  Assert.assertEquals(source + " appending to commitlog traces",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to