Repository: cassandra Updated Branches: refs/heads/trunk 2886cac38 -> 0841353e9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java new file mode 100644 index 0000000..6e691f5 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LocalSyncTaskTest extends AbstractRepairTest +{ + private static final IPartitioner partitioner = Murmur3Partitioner.instance; + private static final InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); + public static final String KEYSPACE1 = "DifferencerTest"; + public static final String CF_STANDARD = "Standard1"; + public static ColumnFamilyStore cfs; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); + + TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id; + cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + } + + /** + * When there is no difference between two, SymmetricLocalSyncTask should return stats with 0 difference. + */ + @Test + public void testNoDifference() throws Throwable + { + final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.2"); + + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); + RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); + + MerkleTrees tree2 = createInitialTree(desc); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(local, tree1); + TreeResponse r2 = new TreeResponse(ep2, tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + task.run(); + + assertEquals(0, task.get().numberOfDifferences); + } + + @Test + public void testDifference() throws Throwable + { + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); + UUID parentRepairSession = UUID.randomUUID(); + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); + + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(), + Arrays.asList(cfs), Arrays.asList(range), false, + ActiveRepairService.UNREPAIRED_SSTABLE, false, + PreviewKind.NONE); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); + MerkleTrees tree2 = createInitialTree(desc); + + // change a range in one of the trees + Token token = partitioner.midpoint(range.left, range.right); + tree1.invalidate(token); + MerkleTree.TreeRange changed = tree1.get(token); + changed.hash("non-empty hash!".getBytes()); + + Set<Range<Token>> interesting = new HashSet<>(); + interesting.add(changed); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(local, tree1); + TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; + DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); + try + { + task.run(); + } + finally + { + DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); + DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3; + } + + // ensure that the changed range was recorded + assertEquals("Wrong differing ranges", interesting.size(), task.stat.numberOfDifferences); + } + + @Test + public void fullRepairStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + + assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); + assertTrue(plan.getFlushBeforeTransfer()); + } + + private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing) + { + StreamCoordinator coordinator = plan.getCoordinator(); + StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions()); + assertEquals(expectedIncoming, session.getNumRequests()); + assertEquals(expectedOutgoing, session.getNumTransfers()); + } + + @Test + public void incrementalRepairStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + + assertEquals(desc.parentSessionId, plan.getPendingRepair()); + assertFalse(plan.getFlushBeforeTransfer()); + assertNumInOut(plan, 1, 1); + } + + /** + * Don't reciprocate streams if the other endpoint is a transient replica + */ + @Test + public void transientRemoteStreamPlan() + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + assertNumInOut(plan, 1, 0); + } + + /** + * Don't request streams if the other endpoint is a transient replica + */ + @Test + public void transientLocalStreamPlan() + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + assertNumInOut(plan, 0, 1); + } + + private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) + { + MerkleTrees tree = new MerkleTrees(partitioner); + tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); + tree.init(); + for (MerkleTree.TreeRange r : tree.invalids()) + { + r.ensureHashInitialised(); + } + return tree; + } + + private MerkleTrees createInitialTree(RepairJobDesc desc) + { + return createInitialTree(desc, partitioner); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java deleted file mode 100644 index 92ae172..0000000 --- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.streaming.StreamCoordinator; -import org.apache.cassandra.streaming.DefaultConnectionFactory; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; -import org.apache.cassandra.utils.MerkleTrees; -import org.apache.cassandra.utils.UUIDGen; - -import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class SymmetricLocalSyncTaskTest extends AbstractRepairTest -{ - private static final IPartitioner partitioner = Murmur3Partitioner.instance; - public static final String KEYSPACE1 = "DifferencerTest"; - public static final String CF_STANDARD = "Standard1"; - public static ColumnFamilyStore cfs; - - @BeforeClass - public static void defineSchema() - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); - - TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id; - cfs = Schema.instance.getColumnFamilyStoreInstance(tid); - } - - /** - * When there is no difference between two, SymmetricLocalSyncTask should return stats with 0 difference. - */ - @Test - public void testNoDifference() throws Throwable - { - final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1"); - final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1"); - - Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - - MerkleTrees tree1 = createInitialTree(desc); - - MerkleTrees tree2 = createInitialTree(desc); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(ep1, tree1); - TreeResponse r2 = new TreeResponse(ep2, tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); - task.run(); - - assertEquals(0, task.get().numberOfDifferences); - } - - @Test - public void testDifference() throws Throwable - { - Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - UUID parentRepairSession = UUID.randomUUID(); - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(), - Arrays.asList(cfs), Arrays.asList(range), false, - ActiveRepairService.UNREPAIRED_SSTABLE, false, - PreviewKind.NONE); - - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - - MerkleTrees tree1 = createInitialTree(desc); - - MerkleTrees tree2 = createInitialTree(desc); - - // change a range in one of the trees - Token token = partitioner.midpoint(range.left, range.right); - tree1.invalidate(token); - MerkleTree.TreeRange changed = tree1.get(token); - changed.hash("non-empty hash!".getBytes()); - - Set<Range<Token>> interesting = new HashSet<>(); - interesting.add(changed); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1); - TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); - DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; - DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); - try - { - task.run(); - } - finally - { - DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); - DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3; - } - - // ensure that the changed range was recorded - assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); - } - - @Test - public void fullRepairStreamPlan() throws Exception - { - UUID sessionID = registerSession(cfs, true, true); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); - RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); - - assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); - assertTrue(plan.getFlushBeforeTransfer()); - } - - private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing) - { - StreamCoordinator coordinator = plan.getCoordinator(); - StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions()); - assertEquals(expectedIncoming, session.getNumRequests()); - assertEquals(expectedOutgoing, session.getNumTransfers()); - } - - @Test - public void incrementalRepairStreamPlan() throws Exception - { - UUID sessionID = registerSession(cfs, true, true); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); - RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); - - assertEquals(desc.parentSessionId, plan.getPendingRepair()); - assertFalse(plan.getFlushBeforeTransfer()); - assertNumInOut(plan, 1, 1); - } - - /** - * Don't reciprocate streams if the other endpoint is a transient replica - */ - @Test - public void transientStreamPlan() - { - UUID sessionID = registerSession(cfs, true, true); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); - RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1)); - assertNumInOut(plan, 1, 0); - } - - private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) - { - MerkleTrees tree = new MerkleTrees(partitioner); - tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); - tree.init(); - for (MerkleTree.TreeRange r : tree.invalids()) - { - r.ensureHashInitialised(); - } - return tree; - } - - private MerkleTrees createInitialTree(RepairJobDesc desc) - { - return createInitialTree(desc, partitioner); - - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java index 06f968f..7f48788 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java @@ -30,18 +30,18 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.UUIDGen; public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest { private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES); private static final List<Range<Token>> RANGE_LIST = ImmutableList.of(RANGE1); - private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask { public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, InetAddressAndPort e2) { - super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, null), PreviewKind.NONE); + super(DESC, e1, e2, RANGE_LIST, PreviewKind.NONE); } RepairMessage sentMessage = null; @@ -62,7 +62,7 @@ public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest public void normalSync() { InstrumentedSymmetricRemoteSyncTask syncTask = new InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2); - syncTask.startSync(RANGE_LIST); + syncTask.startSync(); Assert.assertNotNull(syncTask.sentMessage); Assert.assertSame(SyncRequest.class, syncTask.sentMessage.getClass()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java index e57ab94..d583d85 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -29,9 +29,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -45,7 +43,7 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.NodePair; +import org.apache.cassandra.repair.SyncNodePair; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.schema.TableId; @@ -166,7 +164,7 @@ public class RepairMessageSerializationsTest Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)), Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10)) )); - SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true, summaries); + SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries); serializeRoundTrip(msg, SyncComplete.serializer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index c29e7a8..1223683 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -42,7 +42,7 @@ import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.repair.NodePair; +import org.apache.cassandra.repair.SyncNodePair; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.repair.messages.*; @@ -239,7 +239,7 @@ public class SerializationsTest extends AbstractSerializationsTester InetAddressAndPort src = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", PORT); InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT); - NodePair nodes = new NodePair(src, dest); + SyncNodePair nodes = new SyncNodePair(src, dest); try (DataInputStreamPlus in = getInput("service.SyncComplete.bin")) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org