Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222954234 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java --- @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; + +import com.google.common.collect.Sets; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class RepairJobTest +{ + private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance; + + static InetAddressAndPort addr1; + static InetAddressAndPort addr2; + static InetAddressAndPort addr3; + static InetAddressAndPort addr4; + static InetAddressAndPort addr5; + + static Range<Token> range1 = range(0, 1); + static Range<Token> range2 = range(2, 3); + static Range<Token> range3 = range(4, 5); + static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList()); + + @AfterClass + public static void reset() + { + FBUtilities.reset(); + } + + static + { + try + { + addr1 = InetAddressAndPort.getByName("127.0.0.1"); + addr2 = InetAddressAndPort.getByName("127.0.0.2"); + addr3 = InetAddressAndPort.getByName("127.0.0.3"); + addr4 = 
InetAddressAndPort.getByName("127.0.0.4"); + addr5 = InetAddressAndPort.getByName("127.0.0.5"); + DatabaseDescriptor.setBroadcastAddress(addr1.address); + } + catch (UnknownHostException e) + { + e.printStackTrace(); + } + } + + @Test + public void testCreateStandardSyncTasks() + { + testCreateStandardSyncTasks(false); + } + + @Test + public void testCreateStandardSyncTasksPullRepair() + { + testCreateStandardSyncTasks(true); + } + + public static void testCreateStandardSyncTasks(boolean pullRepair) + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "different", range2, "same", range3, "different"), + treeResponse(addr3, range1, "same", range2, "same", range3, "same")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr1, // local + noTransient(), // transient + false, + pullRepair, + PreviewKind.ALL)); + + Assert.assertEquals(2, tasks.size()); + + SyncTask task = tasks.get(pair(addr1, addr2)); + Assert.assertTrue(task.isLocal()); + Assert.assertTrue(((LocalSyncTask) task).requestRanges); + Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges); + Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync); + + task = tasks.get(pair(addr2, addr3)); + Assert.assertFalse(task.isLocal()); + Assert.assertTrue(task instanceof SymmetricRemoteSyncTask); + Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync); + + Assert.assertNull(tasks.get(pair(addr1, addr3))); + } + + @Test + public void testStanardSyncTransient() + { + // Do not stream towards transient nodes + testStanardSyncTransient(true); + testStanardSyncTransient(false); + } + + public void testStanardSyncTransient(boolean pullRepair) + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "different", range2, 
"same", range3, "different")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr1, // local + transientPredicate(addr2), + false, + pullRepair, + PreviewKind.ALL)); + + Assert.assertEquals(1, tasks.size()); + + SyncTask task = tasks.get(pair(addr1, addr2)); + Assert.assertTrue(task.isLocal()); + Assert.assertTrue(((LocalSyncTask) task).requestRanges); + Assert.assertFalse(((LocalSyncTask) task).transferRanges); + Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync); + } + + @Test + public void testStanardSyncLocalTransient() + { + // Do not stream towards transient nodes + testStanardSyncLocalTransient(true); + testStanardSyncLocalTransient(false); + } + + public void testStanardSyncLocalTransient(boolean pullRepair) + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "different", range2, "same", range3, "different")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr1, // local + transientPredicate(addr1), + false, + pullRepair, + PreviewKind.ALL)); + + if (pullRepair) + { + Assert.assertTrue(tasks.isEmpty()); + return; + } + + Assert.assertEquals(1, tasks.size()); + + SyncTask task = tasks.get(pair(addr1, addr2)); + Assert.assertTrue(task.isLocal()); + Assert.assertFalse(((LocalSyncTask) task).requestRanges); + Assert.assertTrue(((LocalSyncTask) task).transferRanges); + Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync); + } + + @Test + public void testEmptyDifference() + { + // one of the nodes is a local coordinator + testEmptyDifference(addr1, noTransient(), true); + testEmptyDifference(addr1, noTransient(), false); + testEmptyDifference(addr2, noTransient(), true); + testEmptyDifference(addr2, noTransient(), false); + testEmptyDifference(addr1, transientPredicate(addr1), true); + testEmptyDifference(addr2, 
transientPredicate(addr1), true); + testEmptyDifference(addr1, transientPredicate(addr1), false); + testEmptyDifference(addr2, transientPredicate(addr1), false); + testEmptyDifference(addr1, transientPredicate(addr2), true); + testEmptyDifference(addr2, transientPredicate(addr2), true); + testEmptyDifference(addr1, transientPredicate(addr2), false); + testEmptyDifference(addr2, transientPredicate(addr2), false); + + // nonlocal coordinator + testEmptyDifference(addr3, noTransient(), true); + testEmptyDifference(addr3, noTransient(), false); + testEmptyDifference(addr3, noTransient(), true); + testEmptyDifference(addr3, noTransient(), false); + testEmptyDifference(addr3, transientPredicate(addr1), true); + testEmptyDifference(addr3, transientPredicate(addr1), true); + testEmptyDifference(addr3, transientPredicate(addr1), false); + testEmptyDifference(addr3, transientPredicate(addr1), false); + testEmptyDifference(addr3, transientPredicate(addr2), true); + testEmptyDifference(addr3, transientPredicate(addr2), true); + testEmptyDifference(addr3, transientPredicate(addr2), false); + testEmptyDifference(addr3, transientPredicate(addr2), false); + } + + public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair) + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "same", range2, "same", range3, "same")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + local, // local + isTransient, + false, + pullRepair, + PreviewKind.ALL)); + + Assert.assertTrue(tasks.isEmpty()); + } + + @Test + public void testCreateStandardSyncTasksAllDifferent() + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"), + treeResponse(addr2, range1, "two", range2, "two", range3, "two"), + treeResponse(addr3, range1, 
"three", range2, "three", range3, "three")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr1, // local + ep -> ep.equals(addr3), // transient + false, + true, + PreviewKind.ALL)); + + Assert.assertEquals(3, tasks.size()); + SyncTask task = tasks.get(pair(addr1, addr2)); + Assert.assertTrue(task.isLocal()); + Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync); + + task = tasks.get(pair(addr2, addr3)); + Assert.assertFalse(task.isLocal()); + Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync); + + task = tasks.get(pair(addr1, addr3)); + Assert.assertTrue(task.isLocal()); + Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync); + } + + @Test + public void testCreate5NodeStandardSyncTasksWithTransient() + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"), + treeResponse(addr2, range1, "two", range2, "two", range3, "two"), + treeResponse(addr3, range1, "three", range2, "three", range3, "three"), + treeResponse(addr4, range1, "four", range2, "four", range3, "four"), + treeResponse(addr5, range1, "five", range2, "five", range3, "five")); + + Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5); + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr1, // local + isTransient, // transient + false, + true, + PreviewKind.ALL)); + + SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2), + pair(addr1, addr3), + pair(addr1, addr4), + pair(addr1, addr5), + pair(addr2, addr4), + pair(addr2, addr4), + pair(addr2, addr5), + pair(addr3, addr4), + pair(addr3, addr5)}; + + for (SyncNodePair pair : pairs) + { + SyncTask task = tasks.get(pair); + // Local only if addr1 is a coordinator + assertEquals(task.isLocal(), pair.coordinator.equals(addr1)); + + // Symmetric only if there are no 
transient participants + assertEquals(String.format("Coordinator: %s\n, Peer: %s\n", + pair.coordinator, + pair.peer), + (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) && + (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)), + task instanceof AsymmetricRemoteSyncTask); + + // All ranges to be synchronised + Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync); + } + } + + @Test + public void testLocalSyncWithTransient() + { + try + { + for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 }) + { + FBUtilities.reset(); + DatabaseDescriptor.setBroadcastAddress(local.address); + testLocalSyncWithTransient(local, false); + } + } + finally + { + FBUtilities.reset(); + DatabaseDescriptor.setBroadcastAddress(addr1.address); + } + } + + @Test + public void testLocalSyncWithTransientPullRepair() + { + try + { + for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 }) + { + FBUtilities.reset(); + DatabaseDescriptor.setBroadcastAddress(local.address); + testLocalSyncWithTransient(local, true); + } + } + finally + { + FBUtilities.reset(); + DatabaseDescriptor.setBroadcastAddress(addr1.address); + } + + } + + public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair) + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"), + treeResponse(addr2, range1, "two", range2, "two", range3, "two"), + treeResponse(addr3, range1, "three", range2, "three", range3, "three"), + treeResponse(addr4, range1, "four", range2, "four", range3, "four"), + treeResponse(addr5, range1, "five", range2, "five", range3, "five")); + + Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5); + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + local, // local + isTransient, // transient + false, + pullRepair, + PreviewKind.ALL)); 
+ + assertEquals(9, tasks.size()); + for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 }) + { + if (local.equals(addr)) + continue; + + LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr)); + assertTrue(task.requestRanges); + assertEquals(!pullRepair, task.transferRanges); + } + + LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4)); + assertTrue(task.requestRanges); + assertFalse(task.transferRanges); + + task = (LocalSyncTask) tasks.get(pair(local, addr5)); + assertTrue(task.requestRanges); + assertFalse(task.transferRanges); + } + + @Test + public void testLocalAndRemoteTransient() + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"), + treeResponse(addr2, range1, "two", range2, "two", range3, "two"), + treeResponse(addr3, range1, "three", range2, "three", range3, "three"), + treeResponse(addr4, range1, "four", range2, "four", range3, "four"), + treeResponse(addr5, range1, "five", range2, "five", range3, "five")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, + addr4, // local + ep -> ep.equals(addr4) || ep.equals(addr5), // transient + false, + true, + PreviewKind.ALL)); + + assertNull(tasks.get(pair(addr4, addr5))); + } + + @Test + public void testOptimizedCreateStandardSyncTasksAllDifferent() + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"), + treeResponse(addr2, range1, "two", range2, "two", range3, "two"), + treeResponse(addr3, range1, "three", range2, "three", range3, "three")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc, + treeResponses, + addr1, // local + noTransient(), + addr -> "DC1", + false, + PreviewKind.ALL)); + + for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2), + pair(addr1, addr3), + pair(addr2, addr1), + pair(addr2, addr3), + 
pair(addr3, addr1), + pair(addr3, addr2) }) + { + assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync); + } + } + + @Test + public void testOptimizedCreateStandardSyncTasks() + { + List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"), + treeResponse(addr2, range1, "one", range2, "two"), + treeResponse(addr3, range1, "three", range2, "two")); + + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc, + treeResponses, + addr4, // local + noTransient(), + addr -> "DC1", + false, + PreviewKind.ALL)); + + assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync); + assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync); --- End diff -- This invariant does not hold in the current code: `range2` _is_ streamed from both `addr1` and `addr2`, and the assert below checks for that. This doesn't look right, but I'd have to dig deeper to understand why.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org For additional commands, e-mail: pr-help@cassandra.apache.org