http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index a43e3eb..4af4a92 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.reads.repair; -import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -27,24 +26,28 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.service.reads.DigestResolver; /** * Bypasses the read repair path for short read protection and testing */ -public class NoopReadRepair implements ReadRepair +public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L> { public static final NoopReadRepair instance = new NoopReadRepair(); private NoopReadRepair() {} - public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + @Override + public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas) { return UnfilteredPartitionIterators.MergeListener.NOOP; } - public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + @Override + public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer) { resultConsumer.accept(digestResolver.getData()); } @@ -72,7 +75,7 @@ public class NoopReadRepair implements ReadRepair } @Override - public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) { }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java index 6cf761a..4cae3ae 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java @@ -28,18 +28,18 @@ import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener { - private final InetAddressAndPort[] sources; + private final ReplicaLayout replicaLayout; private final ReadCommand command; private final ConsistencyLevel consistency; private final ReadRepair readRepair; - public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) + public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) { - this.sources = sources; + this.replicaLayout = replicaLayout; this.command = command; this.consistency = consistency; this.readRepair = readRepair; @@ -47,10 +47,10 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) { - return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, consistency, readRepair); + return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair); } - private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) + protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) { Columns statics = Columns.NONE; Columns regulars = Columns.NONE; @@ -66,7 +66,7 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat return new RegularAndStaticColumns(statics, regulars); } - private boolean isReversed(List<UnfilteredRowIterator> versions) + protected boolean isReversed(List<UnfilteredRowIterator> versions) { for (UnfilteredRowIterator iter : versions) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java index d994b23..c13e2d6 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java @@ -21,27 +21,28 @@ package org.apache.cassandra.service.reads.repair; import java.util.Map; import com.codahale.metrics.Meter; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.metrics.ReadRepairMetrics; /** * Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations * to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads */ -public class ReadOnlyReadRepair extends AbstractReadRepair +public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L> { - public ReadOnlyReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) { - super(command, queryStartNanoTime, consistency); + super(command, replicaLayout, queryStartNanoTime); } @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout) { return UnfilteredPartitionIterators.MergeListener.NOOP; } @@ -59,7 +60,7 @@ public class ReadOnlyReadRepair extends AbstractReadRepair } @Override - public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) { throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index 97f0f67..168f003 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -17,44 +17,45 @@ */ package org.apache.cassandra.service.reads.repair; -import java.util.List; import java.util.Map; import java.util.function.Consumer; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.locator.Endpoints; + import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.DigestResolver; -public interface ReadRepair +public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> { public interface Factory { - ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency); + <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime); + } + + static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime) + { + return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime); } /** * Used by DataResolver to generate corrections as the partition iterator is consumed */ - UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints); + UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout); /** * Called when the digests from the initial read don't match. Reads may block on the * repair started by this method. * @param digestResolver supplied so we can get the original data response - * @param allEndpoints all available replicas for this read - * @param contactedEndpoints the replicas we actually sent requests to * @param resultConsumer hook for the repair to set it's result on completion */ - public void startRepair(DigestResolver digestResolver, - List<InetAddressAndPort> allEndpoints, - List<InetAddressAndPort> contactedEndpoints, - Consumer<PartitionIterator> resultConsumer); + public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer); /** * Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair} @@ -81,17 +82,13 @@ public interface ReadRepair public void maybeSendAdditionalWrites(); /** - * Hook for the merge listener to start repairs on individual partitions. - */ - void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations); - - /** * Block on any mutations (or timeout) we sent out to repair replicas in {@link ReadRepair#repairPartition} */ public void awaitWrites(); - static ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) - { - return command.metadata().params.readRepair.create(command, queryStartNanoTime, consistency); - } + /** + * Repairs a partition _after_ receiving data responses. This method receives replica list, since + * we will block repair only on the replicas that have responded. + */ + void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java index 1117822..6eff395 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.reads.repair; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -38,8 +39,8 @@ final class ReadRepairDiagnostics { } - static void startRepair(AbstractReadRepair readRepair, List<InetAddressAndPort> endpointDestinations, - DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints) + static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations, + DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints) { if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR)) service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java index 152f7e6..9e14362 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair; import java.io.Serializable; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,9 +49,9 @@ final class ReadRepairEvent extends DiagnosticEvent private final ConsistencyLevel consistency; private final SpeculativeRetryPolicy.Kind speculativeRetry; @VisibleForTesting - final List<InetAddressAndPort> destinations; + final Collection<InetAddressAndPort> destinations; @VisibleForTesting - final List<InetAddressAndPort> allEndpoints; + final Collection<InetAddressAndPort> allEndpoints; @Nullable private final DigestResolverDebugResult[] digestsByEndpoint; @@ -60,13 +61,13 @@ final class ReadRepairEvent extends DiagnosticEvent SPECULATED_READ } - ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, List<InetAddressAndPort> destinations, - List<InetAddressAndPort> allEndpoints, DigestResolver digestResolver) + ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, Collection<InetAddressAndPort> destinations, + Collection<InetAddressAndPort> allEndpoints, DigestResolver digestResolver) { this.keyspace = readRepair.cfs.keyspace; this.tableName = readRepair.cfs.getTableName(); this.cqlCommand = readRepair.command.toCQLString(); - this.consistency = readRepair.consistency; + this.consistency = readRepair.replicaLayout.consistencyLevel(); this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind(); this.destinations = destinations; this.allEndpoints = allEndpoints; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java index 5945633..28c0e9e 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java @@ -18,26 +18,25 @@ package org.apache.cassandra.service.reads.repair; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaLayout; public enum ReadRepairStrategy implements ReadRepair.Factory { NONE { - @Override - public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime) { - return new ReadOnlyReadRepair(command, queryStartNanoTime, consistency); + return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime); } }, BLOCKING { - @Override - public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime) { - return new BlockingReadRepair(command, queryStartNanoTime, consistency); + return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index cb6707d..b0c019a 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.repair; import java.util.Arrays; import java.util.Map; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.cassandra.db.Clustering; @@ -43,7 +44,9 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowDiffListener; import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.UnfilteredRowIterators; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.schema.ColumnMetadata; public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener @@ -51,14 +54,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis private final DecoratedKey partitionKey; private final RegularAndStaticColumns columns; private final boolean isReversed; - private final InetAddressAndPort[] sources; private final ReadCommand command; private final ConsistencyLevel consistency; private final PartitionUpdate.Builder[] repairs; - + private final Replica[] sources; private final Row.Builder[] currentRows; private final RowDiffListener diffListener; + private final ReplicaLayout layout; // The partition level deletion for the merge row. private DeletionTime partitionLevelDeletion; @@ -71,16 +74,21 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis private final ReadRepair readRepair; - public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) + public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) { this.partitionKey = partitionKey; this.columns = columns; this.isReversed = isReversed; - this.sources = sources; - repairs = new PartitionUpdate.Builder[sources.length]; - currentRows = new Row.Builder[sources.length]; - sourceDeletionTime = new DeletionTime[sources.length]; - markerToRepair = new ClusteringBound[sources.length]; + Endpoints<?> sources = layout.selected(); + this.sources = new Replica[sources.size()]; + for (int i = 0; i < sources.size(); i++) + this.sources[i] = sources.get(i); + + this.layout = layout; + repairs = new PartitionUpdate.Builder[sources.size()]; + currentRows = new Row.Builder[sources.size()]; + sourceDeletionTime = new DeletionTime[sources.size()]; + markerToRepair = new ClusteringBound[sources.size()]; this.command = command; this.consistency = consistency; this.readRepair = readRepair; @@ -89,25 +97,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis { public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) { - if (merged != null && !merged.equals(original)) + if (merged != null && !merged.equals(original) && !isTransient(i)) currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); } public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) { - if (merged != null && !merged.equals(original)) + if (merged != null && !merged.equals(original) && !isTransient(i)) currentRow(i, clustering).addRowDeletion(merged); } public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original) { - if (merged != null && !merged.equals(original)) + if (merged != null && !merged.equals(original) && !isTransient(i)) currentRow(i, clustering).addComplexDeletion(column, merged); } public void onCell(int i, Clustering clustering, Cell merged, Cell original) { - if (merged != null && !merged.equals(original) && isQueried(merged)) + if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i)) currentRow(i, clustering).addCell(merged); } @@ -126,6 +134,11 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis }; } + private boolean isTransient(int i) + { + return sources[i].isTransient(); + } + private PartitionUpdate.Builder update(int i) { if (repairs[i] == null) @@ -159,6 +172,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis this.partitionLevelDeletion = mergedDeletion; for (int i = 0; i < versions.length; i++) { + if (isTransient(i)) + continue; + if (mergedDeletion.supersedes(versions[i])) update(i).addPartitionDeletion(mergedDeletion); } @@ -193,6 +209,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis for (int i = 0; i < versions.length; i++) { + if (isTransient(i)) + continue; + RangeTombstoneMarker marker = versions[i]; // Update what the source now thinks is the current deletion @@ -245,12 +264,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1) { assert currentDeletion.equals(marker.openDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); + : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); } else // (2) { assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); + : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); } // and so unless it's a boundary whose opening deletion time is still equal to the current @@ -306,13 +325,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis public void close() { - Map<InetAddressAndPort, Mutation> mutations = null; + Map<Replica, Mutation> mutations = null; for (int i = 0; i < repairs.length; i++) { if (repairs[i] == null) continue; - Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i], false); + Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas"); + Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false); if (mutation == null) continue; @@ -324,7 +344,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis if (mutations != null) { - readRepair.repairPartition(partitionKey, mutations, sources); + readRepair.repairPartition(partitionKey, mutations, layout); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java index 609d2a0..38c25dc 100644 --- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java +++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +44,10 @@ public class DefaultConnectionFactory implements StreamConnectionFactory private static final int DEFAULT_CHANNEL_BUFFER_SIZE = 1 << 22; - private static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); - private static final int MAX_CONNECT_ATTEMPTS = 3; + @VisibleForTesting + public static long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); + @VisibleForTesting + public static int MAX_CONNECT_ATTEMPTS = 3; @Override public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index b56f165..2f6deb5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -19,11 +19,14 @@ package org.apache.cassandra.streaming; import java.util.*; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.utils.UUIDGen; +import static com.google.common.collect.Iterables.all; import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; /** @@ -69,12 +72,13 @@ public class StreamPlan * * @param from endpoint address to fetch data from. * @param keyspace name of keyspace - * @param ranges ranges to fetch + * @param fullRanges ranges to fetch that from provides the full version of + * @param transientRanges ranges to fetch that from provides only transient data of * @return this object for chaining */ - public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges) { - return requestRanges(from, keyspace, ranges, EMPTY_COLUMN_FAMILIES); + return requestRanges(from, keyspace, fullRanges, transientRanges, EMPTY_COLUMN_FAMILIES); } /** @@ -82,14 +86,20 @@ public class StreamPlan * * @param from endpoint address to fetch data from. * @param keyspace name of keyspace - * @param ranges ranges to fetch + * @param fullRanges ranges to fetch that from provides the full data for + * @param transientRanges ranges to fetch that from provides only transient data for * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies) { + //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node + assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : + fullRanges.toString(); + assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : + transientRanges.toString(); StreamSession session = coordinator.getOrCreateNextSession(from); - session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); + session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies)); return this; } @@ -98,14 +108,14 @@ public class StreamPlan * * @param to endpoint address of receiver * @param keyspace name of keyspace - * @param ranges ranges to send + * @param replicas ranges to send * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesAtEndpoint replicas, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(to); - session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); + session.addTransferRanges(keyspace, replicas, Arrays.asList(columnFamilies), flushBeforeTransfer); return this; } @@ -182,4 +192,10 @@ public class StreamPlan { return flushBeforeTransfer; } + + @VisibleForTesting + public StreamCoordinator getCoordinator() + { + return coordinator; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index 4a3761e..f37268f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -29,6 +29,10 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; public class StreamRequest @@ -36,12 +40,23 @@ public class StreamRequest public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer(); public final String keyspace; - public final Collection<Range<Token>> ranges; + //Full replicas and transient replicas are split based on the transient status of the remote we are fetching + //from. We preserve this distinction so on completion we can log to a system table whether we got the data transiently + //or fully from some remote. This is an important distinction for resumable bootstrap. The Replicas in these collections + //are local replicas (or dummy if this is triggered by repair) and don't encode the necessary information about + //what the remote provided. + public final RangesAtEndpoint full; + public final RangesAtEndpoint transientReplicas; public final Collection<String> columnFamilies = new HashSet<>(); - public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) + + public StreamRequest(String keyspace, RangesAtEndpoint full, RangesAtEndpoint transientReplicas, Collection<String> columnFamilies) { this.keyspace = keyspace; - this.ranges = ranges; + if (!full.endpoint().equals(transientReplicas.endpoint())) + throw new IllegalStateException("Mismatching endpoints: " + full + ", " + transientReplicas); + + this.full = full; + this.transientReplicas = transientReplicas; this.columnFamilies.addAll(columnFamilies); } @@ -50,49 +65,82 @@ public class StreamRequest public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException { out.writeUTF(request.keyspace); - out.writeInt(request.ranges.size()); - for (Range<Token> range : request.ranges) - { - MessagingService.validatePartitioner(range); - Token.serializer.serialize(range.left, out, version); - Token.serializer.serialize(range.right, out, version); - } out.writeInt(request.columnFamilies.size()); + + CompactEndpointSerializationHelper.streamingInstance.serialize(request.full.endpoint(), out, version); + serializeReplicas(request.full, out, version); + serializeReplicas(request.transientReplicas, out, version); for (String cf : request.columnFamilies) out.writeUTF(cf); } - public StreamRequest deserialize(DataInputPlus in, int version) throws IOException + private void serializeReplicas(RangesAtEndpoint replicas, DataOutputPlus out, int version) throws IOException { - String keyspace = in.readUTF(); - int rangeCount = in.readInt(); - List<Range<Token>> ranges = new ArrayList<>(rangeCount); - for (int i = 0; i < rangeCount; i++) + out.writeInt(replicas.size()); + + for (Replica replica : replicas) { - Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); - Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); - ranges.add(new Range<>(left, right)); + MessagingService.validatePartitioner(replica.range()); + Token.serializer.serialize(replica.range().left, out, version); + Token.serializer.serialize(replica.range().right, out, version); } + } + + public StreamRequest deserialize(DataInputPlus in, int version) throws IOException + { + String keyspace = in.readUTF(); int cfCount = in.readInt(); + InetAddressAndPort endpoint = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version); + + RangesAtEndpoint full = deserializeReplicas(in, version, endpoint, true); + RangesAtEndpoint transientReplicas = deserializeReplicas(in, version, endpoint, false); List<String> columnFamilies = new ArrayList<>(cfCount); for (int i = 0; i < cfCount; i++) columnFamilies.add(in.readUTF()); - return new StreamRequest(keyspace, ranges, columnFamilies); + return new StreamRequest(keyspace, full, transientReplicas, columnFamilies); } - public long serializedSize(StreamRequest request, int version) + RangesAtEndpoint deserializeReplicas(DataInputPlus in, int version, InetAddressAndPort endpoint, boolean isFull) throws IOException { - int size = TypeSizes.sizeof(request.keyspace); - size += TypeSizes.sizeof(request.ranges.size()); - for (Range<Token> range : request.ranges) + int replicaCount = in.readInt(); + + RangesAtEndpoint.Builder replicas = RangesAtEndpoint.builder(endpoint, replicaCount); + for (int i = 0; i < replicaCount; i++) { - size += Token.serializer.serializedSize(range.left, version); - size += Token.serializer.serializedSize(range.right, version); + //TODO, super need to review the usage of streaming vs not streaming endpoint serialization helper + //to make sure I'm not using the wrong one some of the time, like do repair messages use the + //streaming version? + Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); + Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); + replicas.add(new Replica(endpoint, new Range<>(left, right), isFull)); } + return replicas.build(); + } + + public long serializedSize(StreamRequest request, int version) + { + int size = TypeSizes.sizeof(request.keyspace); size += TypeSizes.sizeof(request.columnFamilies.size()); + size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(request.full.endpoint(), version); + size += replicasSerializedSize(request.transientReplicas, version); + size += replicasSerializedSize(request.full, version); for (String cf : request.columnFamilies) size += TypeSizes.sizeof(cf); return size; } + + private long replicasSerializedSize(RangesAtEndpoint replicas, int version) + { + long size = 0; + size += TypeSizes.sizeof(replicas.size()); + + for (Replica replica : replicas) + { + size += Token.serializer.serializedSize(replica.range().left, version); + size += Token.serializer.serializedSize(replica.range().right, version); + } + return size; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 393cd24..ec80772 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.OutboundConnectionIdentifier; @@ -49,6 +51,8 @@ import org.apache.cassandra.streaming.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import static com.google.common.collect.Iterables.all; + /** * Handles the streaming a one or more streams to and from a specific remote node. * @@ -243,7 +247,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber public StreamReceiver getAggregator(TableId tableId) { - assert receivers.containsKey(tableId); + assert receivers.containsKey(tableId) : "Missing tableId " + tableId; return receivers.get(tableId).getReceiver(); } @@ -297,38 +301,52 @@ public class StreamSession implements IEndpointStateChangeSubscriber * Request data fetch task to this session. * * @param keyspace Requesting keyspace - * @param ranges Ranges to retrieve data + * @param fullRanges Ranges to retrieve data that will return full data from the source + * @param transientRanges Ranges to retrieve data that will return transient data from the source * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace. */ - public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) + public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies) { - requests.add(new StreamRequest(keyspace, ranges, columnFamilies)); + //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node + assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString(); + assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString(); + requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies)); } /** * Set up transfer for specific keyspace/ranges/CFs * * @param keyspace Transfer keyspace - * @param ranges Transfer ranges + * @param replicas Transfer ranges * @param columnFamilies Transfer ColumnFamilies * @param flushTables flush tables? */ - synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables) + synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables) { failIfFinished(); Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies); if (flushTables) flushSSTables(stores); - List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<OutgoingStream> streams = getOutgoingStreamsForRanges(normalizedRanges, stores, pendingRepair, previewKind); + //Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have? + //Do we need to unwrap here also or is that just making it worse? + //Range and if it's transient + RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size()); + for (Replica replica : replicas) + { + for (Range<Token> unwrapped : replica.range().unwrap()) + { + unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull())); + } + } + List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind); addTransferStreams(streams); Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace); if (toBeUpdated == null) { toBeUpdated = new HashSet<>(); } - toBeUpdated.addAll(ranges); + toBeUpdated.addAll(replicas.ranges()); transferredRangesPerKeyspace.put(keyspace, toBeUpdated); } @@ -355,14 +373,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber } @VisibleForTesting - public List<OutgoingStream> getOutgoingStreamsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) + public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) { List<OutgoingStream> streams = new ArrayList<>(); try { for (ColumnFamilyStore cfs: stores) { - streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, ranges, pendingRepair, previewKind)); + streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind)); } } catch (Throwable t) @@ -561,7 +579,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { for (StreamRequest request : requests) - addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request + addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request for (StreamSummary summary : summaries) prepareReceiving(summary); @@ -812,4 +830,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber } maybeCompleted(); } + + @VisibleForTesting + public int getNumRequests() + { + return requests.size(); + } + + @VisibleForTesting + public int getNumTransfers() + { + return transferredRangesPerKeyspace.size(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/TableStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/TableStreamManager.java b/src/java/org/apache/cassandra/streaming/TableStreamManager.java index 11512e9..d97fabc 100644 --- a/src/java/org/apache/cassandra/streaming/TableStreamManager.java +++ b/src/java/org/apache/cassandra/streaming/TableStreamManager.java @@ -21,8 +21,7 @@ package org.apache.cassandra.streaming; import java.util.Collection; import java.util.UUID; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.streaming.messages.StreamMessageHeader; /** @@ -46,12 +45,12 @@ public interface TableStreamManager /** * Returns a collection of {@link OutgoingStream}s that contains the data selected by the - * given ranges, pendingRepair, and preview. + * given replicas, pendingRepair, and preview. * * There aren't any requirements on how data is divided between the outgoing streams */ Collection<OutgoingStream> createOutgoingStreams(StreamSession session, - Collection<Range<Token>> ranges, + RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 03b8af0..54187d1 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -816,6 +816,11 @@ public class NodeProbe implements AutoCloseable return ssProxy.getNaturalEndpoints(keyspace, cf, key); } + public List<String> getReplicas(String keyspace, String cf, String key) + { + return ssProxy.getReplicas(keyspace, cf, key); + } + public List<String> getSSTables(String keyspace, String cf, String key, boolean hexFormat) { ColumnFamilyStoreMBean cfsProxy = getCfsProxy(keyspace, cf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index c2193d4..1d09b0f 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -190,6 +190,8 @@ public class NodeTool ReloadSslCertificates.class, EnableAuditLog.class, DisableAuditLog.class, + GetReplicas.class, + DisableAuditLog.class, EnableOldProtocolVersions.class, DisableOldProtocolVersions.class ); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java index 8056ff8..31d80fa 100644 --- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java @@ -88,11 +88,11 @@ public class SSTableRepairedAtSetter if (setIsRepaired) { FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath()); - descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null); + descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, f.toMillis(), null, false); } else { - descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0, null); + descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 0, null, false); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java new file mode 100644 index 0000000..4c401fc --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +import static com.google.common.base.Preconditions.checkArgument; + +@Command(name = "getreplicas", description = "Print replicas for a given key") +public class GetReplicas extends NodeTool.NodeToolCmd +{ + @Arguments(usage = "<keyspace> <table> <key>", description = "The keyspace, the table, and the partition key for which we need to find replicas") + private List<String> args = new ArrayList<>(); + + @Override + public void execute(NodeProbe probe) + { + checkArgument(args.size() == 3, "getreplicas requires keyspace, table and partition key arguments"); + String ks = args.get(0); + String table = args.get(1); + String key = args.get(2); + + System.out.println(probe.getReplicas(ks, table, key)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index a53846c..1e0813c 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -161,7 +161,7 @@ public abstract class TraceState implements ProgressEventNotifier trace(MessageFormatter.format(format, arg1, arg2).getMessage()); } - public void trace(String format, Object[] args) + public void trace(String format, Object... args) { trace(MessageFormatter.arrayFormat(format, args).getMessage()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 84af41c..8e0b19f 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -68,7 +68,7 @@ public class ErrorMessage extends Message.Response ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); int required = body.readInt(); int alive = body.readInt(); - te = new UnavailableException(cl, required, alive); + te = UnavailableException.create(cl, required, alive); } break; case OVERLOADED: http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/Pair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Pair.java b/src/java/org/apache/cassandra/utils/Pair.java index ea8b8fc..cb09529 100644 --- a/src/java/org/apache/cassandra/utils/Pair.java +++ b/src/java/org/apache/cassandra/utils/Pair.java @@ -53,6 +53,18 @@ public class Pair<T1, T2> return "(" + left + "," + right + ")"; } + //For functional interfaces + public T1 left() + { + return left; + } + + //For functional interfaces + public T2 right() + { + return right; + } + public static <X, Y> Pair<X, Y> create(X x, Y y) { return new Pair<X, Y>(x, y); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java index e80faca..0c097a6 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@ -18,6 +18,8 @@ */ package org.apache.cassandra.utils.concurrent; +import java.util.AbstractCollection; +import java.util.Collection; import java.util.Iterator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -27,7 +29,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; * * @param <E> */ -public class Accumulator<E> implements Iterable<E> +public class Accumulator<E> { private volatile int nextIndex; private volatile int presentCount; @@ -105,7 +107,7 @@ public class Accumulator<E> implements Iterable<E> return values.length; } - public Iterator<E> iterator() + private Iterator<E> iterator(int count) { return new Iterator<E>() { @@ -113,7 +115,7 @@ public class Accumulator<E> implements Iterable<E> public boolean hasNext() { - return p < presentCount; + return p < count; } public E next() @@ -135,4 +137,23 @@ public class Accumulator<E> implements Iterable<E> throw new IndexOutOfBoundsException(); return (E) values[i]; } + + public Collection<E> snapshot() + { + int count = presentCount; + return new AbstractCollection<E>() + { + @Override + public Iterator<E> iterator() + { + return Accumulator.this.iterator(count); + } + + @Override + public int size() + { + return count; + } + }; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db index ceaa5a3..8fad34f 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db index 6968720..ae35335 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 index f1c192b..8a92f3c 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 @@ -1 +1 @@ -4004129384 \ No newline at end of file +2977407251 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db index af16195..d50fdeb 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db index 970e385..7341864 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt index bb800f8..b03b283 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt @@ -1,8 +1,8 @@ -Digest.crc32 Filter.db -CompressionInfo.db +Digest.crc32 Index.db -Summary.db -Data.db TOC.txt +Summary.db Statistics.db +CompressionInfo.db +Data.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db index f5ad4d0..f0a1cfb 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db index 7217716..b487fe8 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 index 4f1391a..ca286e0 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 @@ -1 +1 @@ -4072239034 \ No newline at end of file +2759187708 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db index 6dd3da6..c981a22 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db index 3a0e63f..33fccc9 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt index bb800f8..b03b283 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt @@ -1,8 +1,8 @@ -Digest.crc32 Filter.db -CompressionInfo.db +Digest.crc32 Index.db -Summary.db -Data.db TOC.txt +Summary.db Statistics.db +CompressionInfo.db +Data.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db index c665dfb..11219d0 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 index c6c24a7..985d6dc 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 @@ -1 +1 @@ -3772296151 \ No newline at end of file +462858821 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db index 6741430..3c68ac5 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt index bb800f8..b03b283 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt @@ -1,8 +1,8 @@ -Digest.crc32 Filter.db -CompressionInfo.db +Digest.crc32 Index.db -Summary.db -Data.db TOC.txt +Summary.db Statistics.db +CompressionInfo.db +Data.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db index d9fe576..620cdf2 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 index de7baed..bc5f671 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 @@ -1 +1 @@ -4035692752 \ No newline at end of file +3987542254 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db index e9556d1..689bec8 100644 Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt index bb800f8..b03b283 100644 --- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt +++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt @@ -1,8 +1,8 @@ -Digest.crc32 Filter.db -CompressionInfo.db +Digest.crc32 Index.db -Summary.db -Data.db TOC.txt +Summary.db Statistics.db +CompressionInfo.db +Data.db http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java index a5025a3..94a3bd3 100644 --- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java +++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java @@ -54,19 +54,21 @@ public class DynamicEndpointSnitchLongTest DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - List<InetAddressAndPort> hosts = new ArrayList<>(); + EndpointsForRange.Builder replicasBuilder = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE); // We want a big list of hosts so sorting takes time, making it much more likely to reproduce the // problem we're looking for. for (int i = 0; i < 100; i++) for (int j = 0; j < 256; j++) - hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j})); + replicasBuilder.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j}))); - ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts); + EndpointsForRange replicas = replicasBuilder.build(); + + ScoreUpdater updater = new ScoreUpdater(dsnitch, replicas); updater.start(); - List<InetAddressAndPort> result = null; + EndpointsForRange result = replicas; for (int i = 0; i < ITERATIONS; i++) - result = dsnitch.getSortedListByProximity(self, hosts); + result = dsnitch.sortedByProximity(self, result); updater.stopped = true; updater.join(); @@ -84,10 +86,10 @@ public class DynamicEndpointSnitchLongTest public volatile boolean stopped; private final DynamicEndpointSnitch dsnitch; - private final List<InetAddressAndPort> hosts; + private final EndpointsForRange hosts; private final Random random = new Random(); - public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts) + public ScoreUpdater(DynamicEndpointSnitch dsnitch, EndpointsForRange hosts) { this.dsnitch = dsnitch; this.hosts = hosts; @@ -97,9 +99,9 @@ public class DynamicEndpointSnitchLongTest { while (!stopped) { - InetAddressAndPort host = hosts.get(random.nextInt(hosts.size())); + Replica host = hosts.get(random.nextInt(hosts.size())); int score = random.nextInt(SCORE_RANGE); - dsnitch.receiveTiming(host, score); + dsnitch.receiveTiming(host.endpoint(), score); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index 01e67f0..e37045a 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -33,11 +33,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadataRef; @@ -121,8 +120,8 @@ public class LongStreamingTest private String ks; public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); + for (Replica range : StorageService.instance.getLocalReplicas(KS)) + addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort()); this.ks = keyspace; } @@ -148,8 +147,8 @@ public class LongStreamingTest private String ks; public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); + for (Replica range : StorageService.instance.getLocalReplicas(KS)) + addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort()); this.ks = keyspace; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java index 68cfd7e..73a2b71 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java @@ -21,18 +21,22 @@ package org.apache.cassandra.test.microbench; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.PendingRangeMaps; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaUtils; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; import java.net.UnknownHostException; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -50,7 +54,7 @@ public class PendingRangesBench PendingRangeMaps pendingRangeMaps; int maxToken = 256 * 100; - Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges; + Multimap<Range<Token>, Replica> oldPendingRanges; private Range<Token> genRange(String left, String right) { @@ -63,15 +67,17 @@ public class PendingRangesBench pendingRangeMaps = new PendingRangeMaps(); oldPendingRanges = HashMultimap.create(); - InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")}; + List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.2")); for (int i = 0; i < maxToken; i++) { for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++) { Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15)); - pendingRangeMaps.addPendingRange(range, addresses[j]); - oldPendingRanges.put(range, addresses[j]); + Replica replica = Replica.fullReplica(endpoints.get(j), range); + pendingRangeMaps.addPendingRange(range, replica); + oldPendingRanges.put(range, replica); } } @@ -79,8 +85,9 @@ public class PendingRangesBench for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++) { Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5)); - pendingRangeMaps.addPendingRange(range, addresses[j]); - oldPendingRanges.put(range, addresses[j]); + Replica replica = Replica.fullReplica(endpoints.get(j), range); + pendingRangeMaps.addPendingRange(range, replica); + oldPendingRanges.put(range, replica); } } @@ -97,13 +104,13 @@ public class PendingRangesBench { int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5); Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken)); - Set<InetAddressAndPort> endpoints = new HashSet<>(); - for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet()) + Set<Replica> replicas = new HashSet<>(); + for (Map.Entry<Range<Token>, Collection<Replica>> entry : oldPendingRanges.asMap().entrySet()) { if (entry.getKey().contains(searchToken)) - endpoints.addAll(entry.getValue()); + replicas.addAll(entry.getValue()); } - bh.consume(endpoints); + bh.consume(replicas); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 1201efa..bc2c19c 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -32,9 +32,11 @@ import java.util.function.Supplier; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +75,7 @@ import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class Util @@ -446,6 +449,14 @@ public class Util } } + public static void consume(UnfilteredPartitionIterator iterator) + { + while (iterator.hasNext()) + { + consume(iterator.next()); + } + } + public static int size(PartitionIterator iter) { int size = 0; @@ -478,6 +489,15 @@ public class Util && Iterators.elementsEqual(a, b); } + public static boolean sameContent(RowIterator a, RowIterator b) + { + return Objects.equals(a.metadata(), b.metadata()) + && Objects.equals(a.isReverseOrder(), b.isReverseOrder()) + && Objects.equals(a.partitionKey(), b.partitionKey()) + && Objects.equals(a.staticRow(), b.staticRow()) + && Iterators.elementsEqual(a, b); + } + public static boolean sameContent(Mutation a, Mutation b) { if (!a.key().equals(b.key()) || !a.getTableIds().equals(b.getTableIds())) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 07ab3dc..782e3b1 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -104,6 +104,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.io.util.DataOutputPlus", "org.apache.cassandra.io.util.DiskOptimizationStrategy", "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy", + "org.apache.cassandra.locator.Replica", "org.apache.cassandra.locator.SimpleSeedProvider", "org.apache.cassandra.locator.SeedProvider", "org.apache.cassandra.net.BackPressureStrategy", @@ -134,7 +135,9 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer", "org.apache.cassandra.ConsoleAppenderBeanInfo", "org.apache.cassandra.ConsoleAppenderCustomizer", - "org.apache.cassandra.locator.InetAddressAndPort" + "org.apache.cassandra.locator.InetAddressAndPort", + "org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement", + "org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement" }; static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 4a1a365..2fbbc28 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -49,6 +49,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.functions.FunctionName; @@ -148,7 +149,7 @@ public abstract class CQLTester { @Override public String getRack(InetAddressAndPort endpoint) { return RACK1; } @Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } - @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } + @Override public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } }); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java index 184c5ad..37605d6 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.AbstractEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.schema.*; @@ -527,7 +528,7 @@ public class CreateTest extends CQLTester public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; } @Override - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } }); // this forces the dc above to be added to the list of known datacenters (fixes static init problem --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org