Revert "Stop accessing the partitioner directly via StorageService"
This reverts commit 69f77cbddd4c74448f227e9aceef84d345118184. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a22ce89e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a22ce89e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a22ce89e Branch: refs/heads/trunk Commit: a22ce89e868644ea04f0f3dacec05fff1673a345 Parents: 69f77cb Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Jul 31 12:24:52 2015 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Jul 31 12:24:52 2015 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 64 +------- .../cassandra/config/DatabaseDescriptor.java | 7 +- .../org/apache/cassandra/config/Schema.java | 2 +- .../apache/cassandra/cql3/TokenRelation.java | 7 +- .../cassandra/cql3/functions/TokenFct.java | 10 +- .../restrictions/StatementRestrictions.java | 2 +- .../cql3/restrictions/TokenFilter.java | 14 +- .../cql3/restrictions/TokenRestriction.java | 23 ++- .../cql3/statements/BatchStatement.java | 5 +- .../cql3/statements/ModificationStatement.java | 7 +- .../cql3/statements/SelectStatement.java | 2 +- .../db/AbstractReadCommandBuilder.java | 11 +- .../apache/cassandra/db/BatchlogManager.java | 7 +- .../apache/cassandra/db/ColumnFamilyStore.java | 35 ++-- src/java/org/apache/cassandra/db/DataRange.java | 3 +- .../cassandra/db/HintedHandOffManager.java | 29 ++-- src/java/org/apache/cassandra/db/Memtable.java | 1 + src/java/org/apache/cassandra/db/Mutation.java | 26 +-- .../apache/cassandra/db/PartitionPosition.java | 2 +- .../cassandra/db/PartitionRangeReadCommand.java | 2 +- .../apache/cassandra/db/RowUpdateBuilder.java | 2 +- .../db/SinglePartitionNamesCommand.java | 12 -- .../db/SinglePartitionReadCommand.java | 18 +-- .../db/SinglePartitionSliceCommand.java | 17 -- .../org/apache/cassandra/db/SystemKeyspace.java | 18 ++- .../db/compaction/CompactionManager.java | 4 +- .../db/compaction/LeveledManifest.java | 2 +- .../cassandra/db/compaction/Scrubber.java | 4 +- .../cassandra/db/compaction/Upgrader.java | 1 + .../cassandra/db/compaction/Verifier.java | 2 +- .../writers/DefaultCompactionWriter.java | 1 + .../writers/MajorLeveledCompactionWriter.java | 2 + .../writers/MaxSSTableSizeWriter.java | 2 + .../SplittingSizeTieredCompactionWriter.java | 2 + .../AbstractSimplePerColumnSecondaryIndex.java | 4 +- .../cassandra/db/index/SecondaryIndex.java | 20 ++- .../db/index/composites/CompositesIndex.java | 2 +- .../CompositesIndexOnClusteringKey.java | 3 +- .../db/index/composites/CompositesSearcher.java | 2 +- .../cassandra/db/index/keys/KeysIndex.java | 3 +- .../cassandra/db/index/keys/KeysSearcher.java | 2 +- .../db/marshal/LocalByPartionerType.java | 97 ++++++++++++ .../db/marshal/PartitionerDefinedOrder.java | 91 ----------- .../db/partitions/AtomicBTreePartition.java | 4 +- .../db/partitions/PartitionUpdate.java | 108 +++---------- .../rows/UnfilteredRowIteratorSerializer.java | 3 +- .../cassandra/db/view/MaterializedView.java | 7 +- .../apache/cassandra/db/view/TemporalRow.java | 2 +- .../org/apache/cassandra/dht/BootStrapper.java | 12 +- .../cassandra/dht/ByteOrderedPartitioner.java | 5 - .../org/apache/cassandra/dht/IPartitioner.java | 6 - .../apache/cassandra/dht/LocalPartitioner.java | 5 - .../cassandra/dht/Murmur3Partitioner.java | 7 - .../dht/OrderPreservingPartitioner.java | 5 - .../apache/cassandra/dht/RandomPartitioner.java | 7 - .../org/apache/cassandra/dht/RangeStreamer.java | 2 +- .../dht/tokenallocator/TokenAllocation.java | 8 +- src/java/org/apache/cassandra/gms/Gossiper.java | 2 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 10 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 15 +- .../cassandra/io/sstable/KeyIterator.java | 8 +- .../io/sstable/ReducingKeyIterator.java | 2 +- .../apache/cassandra/io/sstable/SSTable.java | 21 +-- .../cassandra/io/sstable/SSTableLoader.java | 20 ++- .../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +- .../io/sstable/SSTableSimpleWriter.java | 4 +- .../io/sstable/format/SSTableReader.java | 74 +++++---- .../io/sstable/format/SSTableWriter.java | 16 +- .../io/sstable/format/big/BigFormat.java | 8 +- .../io/sstable/format/big/BigTableReader.java | 8 +- .../io/sstable/format/big/BigTableScanner.java | 6 +- .../io/sstable/format/big/BigTableWriter.java | 15 +- .../apache/cassandra/locator/TokenMetadata.java | 32 +--- .../apache/cassandra/net/MessagingService.java | 6 +- .../repair/RepairMessageVerbHandler.java | 4 +- .../cassandra/schema/LegacySchemaMigrator.java | 16 +- .../apache/cassandra/schema/SchemaKeyspace.java | 33 ++-- .../apache/cassandra/service/CacheService.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 11 +- .../cassandra/service/StorageService.java | 57 +++---- .../service/pager/RangeNamesQueryPager.java | 4 +- .../service/pager/RangeSliceQueryPager.java | 3 +- .../apache/cassandra/service/paxos/Commit.java | 5 +- .../cassandra/streaming/StreamReader.java | 2 +- .../cassandra/thrift/CassandraServer.java | 48 +++--- .../cassandra/thrift/ThriftConversion.java | 4 +- .../cassandra/thrift/ThriftValidation.java | 3 +- .../utils/NativeSSTableLoaderClient.java | 17 +- .../io/sstable/CQLSSTableWriterLongTest.java | 1 + test/unit/org/apache/cassandra/MockSchema.java | 9 +- .../org/apache/cassandra/UpdateBuilder.java | 2 +- test/unit/org/apache/cassandra/Util.java | 49 ++---- .../apache/cassandra/config/CFMetaDataTest.java | 1 + .../cassandra/cql3/IndexQueryPagingTest.java | 3 + .../selection/SelectionColumnMappingTest.java | 2 +- .../entities/FrozenCollectionsTest.java | 5 +- .../cql3/validation/entities/JsonTest.java | 2 +- .../SecondaryIndexOnMapEntriesTest.java | 2 +- .../cql3/validation/entities/UserTypesTest.java | 5 +- .../validation/operations/SelectLimitTest.java | 2 +- .../SelectOrderedPartitionerTest.java | 2 +- .../cassandra/db/BatchlogManagerTest.java | 7 +- .../org/apache/cassandra/db/RowCacheTest.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 63 ++++++-- .../org/apache/cassandra/db/VerifyTest.java | 4 +- .../cassandra/db/compaction/TTLExpiryTest.java | 2 +- .../db/lifecycle/RealTransactionsTest.java | 2 + .../db/lifecycle/TransactionLogsTest.java | 4 +- .../apache/cassandra/dht/BootStrapperTest.java | 11 +- .../apache/cassandra/dht/KeyCollisionTest.java | 134 +++++++++++++++- .../apache/cassandra/dht/LengthPartitioner.java | 158 ------------------- .../cassandra/dht/PartitionerTestCase.java | 2 +- .../cassandra/gms/SerializationsTest.java | 12 +- .../io/sstable/BigTableWriterTest.java | 1 + .../io/sstable/CQLSSTableWriterTest.java | 130 +++++++-------- .../cassandra/io/sstable/IndexSummaryTest.java | 15 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 3 + .../cassandra/io/sstable/SSTableReaderTest.java | 32 ++-- .../io/sstable/SSTableRewriterTest.java | 2 +- .../apache/cassandra/repair/ValidatorTest.java | 9 +- .../service/ActiveRepairServiceTest.java | 6 +- .../service/LeaveAndBootstrapTest.java | 8 +- .../cassandra/service/SerializationsTest.java | 29 +--- .../cassandra/service/StorageProxyTest.java | 4 +- .../streaming/StreamingTransferTest.java | 12 +- .../apache/cassandra/utils/MerkleTreeTest.java | 2 +- .../cassandra/utils/SerializationsTest.java | 24 +-- 128 files changed, 878 insertions(+), 1103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index ffb7b5e..902b1d2 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -45,7 +45,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.LZ4Compressor; @@ -184,10 +183,7 @@ public final class CFMetaData private final boolean isCounter; private final boolean isMaterializedView; - private final boolean isIndex; - public volatile ClusteringComparator comparator; // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns - public final IPartitioner partitioner; // partitioner the table uses private final Serializers serializers; @@ -263,8 +259,7 @@ public final class CFMetaData boolean isMaterializedView, List<ColumnDefinition> partitionKeyColumns, List<ColumnDefinition> clusteringColumns, - PartitionColumns partitionColumns, - IPartitioner partitioner) + PartitionColumns partitionColumns) { this.cfId = cfId; this.ksName = keyspace; @@ -289,11 +284,6 @@ public final class CFMetaData flags.add(Flag.MATERIALIZEDVIEW); this.flags = Sets.immutableEnumSet(flags); - isIndex = cfName.contains("."); - - assert partitioner != null; - this.partitioner = partitioner; - // A compact table should always have a clustering assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns); @@ -339,8 +329,7 @@ public final class CFMetaData boolean isSuper, boolean isCounter, boolean isMaterializedView, - List<ColumnDefinition> columns, - IPartitioner partitioner) + List<ColumnDefinition> columns) { List<ColumnDefinition> partitions = new ArrayList<>(); List<ColumnDefinition> clusterings = new ArrayList<>(); @@ -375,8 +364,7 @@ public final class CFMetaData isMaterializedView, partitions, clusterings, - builder.build(), - partitioner); + builder.build()); } private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns) @@ -478,25 +466,7 @@ public final class CFMetaData isMaterializedView(), copy(partitionKeyColumns), copy(clusteringColumns), - copy(partitionColumns), - partitioner), - this); - } - - public CFMetaData copy(IPartitioner partitioner) - { - return copyOpts(new CFMetaData(ksName, - cfName, - cfId, - isSuper, - isCounter, - isDense, - isCompound, - isMaterializedView, - copy(partitionKeyColumns), - copy(clusteringColumns), - copy(partitionColumns), - partitioner), + copy(partitionColumns)), this); } @@ -567,19 +537,6 @@ public final class CFMetaData return cfName.contains("."); } - /** - * true if this CFS contains secondary index data. - */ - public boolean isIndex() - { - return isIndex; - } - - public DecoratedKey decorateKey(ByteBuffer key) - { - return partitioner.decorateKey(key); - } - public Map<ByteBuffer, ColumnDefinition> getColumnMetadata() { return columnMetadata; @@ -591,7 +548,7 @@ public final class CFMetaData */ public String getParentColumnFamilyName() { - return isIndex ? cfName.substring(0, cfName.indexOf('.')) : null; + return isSecondaryIndex() ? cfName.substring(0, cfName.indexOf('.')) : null; } public double getReadRepairChance() @@ -1435,7 +1392,6 @@ public final class CFMetaData private final boolean isSuper; private final boolean isCounter; private final boolean isMaterializedView; - private IPartitioner partitioner; private UUID tableId; @@ -1453,7 +1409,6 @@ public final class CFMetaData this.isSuper = isSuper; this.isCounter = isCounter; this.isMaterializedView = isMaterializedView; - this.partitioner = DatabaseDescriptor.getPartitioner(); } public static Builder create(String keyspace, String table) @@ -1486,12 +1441,6 @@ public final class CFMetaData return create(keyspace, table, false, false, true, isCounter); } - public Builder withPartitioner(IPartitioner partitioner) - { - this.partitioner = partitioner; - return this; - } - public Builder withId(UUID tableId) { this.tableId = tableId; @@ -1605,8 +1554,7 @@ public final class CFMetaData isMaterializedView, partitions, clusterings, - builder.build(), - partitioner); + builder.build()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3ec21d7..d32af4d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -18,6 +18,7 @@ package org.apache.cassandra.config; import java.io.File; +import java.io.IOException; import java.net.*; import java.util.*; @@ -742,12 +743,10 @@ public class DatabaseDescriptor return paritionerName; } - /* For tests ONLY, don't use otherwise or all hell will break loose. Tests should restore value at the end. */ - public static IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) + /* For tests ONLY, don't use otherwise or all hell will break loose */ + public static void setPartitioner(IPartitioner newPartitioner) { - IPartitioner old = partitioner; partitioner = newPartitioner; - return old; } public static IEndpointSnitch getEndpointSnitch() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index e1e7380..c934327 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -612,7 +612,7 @@ public class Schema MigrationManager.instance.notifyDropAggregate(uda); } - private synchronized KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation) + private KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation) { KeyspaceMetadata current = getKSMetaData(keyspaceName); if (current == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/TokenRelation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/TokenRelation.java b/src/java/org/apache/cassandra/cql3/TokenRelation.java index e0b71fa..14bd5e0 100644 --- a/src/java/org/apache/cassandra/cql3/TokenRelation.java +++ b/src/java/org/apache/cassandra/cql3/TokenRelation.java @@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.restrictions.Restriction; import org.apache.cassandra.cql3.restrictions.TokenRestriction; import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsNoDuplicates; import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsOnly; @@ -68,7 +69,7 @@ public final class TokenRelation extends Relation { List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm); Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames); - return new TokenRestriction.EQRestriction(cfm, columnDefs, term); + return new TokenRestriction.EQRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, term); } @Override @@ -85,7 +86,7 @@ public final class TokenRelation extends Relation { List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm); Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames); - return new TokenRestriction.SliceRestriction(cfm, columnDefs, bound, inclusive, term); + return new TokenRestriction.SliceRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, bound, inclusive, term); } @Override @@ -158,6 +159,6 @@ public final class TokenRelation extends Relation return Collections.singletonList(new ColumnSpecification(firstColumn.ksName, firstColumn.cfName, new ColumnIdentifier("partition key token", true), - cfm.partitioner.getTokenValidator())); + StorageService.getPartitioner().getTokenValidator())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/functions/TokenFct.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java index 283ac0b..c76b588 100644 --- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java +++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java @@ -22,17 +22,23 @@ import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.CBuilder; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.StorageService; public class TokenFct extends NativeScalarFunction { + // The actual token function depends on the partitioner used + private static final IPartitioner partitioner = StorageService.getPartitioner(); + private final CFMetaData cfm; public TokenFct(CFMetaData cfm) { - super("token", cfm.partitioner.getTokenValidator(), getKeyTypes(cfm)); + super("token", partitioner.getTokenValidator(), getKeyTypes(cfm)); this.cfm = cfm; } @@ -55,6 +61,6 @@ public class TokenFct extends NativeScalarFunction return null; builder.add(bb); } - return cfm.partitioner.getTokenFactory().toByteArray(cfm.partitioner.getToken(CFMetaData.serializePartitionKey(builder.build()))); + return partitioner.getTokenFactory().toByteArray(partitioner.getToken(CFMetaData.serializePartitionKey(builder.build()))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index ea87db7..d9fd5e4 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -378,7 +378,7 @@ public final class StatementRestrictions */ public AbstractBounds<PartitionPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException { - IPartitioner p = cfm.partitioner; + IPartitioner p = StorageService.getPartitioner(); if (partitionKeyRestrictions.isOnToken()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java index 3258b26..bf3f2f6 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.cql3.statements.Bound.END; import static org.apache.cassandra.cql3.statements.Bound.START; @@ -51,9 +52,9 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions private TokenRestriction tokenRestriction; /** - * Partitioner to manage tokens, extracted from tokenRestriction metadata. + * The partitioner */ - private final IPartitioner partitioner; + private static final IPartitioner partitioner = StorageService.getPartitioner(); @Override protected PrimaryKeyRestrictions getDelegate() @@ -73,7 +74,6 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions { this.restrictions = restrictions; this.tokenRestriction = tokenRestriction; - this.partitioner = tokenRestriction.metadata.partitioner; } @Override @@ -144,7 +144,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param values the restricted values * @return the values for which the tokens are not included within the specified range. */ - private List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values) + private static List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values) { List<ByteBuffer> remaining = new ArrayList<>(); @@ -166,7 +166,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param buffers the token restriction values * @return the range set corresponding to the specified list */ - private RangeSet<Token> toRangeSet(List<ByteBuffer> buffers) + private static RangeSet<Token> toRangeSet(List<ByteBuffer> buffers) { ImmutableRangeSet.Builder<Token> builder = ImmutableRangeSet.builder(); @@ -184,7 +184,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @return the range set corresponding to the specified slice * @throws InvalidRequestException if the request is invalid */ - private RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException + private static RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException { if (slice.hasBound(START)) { @@ -224,7 +224,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param buffer the buffer * @return the token corresponding to the specified buffer */ - private Token deserializeToken(ByteBuffer buffer) + private static Token deserializeToken(ByteBuffer buffer) { return partitioner.getTokenFactory().fromByteArray(buffer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java index 56da6da..0a7721a 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java @@ -22,7 +22,6 @@ import java.util.*; import com.google.common.base.Joiner; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.Term; @@ -45,18 +44,16 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions */ protected final List<ColumnDefinition> columnDefs; - final CFMetaData metadata; - /** * Creates a new <code>TokenRestriction</code> that apply to the specified columns. * + * @param comparator the clustering comparator * @param columnDefs the definition of the columns to which apply the token restriction */ - public TokenRestriction(CFMetaData metadata, List<ColumnDefinition> columnDefs) + public TokenRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs) { - super(metadata.getKeyValidatorAsClusteringComparator()); + super(comparator); this.columnDefs = columnDefs; - this.metadata = metadata; } @Override @@ -157,9 +154,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { private final Term value; - public EQRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Term value) + public EQRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Term value) { - super(cfm, columnDefs); + super(comparator, columnDefs); this.value = value; } @@ -193,9 +190,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { private final TermSlice slice; - public SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term) + public SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term) { - super(cfm, columnDefs); + super(comparator, columnDefs); slice = TermSlice.newInstance(bound, inclusive, term); } @@ -253,7 +250,7 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions throw invalidRequest("More than one restriction was found for the end bound on %s", getColumnNamesAsString()); - return new SliceRestriction(metadata, columnDefs, slice.merge(otherSlice.slice)); + return new SliceRestriction(comparator, columnDefs, slice.merge(otherSlice.slice)); } @Override @@ -261,9 +258,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { return String.format("SLICE%s", slice); } - private SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, TermSlice slice) + private SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, TermSlice slice) { - super(cfm, columnDefs); + super(comparator, columnDefs); this.slice = slice; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 5d1333c..08a47c0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -39,6 +39,7 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.Column; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; @@ -259,7 +260,7 @@ public class BatchStatement implements CQLStatement for (ByteBuffer key : keys) { - DecoratedKey dk = statement.cfm.decorateKey(key); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); IMutation mutation = ksMap.get(dk.getKey()); Mutation mut; if (mutation == null) @@ -425,7 +426,7 @@ public class BatchStatement implements CQLStatement throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)"); if (key == null) { - key = statement.cfm.decorateKey(pks.get(0)); + key = StorageService.getPartitioner().decorateKey(pks.get(0)); casRequest = new CQL3CasRequest(statement.cfm, key, true, conditionColumns, updatesRegularRows, updatesStaticRow); } else if (!key.getKey().equals(pks.get(0))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 9f2c952..2f3de4c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -541,7 +541,7 @@ public abstract class ModificationStatement implements CQLStatement ColumnFilter.selection(toRead), RowFilter.NONE, DataLimits.NONE, - key, + StorageService.getPartitioner().decorateKey(key), new ClusteringIndexNamesFilter(clusterings, false))); Map<DecoratedKey, Partition> map = new HashMap(); @@ -639,7 +639,7 @@ public abstract class ModificationStatement implements CQLStatement if (keys.size() > 1) throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); - DecoratedKey key = cfm.decorateKey(keys.get(0)); + DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0)); long now = options.getTimestamp(queryState); CBuilder cbuilder = createClustering(options); @@ -820,7 +820,8 @@ public abstract class ModificationStatement implements CQLStatement for (ByteBuffer key: keys) { ThriftValidation.validateKey(cfm, key); - PartitionUpdate upd = new PartitionUpdate(cfm, key, updatedColumns(), 1); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1); addUpdateForKey(upd, clustering, params); Mutation mut = new Mutation(upd); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 94f04b8..84d621b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -410,7 +410,7 @@ public class SelectStatement implements CQLStatement for (ByteBuffer key : keys) { QueryProcessor.validateKey(key); - DecoratedKey dk = cfm.decorateKey(ByteBufferUtil.clone(key)); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key)); commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java index 5e3b726..2ddc6ca 100644 --- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java +++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java @@ -24,9 +24,14 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.*; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public abstract class AbstractReadCommandBuilder @@ -307,13 +312,13 @@ public abstract class AbstractReadCommandBuilder PartitionPosition start = startKey; if (start == null) { - start = cfs.getPartitioner().getMinimumToken().maxKeyBound(); + start = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); startInclusive = false; } PartitionPosition end = endKey; if (end == null) { - end = cfs.getPartitioner().getMinimumToken().maxKeyBound(); + end = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); endInclusive = true; } @@ -336,7 +341,7 @@ public abstract class AbstractReadCommandBuilder return (DecoratedKey)partitionKey[0]; ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return metadata.decorateKey(key); + return StorageService.getPartitioner().decorateKey(key); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 9e90d9d..154a86b 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -199,11 +199,8 @@ public class BatchlogManager implements BatchlogManagerMBean private void deleteBatch(UUID id) { - Mutation mutation = new Mutation( - PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, - UUIDType.instance.decompose(id), - FBUtilities.timestampMicros(), - FBUtilities.nowInSeconds())); + Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(id))); + mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); mutation.apply(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index c4377d6..24da365 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -137,6 +137,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final Keyspace keyspace; public final String name; public final CFMetaData metadata; + public final IPartitioner partitioner; private final String mbeanName; @Deprecated private final String oldMBeanName; @@ -303,18 +304,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, + IPartitioner partitioner, int generation, CFMetaData metadata, Directories directories, boolean loadSSTables) { - this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true); + this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true); } @VisibleForTesting public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, + IPartitioner partitioner, int generation, CFMetaData metadata, Directories directories, @@ -328,6 +331,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.metadata = metadata; this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold()); this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); + this.partitioner = partitioner; this.directories = directories; this.indexManager = new SecondaryIndexManager(this); this.materializedViewManager = new MaterializedViewManager(this); @@ -345,7 +349,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (data.loadsstables) { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata); + Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); data.addInitialSSTables(sstables); } @@ -482,11 +486,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, boolean loadSSTables) { - return createColumnFamilyStore(keyspace, columnFamily, Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables); + return createColumnFamilyStore(keyspace, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables); } public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, + IPartitioner partitioner, CFMetaData metadata, boolean loadSSTables) { @@ -505,7 +510,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Collections.sort(generations); int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0; - return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables); + return new ColumnFamilyStore(keyspace, columnFamily, partitioner, value, metadata, directories, loadSSTables); } /** @@ -676,7 +681,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata); + reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata, partitioner); } catch (IOException e) { @@ -1438,7 +1443,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // WARNING: this returns the set of LIVE sstables only, which may be only partially written public List<String> getSSTablesForKey(String key) { - DecoratedKey dk = decorateKey(metadata.getKeyValidator().fromString(key)); + DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); try (OpOrder.Group op = readOrdering.start()) { List<String> files = new ArrayList<>(); @@ -1484,7 +1489,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean keyIter.hasNext(); ) { RowCacheKey key = keyIter.next(); - DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key)); + DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) invalidateCachedPartition(dk); } @@ -1495,7 +1500,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean keyIter.hasNext(); ) { CounterCacheKey key = keyIter.next(); - DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey)); + DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } @@ -1613,7 +1618,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (logger.isDebugEnabled()) logger.debug("using snapshot sstable {}", entries.getKey()); // open without tracking hotness - sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, false); + sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false); // This is technically not necessary since it's a snapshot but makes things easier refs.tryRef(sstable); } @@ -2075,20 +2080,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return n; } - public IPartitioner getPartitioner() - { - return metadata.partitioner; - } - - public DecoratedKey decorateKey(ByteBuffer key) - { - return metadata.decorateKey(key); - } - /** true if this CFS contains secondary index data */ public boolean isIndex() { - return metadata.isIndex(); + return partitioner instanceof LocalPartitioner; } public Iterable<ColumnFamilyStore> concatWithIndexes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 023f572..358b0ac 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; /** * Groups both the range of partitions to query, and the clustering index filter to @@ -373,7 +374,7 @@ public class DataRange public DataRange deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { - AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); if (in.readBoolean()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 73189a6..6ff880c 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -33,9 +33,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.ColumnDefinition; @@ -46,6 +46,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.FailureDetector; @@ -130,7 +131,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - ByteBuffer key = UUIDType.instance.decompose(targetId); + DecoratedKey key = StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)); + Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version); ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value); @@ -177,8 +179,9 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) { + DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes); Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds()); - PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, tokenBytes, BTreeBackedRow.singleCellRow(clustering, cell)); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell)); new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } @@ -201,8 +204,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) return; UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint); - ByteBuffer key = ByteBuffer.wrap(UUIDGen.decompose(hostId)); - final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, key, System.currentTimeMillis(), FBUtilities.nowInSeconds())); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId))); + final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds())); // execute asynchronously to avoid blocking caller (which may be processing gossip) Runnable runnable = new Runnable() @@ -365,6 +368,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hostId = Gossiper.instance.getHostId(endpoint); logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint); final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); + DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes); final AtomicInteger rowsReplayed = new AtomicInteger(0); @@ -376,7 +380,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group op = hintStore.readOrdering.start(); - RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, hostIdBytes).queryMemtableAndDisk(hintStore, op), nowInSec)) + RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec)) { List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList(); @@ -476,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean ColumnFilter.all(hintStore.metadata), RowFilter.NONE, DataLimits.cqlLimits(Integer.MAX_VALUE, 1), - DataRange.allData(hintStore.metadata.partitioner)); + DataRange.allData(StorageService.getPartitioner())); try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { @@ -542,12 +546,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public List<String> listEndpointsPendingHints() { - // Extract the keys as strings to be reported. - List<String> result = new ArrayList<>(); + Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory(); + // Extract the keys as strings to be reported. + LinkedList<String> result = new LinkedList<>(); ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds()); - try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); - UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { while (iter.hasNext()) { @@ -556,11 +560,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean // We don't delete by range on the hints table, so we don't have to worry about the // iterator returning only range tombstone marker if (partition.hasNext()) - result.add(UUIDType.instance.compose(partition.partitionKey().getKey()).toString()); + result.addFirst(tokenFactory.toString(partition.partitionKey().getToken())); } } } - return result; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 5ec9fe5..ecaf063 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -436,6 +436,7 @@ public class Memtable implements Comparable<Memtable> (long)partitions.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, + cfs.partitioner, sstableMetadataCollector, new SerializationHeader(cfs.metadata, columns, stats), txn)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index d6b0a43..ace114b 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import org.apache.commons.lang3.StringUtils; @@ -32,6 +31,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +107,6 @@ public class Mutation implements IMutation public Mutation add(PartitionUpdate update) { assert update != null; - assert update.partitionKey().getPartitioner() == key.getPartitioner(); PartitionUpdate prev = modifications.put(update.metadata().cfId, update); if (prev != null) // developer error @@ -271,14 +270,15 @@ public class Mutation implements IMutation public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { + String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that if (version < MessagingService.VERSION_20) - in.readUTF(); // read pre-2.0 keyspace name + keyspaceName = in.readUTF(); - ByteBuffer key = null; + DecoratedKey key = null; int size; if (version < MessagingService.VERSION_30) { - key = ByteBufferUtil.readWithShortLength(in); + key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); size = in.readInt(); } else @@ -288,19 +288,23 @@ public class Mutation implements IMutation assert size > 0; - PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag, key); if (size == 1) - return new Mutation(update); + return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key)); Map<UUID, PartitionUpdate> modifications = new HashMap<>(size); - DecoratedKey dk = update.partitionKey(); - for (int i = 1; i < size; ++i) + PartitionUpdate update = null; + for (int i = 0; i < size; ++i) { - update = PartitionUpdate.serializer.deserialize(in, version, flag, dk); + update = PartitionUpdate.serializer.deserialize(in, version, flag, key); modifications.put(update.metadata().cfId, update); } - return new Mutation(update.metadata().ksName, dk, modifications); + if (keyspaceName == null) + keyspaceName = update.metadata().ksName; + if (key == null) + key = update.partitionKey(); + + return new Mutation(keyspaceName, key, modifications); } public Mutation deserialize(DataInputPlus in, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/PartitionPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java index ac5258d..afb446d 100644 --- a/src/java/org/apache/cassandra/db/PartitionPosition.java +++ b/src/java/org/apache/cassandra/db/PartitionPosition.java @@ -84,7 +84,7 @@ public interface PartitionPosition extends RingPosition<PartitionPosition> if (kind == Kind.ROW_KEY) { ByteBuffer k = ByteBufferUtil.readWithShortLength(in); - return p.decorateKey(k); + return StorageService.getPartitioner().decorateKey(k); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 18b6950..d48fca5 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -90,7 +90,7 @@ public class PartitionRangeReadCommand extends ReadCommand ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, - DataRange.allData(metadata.partitioner)); + DataRange.allData(StorageService.getPartitioner())); } public DataRange dataRange() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java index e4f05b0..c06a7f7 100644 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -223,7 +223,7 @@ public class RowUpdateBuilder return (DecoratedKey)partitionKey[0]; ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return metadata.decorateKey(key); + return StorageService.getPartitioner().decorateKey(key); } private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index b0958fc..5ffbd55 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Sets; @@ -68,17 +67,6 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } - public SinglePartitionNamesCommand(CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - ByteBuffer key, - ClusteringIndexNamesFilter clusteringIndexFilter) - { - this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter); - } - public SinglePartitionNamesCommand copy() { return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 6e9e2d5..3d4e42e 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.cache.*; @@ -58,7 +57,6 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter F clusteringIndexFilter) { super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); - assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; } @@ -147,20 +145,6 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL); } - /** - * Creates a new read command that queries a single partition in its entirety. - * - * @param metadata the table to query. - * @param nowInSec the time in seconds to use are "now" for this query. - * @param key the partition key for the partition to query. - * - * @return a newly created read command that queries all the rows of {@code key}. - */ - public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key) - { - return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL); - } - public DecoratedKey partitionKey() { return partitionKey; @@ -502,7 +486,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException { - DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in)); + DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in)); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); if (filter instanceof ClusteringIndexNamesFilter) return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java index bb9a35e..b4cbbd6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterables; @@ -98,22 +97,6 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); } - /** - * Creates a new single partition slice command for the provided slices. - * - * @param metadata the table to query. - * @param nowInSec the time in seconds to use are "now" for this query. - * @param key the partition key for the partition to query. - * @param slices the slices of rows to query. - * - * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will - * query every columns for the table (without limit or row filtering) and be in forward order. - */ - public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices) - { - return create(metadata, nowInSec, metadata.decorateKey(key), slices); - } - public SinglePartitionSliceCommand copy() { return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index d17eaf7..e31feaa 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -451,6 +451,11 @@ public final class SystemKeyspace DECOMMISSIONED } + private static DecoratedKey decorate(ByteBuffer key) + { + return StorageService.getPartitioner().decorateKey(key); + } + public static void finishStartup() { persistLocalMetadata(); @@ -559,7 +564,7 @@ public final class SystemKeyspace public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token) { String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; - Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory(); + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); } @@ -578,7 +583,7 @@ public final class SystemKeyspace generation = row.getInt("generation_number"); if (row.has("last_key")) { - Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory(); + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); lastKey = factory.fromString(row.getString("last_key")); } @@ -712,9 +717,7 @@ public final class SystemKeyspace private static Set<String> tokensAsSet(Collection<Token> tokens) { - if (tokens.isEmpty()) - return Collections.emptySet(); - Token.TokenFactory factory = StorageService.instance.getTokenFactory(); + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); Set<String> s = new HashSet<>(tokens.size()); for (Token tk : tokens) s.add(factory.toString(tk)); @@ -723,7 +726,7 @@ public final class SystemKeyspace private static Collection<Token> deserializeTokens(Collection<String> tokensStrings) { - Token.TokenFactory factory = StorageService.instance.getTokenFactory(); + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); List<Token> tokens = new ArrayList<>(tokensStrings.size()); for (String tk : tokensStrings) tokens.add(factory.fromString(tk)); @@ -1162,7 +1165,8 @@ public final class SystemKeyspace public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) { long timestamp = FBUtilities.timestampMicros(); - PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.partitionColumns(), estimates.size()); + DecoratedKey key = decorate(UTF8Type.instance.decompose(keyspace)); + PartitionUpdate update = new PartitionUpdate(SizeEstimates, key, SizeEstimates.partitionColumns(), estimates.size()); Mutation mutation = new Mutation(update); // delete all previous values with a single range tombstone. http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 548c661..3dd6f38 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -961,6 +961,7 @@ public class CompactionManager implements CompactionManagerMBean expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel(), + cfs.partitioner, sstable.header, txn); } @@ -992,6 +993,7 @@ public class CompactionManager implements CompactionManagerMBean (long) expectedBloomFilterSize, repairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), SerializationHeader.make(cfs.metadata, sstables), txn); @@ -1083,7 +1085,7 @@ public class CompactionManager implements CompactionManagerMBean } // determine tree depth from number of partitions, but cap at 20 to prevent large tree. int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; - MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); + MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); long start = System.nanoTime(); try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 7fd5717..0cee370 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -80,7 +80,7 @@ public class LeveledManifest for (int i = 0; i < generations.length; i++) { generations[i] = new ArrayList<>(); - lastCompactedKeys[i] = cfs.getPartitioner().getMinimumToken().minKeyBound(); + lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound(); } compactionCounter = new int[MAX_LEVEL_COUNT]; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 5b3f6c7..81e307a 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -179,7 +179,7 @@ public class Scrubber implements Closeable DecoratedKey key = null; try { - key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); } catch (Throwable th) { @@ -249,7 +249,7 @@ public class Scrubber implements Closeable { outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s", dataSizeFromIndex, dataStartFromIndex)); - key = sstable.decorateKey(currentIndexKey); + key = sstable.partitioner.decorateKey(currentIndexKey); try { dataFile.seek(dataStartFromIndex); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index b8a102e..be0dd2a 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -83,6 +83,7 @@ public class Upgrader estimatedRows, repairedAt, cfs.metadata, + cfs.partitioner, sstableMetadataCollector, SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)), transaction); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index ae4e966..90a97a0 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -146,7 +146,7 @@ public class Verifier implements Closeable DecoratedKey key = null; try { - key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); } catch (Throwable th) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 53dad55..cdacddc 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -52,6 +52,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter estimatedTotalKeys, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index a44ea7e..ad58967 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -67,6 +67,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter keysPerSSTable, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -95,6 +96,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter averageEstimatedKeysPerSSTable, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 3942b1e..9902357 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -56,6 +56,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -74,6 +75,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 5d8670d..14cb795 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -89,6 +89,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter currentPartitionsToWrite, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -112,6 +113,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter currentPartitionsToWrite, minRepairedAt, cfs.metadata, + cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index 4bb0bc4..842cbb9 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -50,9 +51,10 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec columnDef = columnDefs.iterator().next(); - CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef, getIndexKeyComparator()); + CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef); indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, indexedCfMetadata.cfName, + new LocalPartitioner(getIndexKeyComparator()), indexedCfMetadata, baseCfs.getTracker().loadsstables); }