http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/StorageServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java new file mode 100644 index 0000000..9d5c324 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractEndpointSnitch; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaMultimap; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; + +import static org.junit.Assert.assertEquals; + +public class StorageServiceTest +{ + static InetAddressAndPort aAddress; + static InetAddressAndPort bAddress; + static InetAddressAndPort cAddress; + static InetAddressAndPort dAddress; + static InetAddressAndPort eAddress; + + @BeforeClass + public static void setUpClass() throws Exception + { + aAddress = InetAddressAndPort.getByName("127.0.0.1"); + bAddress = InetAddressAndPort.getByName("127.0.0.2"); + cAddress = InetAddressAndPort.getByName("127.0.0.3"); + dAddress = InetAddressAndPort.getByName("127.0.0.4"); + eAddress = InetAddressAndPort.getByName("127.0.0.5"); + } + + private static final Token threeToken = new RandomPartitioner.BigIntegerToken("3"); + private static final Token sixToken = new RandomPartitioner.BigIntegerToken("6"); + private static final Token nineToken = new RandomPartitioner.BigIntegerToken("9"); + private static final Token elevenToken = new RandomPartitioner.BigIntegerToken("11"); + private static final Token oneToken = new RandomPartitioner.BigIntegerToken("1"); + + Range<Token> aRange = new Range<>(oneToken, threeToken); + Range<Token> bRange = new Range<>(threeToken, sixToken); + Range<Token> cRange = new Range<>(sixToken, nineToken); + Range<Token> dRange = new Range<>(nineToken, elevenToken); + Range<Token> eRange = new Range<>(elevenToken, oneToken); + + @Before + public void setUp() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + IEndpointSnitch snitch = new AbstractEndpointSnitch() + { + public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2) + { + return 0; + } + + public String getRack(InetAddressAndPort endpoint) + { + return "R1"; + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + return "DC1"; + } + }; + + DatabaseDescriptor.setEndpointSnitch(snitch); + } + + private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd) + { + return new SimpleStrategy("MoveTransientTest", + tmd, + DatabaseDescriptor.getEndpointSnitch(), + com.google.common.collect.ImmutableMap.of("replication_factor", "3/1")); + } + + public static <K, C extends ReplicaCollection<? extends C>> void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b) + { + if (!a.keySet().equals(b.keySet())) + assertEquals(a, b); + for (K key : a.keySet()) + { + C ac = a.get(key); + C bc = b.get(key); + if (ac.size() != bc.size()) + assertEquals(a, b); + for (Replica r : ac) + { + if (!bc.contains(r)) + assertEquals(a, b); + } + } + } + + @Test + public void testGetChangedReplicasForLeaving() throws Exception + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(threeToken, aAddress); + tmd.updateNormalToken(sixToken, bAddress); + tmd.updateNormalToken(nineToken, cAddress); + tmd.updateNormalToken(elevenToken, dAddress); + tmd.updateNormalToken(oneToken, eAddress); + + tmd.addLeavingEndpoint(aAddress); + + AbstractReplicationStrategy strat = simpleStrategy(tmd); + + EndpointsByReplica result = StorageService.getChangedReplicasForLeaving("StorageServiceTest", aAddress, tmd, strat); + System.out.println(result); + EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + expectedResult.put(new Replica(aAddress, aRange, true), new Replica(cAddress, new Range<>(oneToken, sixToken), true)); + expectedResult.put(new Replica(aAddress, aRange, true), new Replica(dAddress, new Range<>(oneToken, sixToken), false)); + expectedResult.put(new Replica(aAddress, eRange, true), new Replica(bAddress, eRange, true)); + expectedResult.put(new Replica(aAddress, eRange, true), new Replica(cAddress, eRange, false)); + expectedResult.put(new Replica(aAddress, dRange, false), new Replica(bAddress, dRange, false)); + assertMultimapEqualsIgnoreOrder(result, expectedResult.asImmutableView()); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java index f8567e8..cf1e06a 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java @@ -20,12 +20,14 @@ package org.apache.cassandra.service; import java.net.InetAddress; -import java.util.Collection; -import java.util.List; +import java.net.UnknownHostException; import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.google.common.collect.ImmutableList; +import com.google.common.base.Predicates; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaLayout; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -38,9 +40,13 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -49,12 +55,26 @@ public class WriteResponseHandlerTest { static Keyspace ks; static ColumnFamilyStore cfs; - static List<InetAddressAndPort> targets; + static EndpointsForToken targets; + static EndpointsForToken pending; + + private static Replica full(String name) + { + try + { + return ReplicaUtils.full(InetAddressAndPort.getByName(name)); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } @BeforeClass public static void setUpClass() throws Throwable { SchemaLoader.loadSchema(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); // Register peers with expected DC for NetworkTopologyStrategy. TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); @@ -77,17 +97,12 @@ public class WriteResponseHandlerTest return "datacenter2"; } - public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress) - { - return null; - } - - public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses) + public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C replicas) { - + return replicas; } - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } @@ -97,7 +112,7 @@ public class WriteResponseHandlerTest } - public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) + public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2) { return false; } @@ -106,8 +121,10 @@ public class WriteResponseHandlerTest SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar")); ks = Keyspace.open("Foo"); cfs = ks.getColumnFamilyStore("Bar"); - targets = ImmutableList.of(InetAddressAndPort.getByName("127.1.0.255"), InetAddressAndPort.getByName("127.1.0.254"), InetAddressAndPort.getByName("127.1.0.253"), - InetAddressAndPort.getByName("127.2.0.255"), InetAddressAndPort.getByName("127.2.0.254"), InetAddressAndPort.getByName("127.2.0.253")); + targets = EndpointsForToken.of(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)), + full("127.1.0.255"), full("127.1.0.254"), full("127.1.0.253"), + full("127.2.0.255"), full("127.2.0.254"), full("127.2.0.253")); + pending = EndpointsForToken.empty(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0))); } @Before @@ -197,7 +214,6 @@ public class WriteResponseHandlerTest @Test public void failedIdealCLIncrementsStat() throws Throwable { - AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM); //Succeed in local DC @@ -220,16 +236,12 @@ public class WriteResponseHandlerTest private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime) { - return ks.getReplicationStrategy().getWriteResponseHandler(targets, ImmutableList.of(), cl, new Runnable() { - public void run() - { - - } - }, WriteType.SIMPLE, queryStartTime, ideal); + return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending), + null, WriteType.SIMPLE, queryStartTime, ideal); } private static MessageIn createDummyMessage(int target) { - return MessageIn.create(targets.get(target), null, null, null, 0, 0L); + return MessageIn.create(targets.get(target).endpoint(), null, null, null, 0, 0L); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java new file mode 100644 index 0000000..c19e65e --- /dev/null +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; + +import com.google.common.base.Predicates; +import com.google.common.collect.Sets; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.ReplicaUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE; +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.apache.cassandra.locator.ReplicaUtils.trans; + +public class WriteResponseHandlerTransientTest +{ + static Keyspace ks; + static ColumnFamilyStore cfs; + + static final InetAddressAndPort EP1; + static final InetAddressAndPort EP2; + static final InetAddressAndPort EP3; + static final InetAddressAndPort EP4; + static final InetAddressAndPort EP5; + static final InetAddressAndPort EP6; + + static final String DC1 = "datacenter1"; + static final String DC2 = "datacenter2"; + static Token dummy; + static + { + try + { + EP1 = InetAddressAndPort.getByName("127.1.0.1"); + EP2 = InetAddressAndPort.getByName("127.1.0.2"); + EP3 = InetAddressAndPort.getByName("127.1.0.3"); + EP4 = InetAddressAndPort.getByName("127.2.0.4"); + EP5 = InetAddressAndPort.getByName("127.2.0.5"); + EP6 = InetAddressAndPort.getByName("127.2.0.6"); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } + + @BeforeClass + public static void setupClass() throws Throwable + { + SchemaLoader.loadSchema(); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + // Register peers with expected DC for NetworkTopologyStrategy. + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.1")); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.2.0.1")); + + DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch() + { + public String getRack(InetAddressAndPort endpoint) + { + return null; + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + byte[] address = endpoint.address.getAddress(); + if (address[1] == 1) + return DC1; + else + return DC2; + } + + public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C unsortedAddress) + { + return unsortedAddress; + } + + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) + { + return 0; + } + + public void gossiperStarting() + { + + } + + public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2) + { + return false; + } + }); + + DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.1.0.1")); + SchemaLoader.createKeyspace("ks", KeyspaceParams.nts(DC1, "3/1", DC2, "3/1"), SchemaLoader.standardCFMD("ks", "tbl")); + ks = Keyspace.open("ks"); + cfs = ks.getColumnFamilyStore("tbl"); + dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)); + } + + @Ignore("Throws unavailable for quorum as written") + @Test + public void checkPendingReplicasAreNotFiltered() + { + EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3)); + EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6)); + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue()); + + Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending()); + } + + private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected) + { + return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected); + } + + private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate) + { + return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate); + } + + private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate) + { + ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate); + Assert.assertEquals(expected.natural(), actual.natural()); + Assert.assertEquals(expected.selected(), actual.selected()); + } + + private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints) + { + Set<InetAddressAndPort> deadSet = Sets.newHashSet(endpoints); + return ep -> !deadSet.contains(ep); + } + + private static EndpointsForToken replicas(Replica... rr) + { + return EndpointsForToken.of(dummy.getToken(), rr); + } + + @Ignore("Throws unavailable for quorum as written") + @Test + public void checkSpeculationContext() + { + EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3)); + // in happy path, transient replica should be classified as a backup + assertSpeculationReplicas(expected(all, + replicas(full(EP1), full(EP2))), + replicas(full(EP1), full(EP2), trans(EP3)), + 2, dead()); + + // if one of the full replicas is dead, they should all be in the initial contacts + assertSpeculationReplicas(expected(all, + replicas(full(EP1), trans(EP3))), + replicas(full(EP1), full(EP2), trans(EP3)), + 2, dead(EP2)); + + // block only for 1 full replica, use transient as backups + assertSpeculationReplicas(expected(all, + replicas(full(EP1))), + replicas(full(EP1), full(EP2), trans(EP3)), + 1, dead(EP2)); + } + + @Test (expected = UnavailableException.class) + public void noFullReplicas() + { + getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1)); + } + + @Test (expected = UnavailableException.class) + public void notEnoughTransientReplicas() + { + getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java new file mode 100644 index 0000000..c6f2232 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads; + +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringBoundary; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Base class for testing various components which deal with read responses + */ +@Ignore +public abstract class AbstractReadResponseTest +{ + public static final String KEYSPACE1 = "DataResolverTest"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_COLLECTION = "Collection1"; + + public static Keyspace ks; + public static ColumnFamilyStore cfs; + public static ColumnFamilyStore cfs2; + public static TableMetadata cfm; + public static TableMetadata cfm2; + public static ColumnMetadata m; + + public static DecoratedKey dk; + static int nowInSec; + + static final InetAddressAndPort EP1; + static final InetAddressAndPort EP2; + static final InetAddressAndPort EP3; + + static + { + try + { + EP1 = InetAddressAndPort.getByName("127.0.0.1"); + EP2 = InetAddressAndPort.getByName("127.0.0.2"); + EP3 = InetAddressAndPort.getByName("127.0.0.3"); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @BeforeClass + public static void setupClass() throws Throwable + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + TableMetadata.Builder builder1 = + TableMetadata.builder(KEYSPACE1, CF_STANDARD) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col1", AsciiType.instance) + .addRegularColumn("c1", AsciiType.instance) + .addRegularColumn("c2", AsciiType.instance) + .addRegularColumn("one", AsciiType.instance) + .addRegularColumn("two", AsciiType.instance); + + TableMetadata.Builder builder2 = + TableMetadata.builder(KEYSPACE1, CF_COLLECTION) + .addPartitionKeyColumn("k", ByteType.instance) + .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2); + + ks = Keyspace.open(KEYSPACE1); + cfs = ks.getColumnFamilyStore(CF_STANDARD); + cfm = cfs.metadata(); + cfs2 = ks.getColumnFamilyStore(CF_COLLECTION); + cfm2 = cfs2.metadata(); + m = cfm2.getColumn(new ColumnIdentifier("m", false)); + } + + @Before + public void setUp() throws Exception + { + dk = Util.dk("key1"); + nowInSec = FBUtilities.nowInSeconds(); + } + + static void assertPartitionsEqual(RowIterator l, RowIterator r) + { + try (RowIterator left = l; RowIterator right = r) + { + Assert.assertTrue(Util.sameContent(left, right)); + } + } + + static void assertPartitionsEqual(UnfilteredRowIterator left, UnfilteredRowIterator right) + { + Assert.assertTrue(Util.sameContent(left, right)); + } + + static void assertPartitionsEqual(UnfilteredPartitionIterator left, UnfilteredPartitionIterator right) + { + while (left.hasNext()) + { + Assert.assertTrue(right.hasNext()); + assertPartitionsEqual(left.next(), right.next()); + } + Assert.assertFalse(right.hasNext()); + } + + static void assertPartitionsEqual(PartitionIterator l, PartitionIterator r) + { + try (PartitionIterator left = l; PartitionIterator right = r) + { + while (left.hasNext()) + { + Assert.assertTrue(right.hasNext()); + assertPartitionsEqual(left.next(), right.next()); + } + Assert.assertFalse(right.hasNext()); + } + } + + static void consume(PartitionIterator i) + { + try (PartitionIterator iterator = i) + { + while (iterator.hasNext()) + { + try (RowIterator rows = iterator.next()) + { + while (rows.hasNext()) + rows.next(); + } + } + } + } + + static PartitionIterator filter(UnfilteredPartitionIterator iter) + { + return UnfilteredPartitionIterators.filter(iter, nowInSec); + } + + static DecoratedKey dk(String k) + { + return cfs.decorateKey(ByteBufferUtil.bytes(k)); + } + + static DecoratedKey dk(int k) + { + return dk(Integer.toString(k)); + } + + static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest) + { + ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command); + return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version); + } + + static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data) + { + return response(command, from, data, false); + } + + public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime) + { + return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime); + } + + public RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime) + { + ClusteringBound startBound = rtBound(start, true, inclusiveStart); + ClusteringBound endBound = rtBound(end, false, inclusiveEnd); + return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime)); + } + + public ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive) + { + ClusteringBound.Kind kind = isStart + ? (inclusive ? ClusteringPrefix.Kind.INCL_START_BOUND : ClusteringPrefix.Kind.EXCL_START_BOUND) + : (inclusive ? ClusteringPrefix.Kind.INCL_END_BOUND : ClusteringPrefix.Kind.EXCL_END_BOUND); + + return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues()); + } + + public ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd) + { + ClusteringBound.Kind kind = inclusiveOnEnd + ? ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY + : ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY; + return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues()); + } + + public RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime) + { + return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime)); + } + + public RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2) + { + return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd), + new DeletionTime(markedForDeleteAt1, localDeletionTime1), + new DeletionTime(markedForDeleteAt2, localDeletionTime2)); + } + + public UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec) + { + return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator()); + } + + public UnfilteredPartitionIterator iter(PartitionUpdate update) + { + return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator()); + } + + public UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds) + { + SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator); + Collections.addAll(s, unfiltereds); + final Iterator<Unfiltered> iterator = s.iterator(); + + UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm, + key, + DeletionTime.LIVE, + cfm.regularAndStaticColumns(), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS) + { + protected Unfiltered computeNext() + { + return iterator.hasNext() ? iterator.next() : endOfData(); + } + }; + return new SingletonUnfilteredPartitionIterator(rowIter); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index ac8ed0b..abec25d 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -19,114 +19,101 @@ package org.apache.cassandra.service.reads; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collections; +import java.util.Iterator; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; -import org.junit.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.MutableDeletionInfo; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.ByteType; -import org.apache.cassandra.db.marshal.IntegerType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.net.*; -import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.service.reads.repair.TestableReadRepair; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.Util.assertClustering; import static org.apache.cassandra.Util.assertColumn; import static org.apache.cassandra.Util.assertColumns; +import static org.apache.cassandra.db.ClusteringBound.Kind; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.apache.cassandra.db.ClusteringBound.Kind; -public class DataResolverTest +public class DataResolverTest extends AbstractReadResponseTest { public static final String KEYSPACE1 = "DataResolverTest"; public static final String CF_STANDARD = "Standard1"; - public static final String CF_COLLECTION = "Collection1"; - - // counter to generate the last byte of the respondent's address in a ReadResponse message - private int addressSuffix = 10; - - private DecoratedKey dk; - private Keyspace ks; - private ColumnFamilyStore cfs; - private ColumnFamilyStore cfs2; - private TableMetadata cfm; - private TableMetadata cfm2; - private ColumnMetadata m; - private int nowInSec; + private ReadCommand command; private TestableReadRepair readRepair; - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - DatabaseDescriptor.daemonInitialization(); - - TableMetadata.Builder builder1 = - TableMetadata.builder(KEYSPACE1, CF_STANDARD) - .addPartitionKeyColumn("key", BytesType.instance) - .addClusteringColumn("col1", AsciiType.instance) - .addRegularColumn("c1", AsciiType.instance) - .addRegularColumn("c2", AsciiType.instance) - .addRegularColumn("one", AsciiType.instance) - .addRegularColumn("two", AsciiType.instance); - - TableMetadata.Builder builder2 = - TableMetadata.builder(KEYSPACE1, CF_COLLECTION) - .addPartitionKeyColumn("k", ByteType.instance) - .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)); - - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2); - } - @Before public void setup() { - dk = Util.dk("key1"); - ks = Keyspace.open(KEYSPACE1); - cfs = ks.getColumnFamilyStore(CF_STANDARD); - cfm = cfs.metadata(); - cfs2 = ks.getColumnFamilyStore(CF_COLLECTION); - cfm2 = cfs2.metadata(); - m = cfm2.getColumn(new ColumnIdentifier("m", false)); - - nowInSec = FBUtilities.nowInSeconds(); command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); } - @Test - public void testResolveNewerSingleRow() throws UnknownHostException + private static EndpointsForRange makeReplicas(int num) { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") - .add("c1", "v1") - .buildUpdate()))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") - .add("c1", "v2") - .buildUpdate()))); + EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num); + for (int i = 0; i < num; i++) + { + try + { + replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) }))); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } + return replicas.build(); + } + + @Test + public void testResolveNewerSingleRow() + { + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") + .add("c1", "v1") + .buildUpdate()), false)); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") + .add("c1", "v2") + .buildUpdate()), false)); try(PartitionIterator data = resolver.resolve()) { @@ -149,16 +136,17 @@ public class DataResolverTest @Test public void testResolveDisjointSingleRow() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") - .add("c1", "v1") - .buildUpdate()))); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") + .add("c1", "v1") + .buildUpdate()))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") - .add("c2", "v2") - .buildUpdate()))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") + .add("c2", "v2") + .buildUpdate()))); try(PartitionIterator data = resolver.resolve()) { @@ -185,15 +173,16 @@ public class DataResolverTest @Test public void testResolveDisjointMultipleRows() throws UnknownHostException { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") - .add("c1", "v1") - .buildUpdate()))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2") - .add("c2", "v2") - .buildUpdate()))); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") + .add("c1", "v1") + .buildUpdate()))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2") + .add("c2", "v2") + .buildUpdate()))); try (PartitionIterator data = resolver.resolve()) { @@ -231,37 +220,38 @@ public class DataResolverTest @Test public void testResolveDisjointMultipleRowsWithRangeTombstones() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair); + EndpointsForRange replicas = makeReplicas(4); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec); RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec); PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1) - .addRangeTombstone(tombstone2) - .buildUpdate(); + .addRangeTombstone(tombstone2) + .buildUpdate(); - InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1) - .addRangeTombstone(tombstone2) - .buildUpdate()); - resolver.preprocess(readResponseMessage(peer1, iter1)); + .addRangeTombstone(tombstone2) + .buildUpdate()); + resolver.preprocess(response(command, peer1, iter1)); // not covered by any range tombstone - InetAddressAndPort peer2 = peer(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0") - .add("c1", "v0") - .buildUpdate()); - resolver.preprocess(readResponseMessage(peer2, iter2)); + .add("c1", "v0") + .buildUpdate()); + resolver.preprocess(response(command, peer2, iter2)); // covered by a range tombstone - InetAddressAndPort peer3 = peer(); + InetAddressAndPort peer3 = replicas.get(2).endpoint(); UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10") - .add("c2", "v1") - .buildUpdate()); - resolver.preprocess(readResponseMessage(peer3, iter3)); + .add("c2", "v1") + .buildUpdate()); + resolver.preprocess(response(command, peer3, iter3)); // range covered by rt, but newer - InetAddressAndPort peer4 = peer(); + InetAddressAndPort peer4 = replicas.get(3).endpoint(); UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3") - .add("one", "A") - .buildUpdate()); - resolver.preprocess(readResponseMessage(peer4, iter4)); + .add("one", "A") + .buildUpdate()); + resolver.preprocess(response(command, peer4, iter4)); try (PartitionIterator data = resolver.resolve()) { try (RowIterator rows = data.next()) @@ -311,13 +301,14 @@ public class DataResolverTest @Test public void testResolveWithOneEmpty() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") - .add("c2", "v2") - .buildUpdate()))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm))); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") + .add("c2", "v2") + .buildUpdate()))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, EmptyIterators.unfilteredPartition(cfm))); try(PartitionIterator data = resolver.resolve()) { @@ -340,10 +331,11 @@ public class DataResolverTest @Test public void testResolveWithBothEmpty() { + EndpointsForRange replicas = makeReplicas(2); TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm))); - resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm))); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm))); + resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm))); try(PartitionIterator data = resolver.resolve()) { @@ -356,14 +348,15 @@ public class DataResolverTest @Test public void testResolveDeleted() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); // one response with columns timestamped before a delete in another response - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") - .add("one", "A") - .buildUpdate()))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec))); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1") + .add("one", "A") + .buildUpdate()))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, fullPartitionDelete(cfm, dk, 1, nowInSec))); try (PartitionIterator data = resolver.resolve()) { @@ -381,23 +374,24 @@ public class DataResolverTest @Test public void testResolveMultipleDeleted() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair); + EndpointsForRange replicas = makeReplicas(4); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); // deletes and columns with interleaved timestamp, with out of order return sequence - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec))); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(command, peer1, fullPartitionDelete(cfm, dk, 0, nowInSec))); // these columns created after the previous deletion - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") - .add("one", "A") - .add("two", "A") - .buildUpdate()))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") + .add("one", "A") + .add("two", "A") + .buildUpdate()))); //this column created after the next delete - InetAddressAndPort peer3 = peer(); - resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1") - .add("two", "B") - .buildUpdate()))); - InetAddressAndPort peer4 = peer(); - resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec))); + InetAddressAndPort peer3 = replicas.get(2).endpoint(); + resolver.preprocess(response(command, peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1") + .add("two", "B") + .buildUpdate()))); + InetAddressAndPort peer4 = replicas.get(3).endpoint(); + resolver.preprocess(response(command, peer4, fullPartitionDelete(cfm, dk, 2, nowInSec))); try(PartitionIterator data = resolver.resolve()) { @@ -465,9 +459,10 @@ public class DataResolverTest */ private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2) { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - InetAddressAndPort peer2 = peer(); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); // 1st "stream" RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec); @@ -485,8 +480,8 @@ public class DataResolverTest .addRangeTombstone(four_five) .buildUpdate()); - resolver.preprocess(readResponseMessage(peer1, iter1)); - resolver.preprocess(readResponseMessage(peer2, iter2)); + resolver.preprocess(response(command, peer1, iter1)); + resolver.preprocess(response(command, peer2, iter2)); // No results, we've only reconciled tombstones. try (PartitionIterator data = resolver.resolve()) @@ -538,9 +533,10 @@ public class DataResolverTest */ private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - InetAddressAndPort peer2 = peer(); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); // 1st "stream" RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec); @@ -554,8 +550,8 @@ public class DataResolverTest RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec); UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine); - resolver.preprocess(readResponseMessage(peer1, iter1)); - resolver.preprocess(readResponseMessage(peer2, iter2)); + resolver.preprocess(response(command, peer1, iter1)); + resolver.preprocess(response(command, peer2, iter2)); boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3; @@ -589,9 +585,10 @@ public class DataResolverTest @Test public void testRepairRangeTombstoneWithPartitionDeletion() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - InetAddressAndPort peer2 = peer(); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); // 1st "stream": just a partition deletion UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec)); @@ -602,8 +599,8 @@ public class DataResolverTest .addRangeTombstone(rt) .buildUpdate()); - resolver.preprocess(readResponseMessage(peer1, iter1)); - resolver.preprocess(readResponseMessage(peer2, iter2)); + resolver.preprocess(response(command, peer1, iter1)); + resolver.preprocess(response(command, peer2, iter2)); // No results, we've only reconciled tombstones. try (PartitionIterator data = resolver.resolve()) @@ -627,15 +624,16 @@ public class DataResolverTest @Test public void testRepairRangeTombstoneWithPartitionDeletion2() { - DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); - InetAddressAndPort peer1 = peer(); - InetAddressAndPort peer2 = peer(); + EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); // 1st "stream": a partition deletion and a range tombstone RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec); PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk) - .addRangeTombstone(rt1) - .buildUpdate(); + .addRangeTombstone(rt1) + .buildUpdate(); ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec)); UnfilteredPartitionIterator iter1 = iter(upd1); @@ -647,8 +645,8 @@ public class DataResolverTest .addRangeTombstone(rt3) .buildUpdate()); - resolver.preprocess(readResponseMessage(peer1, iter1)); - resolver.preprocess(readResponseMessage(peer2, iter2)); + resolver.preprocess(response(command, peer1, iter1)); + resolver.preprocess(response(command, peer2, iter2)); // No results, we've only reconciled tombstones. try (PartitionIterator data = resolver.resolve()) @@ -678,8 +676,8 @@ public class DataResolverTest Slice slice = rt.deletedSlice(); ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues()); return condition - ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime()) - : rt; + ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime()) + : rt; } // Forces the end to be exclusive if the condition holds @@ -691,8 +689,8 @@ public class DataResolverTest Slice slice = rt.deletedSlice(); ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues()); return condition - ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime()) - : rt; + ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime()) + : rt; } private static ByteBuffer bb(int b) @@ -708,9 +706,10 @@ public class DataResolverTest @Test public void testResolveComplexDelete() { + EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); + DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -719,8 +718,8 @@ public class DataResolverTest builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); builder.addCell(mapCell(0, 0, ts[0])); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); builder.newRow(Clustering.EMPTY); DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec); @@ -728,8 +727,8 @@ public class DataResolverTest Cell expectedCell = mapCell(1, 1, ts[1]); builder.addCell(expectedCell); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); try(PartitionIterator data = resolver.resolve()) { @@ -742,7 +741,6 @@ public class DataResolverTest } } - Mutation mutation = readRepair.getForEndpoint(peer1); Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator(); assertTrue(rowIter.hasNext()); @@ -760,9 +758,10 @@ public class DataResolverTest @Test public void testResolveDeletedCollection() { + EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); + DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -771,15 +770,15 @@ public class DataResolverTest builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); builder.addCell(mapCell(0, 0, ts[0])); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); builder.newRow(Clustering.EMPTY); DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec); builder.addComplexDeletion(m, expectedCmplxDelete); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); try(PartitionIterator data = resolver.resolve()) { @@ -803,9 +802,10 @@ public class DataResolverTest @Test public void testResolveNewCollection() { + EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); + DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -818,11 +818,11 @@ public class DataResolverTest builder.addCell(expectedCell); // empty map column - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk)))); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk)))); try(PartitionIterator data = resolver.resolve()) { @@ -852,9 +852,10 @@ public class DataResolverTest @Test public void testResolveNewCollectionOverwritingDeleted() { + EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); - DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); + DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -863,8 +864,8 @@ public class DataResolverTest builder.newRow(Clustering.EMPTY); builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); - InetAddressAndPort peer1 = peer(); - resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); // newer, overwritten map column builder.newRow(Clustering.EMPTY); @@ -873,8 +874,8 @@ public class DataResolverTest Cell expectedCell = mapCell(1, 1, ts[1]); builder.addCell(expectedCell); - InetAddressAndPort peer2 = peer(); - resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())))); try(PartitionIterator data = resolver.resolve()) { @@ -897,18 +898,6 @@ public class DataResolverTest Assert.assertNull(readRepair.sent.get(peer2)); } - private InetAddressAndPort peer() - { - try - { - return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ }); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - private void assertRepairContainsDeletions(Mutation mutation, DeletionTime deletionTime, RangeTombstone...rangeTombstones) @@ -961,91 +950,8 @@ public class DataResolverTest assertEquals(update.metadata().name, cfm.name); } - - public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator) - { - return readResponseMessage(from, partitionIterator, command); - - } - public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd) + private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel) { - return MessageIn.create(from, - ReadResponse.createRemoteDataResponse(partitionIterator, cmd), - Collections.EMPTY_MAP, - MessagingService.Verb.REQUEST_RESPONSE, - MessagingService.current_version); - } - - private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime) - { - return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime); - } - - private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime) - { - ClusteringBound startBound = rtBound(start, true, inclusiveStart); - ClusteringBound endBound = rtBound(end, false, inclusiveEnd); - return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime)); - } - - private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive) - { - ClusteringBound.Kind kind = isStart - ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND) - : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND); - - return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues()); - } - - private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd) - { - ClusteringBound.Kind kind = inclusiveOnEnd - ? Kind.INCL_END_EXCL_START_BOUNDARY - : Kind.EXCL_END_INCL_START_BOUNDARY; - return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues()); - } - - private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime) - { - return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime)); - } - - private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2) - { - return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd), - new DeletionTime(markedForDeleteAt1, localDeletionTime1), - new DeletionTime(markedForDeleteAt2, localDeletionTime2)); - } - - private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec) - { - return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator()); - } - - private UnfilteredPartitionIterator iter(PartitionUpdate update) - { - return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator()); - } - - private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds) - { - SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator); - Collections.addAll(s, unfiltereds); - final Iterator<Unfiltered> iterator = s.iterator(); - - UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm, - key, - DeletionTime.LIVE, - cfm.regularAndStaticColumns(), - Rows.EMPTY_STATIC_ROW, - false, - EncodingStats.NO_STATS) - { - protected Unfiltered computeNext() - { - return iterator.hasNext() ? iterator.next() : endOfData(); - } - }; - return new SingletonUnfilteredPartitionIterator(rowIter); + return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas); } -} +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org