This is an automated email from the ASF dual-hosted git repository. djoshi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 311891f Shuffle forwarding replica for messages to non-local DC. 311891f is described below commit 311891f35e538f4a8be0309f6d7045fef59dee71 Author: Jon Meredith <jmeredit...@gmail.com> AuthorDate: Mon Sep 9 10:06:20 2019 -0600 Shuffle forwarding replica for messages to non-local DC. Patch by Jon Meredith, reviewed by Dinesh Joshi for CASSANDRA-15318 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/service/StorageProxy.java | 15 ++++++++++----- .../cassandra/distributed/test/MessageForwardingTest.java | 11 +++++++++++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 53788a7..da57886 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha3 + * Shuffle forwarding replica for messages to non-local DC (CASSANDRA-15318) * Optimise native protocol ASCII string encoding (CASSANDRA-15410) * Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332) * Make it possible to resize concurrent read / write thread pools at runtime (CASSANDRA-15277) diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ded48bf..11c72ec 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1283,16 +1283,17 @@ public class StorageProxy implements StorageProxyMBean /* * Send the message to the first replica of targets, and have it forward the message to others in its DC - * - * TODO: are targets shuffled? do we want them to be to spread out forwarding burden? */ private static void sendMessagesToNonlocalDC(Message<? 
extends IMutation> message, EndpointsForToken targets, AbstractWriteResponseHandler<IMutation> handler) { + final Replica target; + if (targets.size() > 1) { - EndpointsForToken forwardToReplicas = targets.subList(1, targets.size()); + target = targets.get(ThreadLocalRandom.current().nextInt(0, targets.size())); + EndpointsForToken forwardToReplicas = targets.filter(r -> r != target, targets.size()); for (Replica replica : forwardToReplicas) { @@ -1306,9 +1307,13 @@ public class StorageProxy implements StorageProxyMBean message = message.withForwardTo(new ForwardingInfo(forwardToReplicas.endpointList(), messageIds)); } + else + { + target = targets.get(0); + } - MessagingService.instance().sendWriteWithCallback(message, targets.get(0), handler, true); - logger.trace("Sending message to {}@{}", message.id(), targets.get(0)); + MessagingService.instance().sendWriteWithCallback(message, target, handler, true); + logger.trace("Sending message to {}@{}", message.id(), target); } private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable) diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java index 72928e4..7c3d7a2 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.impl.IsolatedExecutor; import org.apache.cassandra.distributed.impl.TracingUtil; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.UUIDGen; public class MessageForwardingTest extends DistributedTestBase @@ -44,6 +45,7 @@ public class MessageForwardingTest extends DistributedTestBase { String originalTraceTimeout = 
TracingUtil.setWaitForTracingEventTimeoutSecs("1"); final int numInserts = 100; + Map<InetAddress,Integer> forwardFromCounts = new HashMap<>(); Map<InetAddress,Integer> commitCounts = new HashMap<>(); try (Cluster cluster = init(Cluster.build() @@ -66,6 +68,7 @@ public class MessageForwardingTest extends DistributedTestBase //noinspection ResultOfMethodCallIgnored inserts.map(IsolatedExecutor::waitOn).count(); + cluster.stream("dc1").forEach(instance -> forwardFromCounts.put(instance.broadcastAddressAndPort().address, 0)); cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0)); List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL); traces.forEach(traceEntry -> { @@ -73,8 +76,16 @@ public class MessageForwardingTest extends DistributedTestBase { commitCounts.compute(traceEntry.source, (k, v) -> (v != null ? v : 0) + 1); } + else if (traceEntry.activity.contains("Enqueuing forwarded write to ")) + { + forwardFromCounts.compute(traceEntry.source, (k, v) -> (v != null ? v : 0) + 1); + } }); + // Check that each node in dc1 was the forwarder at least once. With three candidate forwarders, a given node + // is never picked with probability (2/3)^numInserts, so the odds of a spurious failure are ~7e-18. + forwardFromCounts.forEach((source, count) -> Assert.assertTrue(source + " should have been randomized to forward messages", count > 0)); + // Check that each node received the forwarded messages once (and only once) commitCounts.forEach((source, count) -> Assert.assertEquals(source + " appending to commitlog traces", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org