Repository: cassandra
Updated Branches:
  refs/heads/trunk f42e235b1 -> c1a9a47df


Make randompartitioner work with new vnode allocation

patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12647


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1a9a47d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1a9a47d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1a9a47d

Branch: refs/heads/trunk
Commit: c1a9a47df292dbbde3c675c10d68043e7b212c28
Parents: f42e235
Author: Dikang Gu <dikan...@gmail.com>
Authored: Wed Sep 14 23:04:14 2016 -0700
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Sep 19 16:36:34 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/dht/RandomPartitioner.java | 13 +++
 .../ReplicationAwareTokenAllocatorTest.java     | 84 +++++++++++++++-----
 3 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8e39d95..b625a58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
  * Fix cassandra-stress graphing (CASSANDRA-12237)
  * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
  * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index c063be3..7c8f6ac 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -177,6 +177,19 @@ public class RandomPartitioner implements IPartitioner
         {
             return HEAP_SIZE;
         }
+
+        public Token increaseSlightly()
+        {
+            return new BigIntegerToken(token.add(BigInteger.ONE));
+        }
+
+        public double size(Token next)
+        {
+            BigIntegerToken n = (BigIntegerToken) next;
+            BigInteger v = n.token.subtract(token);  // Overflow acceptable and desired.
+            double d = Math.scalb(v.doubleValue(), -127); // Scale so that the full range is 1.
+            return d > 0.0 ? d : (d + 1.0); // Adjust for signed long, also making sure t.size(t) == 1.
+        }
     }
 
     public BigIntegerToken getToken(ByteBuffer key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
index 1b36c55..482e2ac 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
@@ -30,7 +30,9 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.junit.Test;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
 
 public class ReplicationAwareTokenAllocatorTest
@@ -489,10 +491,10 @@ public class ReplicationAwareTokenAllocatorTest
         }
     };
 
-    Murmur3Partitioner partitioner = new Murmur3Partitioner();
     Random seededRand = new Random(2);
 
-    private void random(Map<Token, Unit> map, TestReplicationStrategy rs, int unitCount, TokenCount tc, int perUnitCount)
+    private void random(Map<Token, Unit> map, TestReplicationStrategy rs,
+                        int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner)
     {
         System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount);
         Random rand = seededRand;
@@ -509,49 +511,82 @@ public class ReplicationAwareTokenAllocatorTest
     }
 
     @Test
-    public void testExistingCluster()
+    public void testExistingClusterWithRandomPartitioner()
+    {
+        testExistingCluster(new RandomPartitioner());
+    }
+
+    @Test
+    public void testExistingClusterWithMurmur3Partitioner()
+    {
+        testExistingCluster(new Murmur3Partitioner());
+    }
+
+    public void testExistingCluster(IPartitioner partitioner)
     {
         for (int rf = 1; rf <= 5; ++rf)
         {
             for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
             {
-                testExistingCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf));
-                testExistingCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf));
+                testExistingCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                testExistingCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
                 if (rf == 1) continue;  // Replication strategy doesn't matter for RF = 1.
                 for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
                 {
-                    testExistingCluster(perUnitCount, fixedTokenCount, new 
BalancedGroupReplicationStrategy(rf, groupSize));
-                    testExistingCluster(perUnitCount, varyingTokenCount, new 
UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, 
seededRand));
+                    testExistingCluster(perUnitCount, fixedTokenCount,
+                                        new 
BalancedGroupReplicationStrategy(rf, groupSize), partitioner);
+                    testExistingCluster(perUnitCount, varyingTokenCount,
+                                        new 
UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, 
seededRand),
+                                        partitioner);
                 }
-                testExistingCluster(perUnitCount, fixedTokenCount, new 
FixedGroupCountReplicationStrategy(rf, rf * 2));
+                testExistingCluster(perUnitCount, fixedTokenCount,
+                                    new FixedGroupCountReplicationStrategy(rf, 
rf * 2), partitioner);
             }
         }
     }
 
-    public void testExistingCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs)
+    public void testExistingCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
     {
         System.out.println("Testing existing cluster, target " + perUnitCount 
+ " vnodes, replication " + rs);
         final int targetClusterSize = TARGET_CLUSTER_SIZE;
         NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
 
-        random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount);
+        random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, 
partitioner);
 
         ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
         grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false);
         grow(t, targetClusterSize, tc, perUnitCount, true);
-        loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount);
+        loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, 
partitioner);
         System.out.println();
     }
 
     @Test
-    public void testNewCluster()
+    public void testNewClusterWithRandomPartitioner()
     {
-        Util.flakyTest(this::flakyTestNewCluster,
+        Util.flakyTest(this::flakyTestNewClusterWithRandomPartitioner,
                        5,
                        "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
     }
 
-    public void flakyTestNewCluster()
+    @Test
+    public void testNewClusterWithMurmur3Partitioner()
+    {
+        Util.flakyTest(this::flakyTestNewClusterWithMurmur3Partitioner,
+                       5,
+                       "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
+    }
+
+    public void flakyTestNewClusterWithRandomPartitioner()
+    {
+        flakyTestNewCluster(new RandomPartitioner());
+    }
+
+    public void flakyTestNewClusterWithMurmur3Partitioner()
+    {
+        flakyTestNewCluster(new Murmur3Partitioner());
+    }
+
+    public void flakyTestNewCluster(IPartitioner partitioner)
     {
         // This test is flaky because the selection of the tokens for the first RF nodes (which is random, with an
         // uncontrolled seed) can sometimes cause a pathological situation where the algorithm will find a (close to)
@@ -564,20 +599,24 @@ public class ReplicationAwareTokenAllocatorTest
         {
             for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
             {
-                testNewCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf));
-                testNewCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf));
+                testNewCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                testNewCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
                 if (rf == 1) continue;  // Replication strategy doesn't matter 
for RF = 1.
                 for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
                 {
-                    testNewCluster(perUnitCount, fixedTokenCount, new 
BalancedGroupReplicationStrategy(rf, groupSize));
-                    testNewCluster(perUnitCount, varyingTokenCount, new 
UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, 
seededRand));
+                    testNewCluster(perUnitCount, fixedTokenCount,
+                                   new BalancedGroupReplicationStrategy(rf, 
groupSize), partitioner);
+                    testNewCluster(perUnitCount, varyingTokenCount,
+                                   new UnbalancedGroupReplicationStrategy(rf, 
groupSize / 2, groupSize * 2, seededRand),
+                                   partitioner);
                 }
-                testNewCluster(perUnitCount, fixedTokenCount, new 
FixedGroupCountReplicationStrategy(rf, rf * 2));
+                testNewCluster(perUnitCount, fixedTokenCount,
+                               new FixedGroupCountReplicationStrategy(rf, rf * 
2), partitioner);
             }
         }
     }
 
-    public void testNewCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs)
+    public void testNewCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
     {
         System.out.println("Testing new cluster, target " + perUnitCount + " 
vnodes, replication " + rs);
         final int targetClusterSize = TARGET_CLUSTER_SIZE;
@@ -586,11 +625,12 @@ public class ReplicationAwareTokenAllocatorTest
         ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
         grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false);
         grow(t, targetClusterSize, tc, perUnitCount, true);
-        loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount);
+        loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, 
partitioner);
         System.out.println();
     }
 
-    private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int 
howMany, TokenCount tc, int perUnitCount)
+    private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int 
howMany,
+                                TokenCount tc, int perUnitCount, IPartitioner 
partitioner)
     {
         int fullCount = t.unitCount();
         System.out.format("Losing %d units. ", howMany);

Reply via email to