Transient Replication and Cheap Quorums Patch by Blake Eggleston, Benedict Elliott Smith, Marcus Eriksson, Alex Petrov, Ariel Weisberg; Reviewed by Blake Eggleston, Marcus Eriksson, Benedict Elliott Smith, Alex Petrov, Ariel Weisberg for CASSANDRA-14404
Co-authored-by: Blake Eggleston <bdeggles...@gmail.com> Co-authored-by: Benedict Elliott Smith <bened...@apache.org> Co-authored-by: Marcus Eriksson <marc...@apache.org> Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7431b43 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7431b43 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7431b43 Branch: refs/heads/trunk Commit: f7431b432875e334170ccdb19934d05545d2cebd Parents: 5b645de Author: Ariel Weisberg <ar...@weisberg.ws> Authored: Thu Jul 5 18:10:40 2018 -0400 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Fri Aug 31 21:34:22 2018 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 4 + conf/cassandra.yaml | 4 + doc/source/architecture/dynamo.rst | 29 + doc/source/cql/ddl.rst | 14 +- ...iver-internal-only-3.12.0.post0-5838e2fd.zip | Bin 0 -> 269418 bytes pylib/cqlshlib/cql3handling.py | 1 + pylib/cqlshlib/cqlshhandling.py | 1 + pylib/cqlshlib/test/test_cqlsh_completion.py | 6 +- pylib/cqlshlib/test/test_cqlsh_output.py | 3 +- .../cassandra/batchlog/BatchlogManager.java | 45 +- .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 35 +- .../apache/cassandra/cql3/QueryProcessor.java | 13 +- .../cql3/statements/BatchStatement.java | 4 +- .../cql3/statements/BatchUpdatesCollector.java | 2 +- .../cql3/statements/ModificationStatement.java | 4 +- .../statements/SingleTableUpdatesCollector.java | 2 +- .../cql3/statements/UpdatesCollector.java | 5 +- .../schema/AlterKeyspaceStatement.java | 86 +- .../statements/schema/AlterTableStatement.java | 7 + .../statements/schema/CreateIndexStatement.java | 5 + .../statements/schema/CreateTableStatement.java | 9 + .../statements/schema/CreateViewStatement.java | 5 + .../cql3/statements/schema/TableAttributes.java | 3 + 
.../apache/cassandra/db/ColumnFamilyStore.java | 24 +- .../apache/cassandra/db/ConsistencyLevel.java | 211 +++-- .../cassandra/db/DiskBoundaryManager.java | 39 +- src/java/org/apache/cassandra/db/Memtable.java | 1 + .../cassandra/db/MutationVerbHandler.java | 5 +- .../cassandra/db/PartitionRangeReadCommand.java | 28 +- .../org/apache/cassandra/db/ReadCommand.java | 33 +- .../apache/cassandra/db/SSTableImporter.java | 6 +- .../db/SinglePartitionReadCommand.java | 26 +- .../org/apache/cassandra/db/SystemKeyspace.java | 98 ++- .../cassandra/db/SystemKeyspaceMigrator40.java | 45 + .../org/apache/cassandra/db/TableCQLHelper.java | 1 + .../compaction/AbstractCompactionStrategy.java | 3 +- .../db/compaction/AbstractStrategyHolder.java | 7 +- .../db/compaction/CompactionManager.java | 295 ++++--- .../db/compaction/CompactionStrategyHolder.java | 34 +- .../compaction/CompactionStrategyManager.java | 108 +-- .../cassandra/db/compaction/CompactionTask.java | 26 +- .../db/compaction/PendingRepairHolder.java | 42 +- .../db/compaction/PendingRepairManager.java | 45 +- .../cassandra/db/compaction/Scrubber.java | 4 +- .../cassandra/db/compaction/Upgrader.java | 10 +- .../cassandra/db/compaction/Verifier.java | 3 +- .../writers/CompactionAwareWriter.java | 2 + .../writers/DefaultCompactionWriter.java | 1 + .../writers/MajorLeveledCompactionWriter.java | 1 + .../writers/MaxSSTableSizeWriter.java | 1 + .../SplittingSizeTieredCompactionWriter.java | 1 + .../db/partitions/PartitionIterators.java | 12 - .../repair/CassandraKeyspaceRepairManager.java | 10 +- .../db/repair/PendingAntiCompaction.java | 22 +- .../db/streaming/CassandraOutgoingFile.java | 11 +- .../db/streaming/CassandraStreamManager.java | 36 +- .../db/streaming/CassandraStreamReader.java | 2 +- .../apache/cassandra/db/view/TableViews.java | 5 + .../apache/cassandra/db/view/ViewBuilder.java | 19 +- .../apache/cassandra/db/view/ViewManager.java | 2 +- .../org/apache/cassandra/db/view/ViewUtils.java | 64 +- 
src/java/org/apache/cassandra/dht/Range.java | 27 +- .../cassandra/dht/RangeFetchMapCalculator.java | 58 +- .../org/apache/cassandra/dht/RangeStreamer.java | 571 ++++++++---- src/java/org/apache/cassandra/dht/Splitter.java | 95 +- .../apache/cassandra/dht/StreamStateStore.java | 25 +- .../ReplicationAwareTokenAllocator.java | 2 +- .../dht/tokenallocator/TokenAllocation.java | 6 +- .../exceptions/UnavailableException.java | 20 +- .../org/apache/cassandra/gms/EndpointState.java | 5 + .../apache/cassandra/hints/HintsService.java | 21 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 3 +- .../apache/cassandra/io/sstable/SSTable.java | 13 + .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 14 +- .../io/sstable/SimpleSSTableMultiWriter.java | 3 +- .../sstable/format/RangeAwareSSTableWriter.java | 8 +- .../io/sstable/format/SSTableReader.java | 5 + .../io/sstable/format/SSTableWriter.java | 17 +- .../cassandra/io/sstable/format/Version.java | 2 + .../io/sstable/format/big/BigFormat.java | 17 +- .../io/sstable/format/big/BigTableWriter.java | 3 +- .../sstable/metadata/IMetadataSerializer.java | 4 +- .../io/sstable/metadata/MetadataCollector.java | 8 +- .../io/sstable/metadata/MetadataSerializer.java | 4 +- .../io/sstable/metadata/StatsMetadata.java | 52 +- .../locator/AbstractEndpointSnitch.java | 38 +- .../locator/AbstractNetworkTopologySnitch.java | 5 +- .../locator/AbstractReplicaCollection.java | 264 ++++++ .../locator/AbstractReplicationStrategy.java | 142 +-- .../locator/DynamicEndpointSnitch.java | 67 +- .../org/apache/cassandra/locator/Ec2Snitch.java | 2 +- .../org/apache/cassandra/locator/Endpoints.java | 157 ++++ .../cassandra/locator/EndpointsByRange.java | 63 ++ .../cassandra/locator/EndpointsByReplica.java | 61 ++ .../cassandra/locator/EndpointsForRange.java | 188 ++++ .../cassandra/locator/EndpointsForToken.java | 172 ++++ .../cassandra/locator/IEndpointSnitch.java | 18 +- 
.../cassandra/locator/InetAddressAndPort.java | 5 +- .../apache/cassandra/locator/LocalStrategy.java | 29 +- .../locator/NetworkTopologyStrategy.java | 87 +- .../locator/OldNetworkTopologyStrategy.java | 40 +- .../cassandra/locator/PendingRangeMaps.java | 161 ++-- .../cassandra/locator/RangesAtEndpoint.java | 313 +++++++ .../cassandra/locator/RangesByEndpoint.java | 54 ++ .../org/apache/cassandra/locator/Replica.java | 196 +++++ .../cassandra/locator/ReplicaCollection.java | 160 ++++ .../apache/cassandra/locator/ReplicaLayout.java | 381 ++++++++ .../cassandra/locator/ReplicaMultimap.java | 127 +++ .../org/apache/cassandra/locator/Replicas.java | 83 ++ .../cassandra/locator/ReplicationFactor.java | 130 +++ .../apache/cassandra/locator/SimpleSnitch.java | 8 +- .../cassandra/locator/SimpleStrategy.java | 37 +- .../cassandra/locator/SystemReplicas.java | 62 ++ .../apache/cassandra/locator/TokenMetadata.java | 102 ++- .../cassandra/metrics/KeyspaceMetrics.java | 43 +- .../cassandra/metrics/ReadRepairMetrics.java | 1 + .../apache/cassandra/metrics/TableMetrics.java | 17 +- .../apache/cassandra/net/IAsyncCallback.java | 11 +- .../apache/cassandra/net/MessagingService.java | 38 +- .../apache/cassandra/net/WriteCallbackInfo.java | 15 +- .../cassandra/repair/AbstractSyncTask.java | 31 + .../repair/AsymmetricLocalSyncTask.java | 7 +- .../repair/AsymmetricRemoteSyncTask.java | 6 + .../cassandra/repair/AsymmetricSyncTask.java | 10 +- .../apache/cassandra/repair/CommonRange.java | 82 ++ .../cassandra/repair/KeyspaceRepairManager.java | 8 +- .../apache/cassandra/repair/LocalSyncTask.java | 135 --- .../apache/cassandra/repair/RemoteSyncTask.java | 74 -- .../org/apache/cassandra/repair/RepairJob.java | 42 +- .../apache/cassandra/repair/RepairRunnable.java | 87 +- .../apache/cassandra/repair/RepairSession.java | 57 +- .../cassandra/repair/StreamingRepairTask.java | 8 +- .../repair/SymmetricLocalSyncTask.java | 142 +++ .../repair/SymmetricRemoteSyncTask.java | 84 ++ 
.../cassandra/repair/SymmetricSyncTask.java | 94 ++ .../org/apache/cassandra/repair/SyncTask.java | 97 --- .../repair/SystemDistributedKeyspace.java | 6 +- .../repair/consistent/LocalSessions.java | 36 +- .../apache/cassandra/schema/KeyspaceParams.java | 5 + .../cassandra/schema/ReplicationParams.java | 9 +- .../apache/cassandra/schema/SchemaKeyspace.java | 4 + .../apache/cassandra/schema/TableMetadata.java | 6 + .../apache/cassandra/schema/TableParams.java | 11 + .../service/AbstractWriteResponseHandler.java | 95 +- .../cassandra/service/ActiveRepairService.java | 52 +- .../service/BatchlogResponseHandler.java | 2 +- .../DatacenterSyncWriteResponseHandler.java | 21 +- .../service/DatacenterWriteResponseHandler.java | 26 +- .../service/PendingRangeCalculatorService.java | 2 +- .../apache/cassandra/service/StorageProxy.java | 672 ++++++--------- .../cassandra/service/StorageService.java | 860 ++++++++++++------- .../cassandra/service/StorageServiceMBean.java | 2 + .../cassandra/service/WriteResponseHandler.java | 25 +- .../service/reads/AbstractReadExecutor.java | 243 +++--- .../cassandra/service/reads/DataResolver.java | 83 +- .../cassandra/service/reads/DigestResolver.java | 79 +- .../cassandra/service/reads/ReadCallback.java | 57 +- .../service/reads/ResponseResolver.java | 32 +- .../reads/ShortReadPartitionsProtection.java | 36 +- .../service/reads/ShortReadProtection.java | 3 +- .../service/reads/ShortReadRowsProtection.java | 6 +- .../reads/repair/AbstractReadRepair.java | 90 +- .../reads/repair/BlockingPartitionRepair.java | 73 +- .../reads/repair/BlockingReadRepair.java | 29 +- .../reads/repair/BlockingReadRepairs.java | 19 - .../service/reads/repair/NoopReadRepair.java | 15 +- .../repair/PartitionIteratorMergeListener.java | 14 +- .../reads/repair/ReadOnlyReadRepair.java | 15 +- .../service/reads/repair/ReadRepair.java | 39 +- .../reads/repair/ReadRepairDiagnostics.java | 5 +- .../service/reads/repair/ReadRepairEvent.java | 11 +- 
.../reads/repair/ReadRepairStrategy.java | 13 +- .../reads/repair/RowIteratorMergeListener.java | 58 +- .../streaming/DefaultConnectionFactory.java | 7 +- .../apache/cassandra/streaming/StreamPlan.java | 38 +- .../cassandra/streaming/StreamRequest.java | 98 ++- .../cassandra/streaming/StreamSession.java | 54 +- .../cassandra/streaming/TableStreamManager.java | 7 +- .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../org/apache/cassandra/tools/NodeTool.java | 2 + .../tools/SSTableRepairedAtSetter.java | 4 +- .../cassandra/tools/nodetool/GetReplicas.java | 47 + .../apache/cassandra/tracing/TraceState.java | 2 +- .../transport/messages/ErrorMessage.java | 2 +- src/java/org/apache/cassandra/utils/Pair.java | 12 + .../cassandra/utils/concurrent/Accumulator.java | 27 +- .../legacy_na_clust/na-1-big-CompressionInfo.db | Bin 87 -> 87 bytes .../legacy_na_clust/na-1-big-Data.db | Bin 5259 -> 5214 bytes .../legacy_na_clust/na-1-big-Digest.crc32 | 2 +- .../legacy_na_clust/na-1-big-Index.db | Bin 157553 -> 157553 bytes .../legacy_na_clust/na-1-big-Statistics.db | Bin 7095 -> 7096 bytes .../legacy_na_clust/na-1-big-TOC.txt | 8 +- .../na-1-big-CompressionInfo.db | Bin 79 -> 79 bytes .../legacy_na_clust_counter/na-1-big-Data.db | Bin 5888 -> 5759 bytes .../na-1-big-Digest.crc32 | 2 +- .../legacy_na_clust_counter/na-1-big-Index.db | Bin 157553 -> 157553 bytes .../na-1-big-Statistics.db | Bin 7104 -> 7105 bytes .../legacy_na_clust_counter/na-1-big-TOC.txt | 8 +- .../legacy_na_simple/na-1-big-Data.db | Bin 89 -> 88 bytes .../legacy_na_simple/na-1-big-Digest.crc32 | 2 +- .../legacy_na_simple/na-1-big-Statistics.db | Bin 4648 -> 4649 bytes .../legacy_na_simple/na-1-big-TOC.txt | 8 +- .../legacy_na_simple_counter/na-1-big-Data.db | Bin 140 -> 138 bytes .../na-1-big-Digest.crc32 | 2 +- .../na-1-big-Statistics.db | Bin 4657 -> 4658 bytes .../legacy_na_simple_counter/na-1-big-TOC.txt | 8 +- .../locator/DynamicEndpointSnitchLongTest.java | 20 +- 
.../cassandra/streaming/LongStreamingTest.java | 11 +- .../test/microbench/PendingRangesBench.java | 27 +- test/unit/org/apache/cassandra/Util.java | 20 + .../config/DatabaseDescriptorRefTest.java | 5 +- .../org/apache/cassandra/cql3/CQLTester.java | 3 +- .../cql3/validation/operations/CreateTest.java | 3 +- .../org/apache/cassandra/db/CleanupTest.java | 9 +- .../cassandra/db/CleanupTransientTest.java | 195 +++++ .../org/apache/cassandra/db/ImportTest.java | 2 +- .../db/RepairedDataTombstonesTest.java | 2 +- .../apache/cassandra/db/RowUpdateBuilder.java | 6 + .../unit/org/apache/cassandra/db/ScrubTest.java | 6 +- .../db/SystemKeyspaceMigrator40Test.java | 26 + .../apache/cassandra/db/TableCQLHelperTest.java | 3 + .../org/apache/cassandra/db/VerifyTest.java | 4 +- .../compaction/AbstractPendingRepairTest.java | 13 +- .../db/compaction/AntiCompactionTest.java | 234 +++-- ...pactionStrategyManagerPendingRepairTest.java | 163 +++- .../CompactionStrategyManagerTest.java | 50 +- .../db/compaction/CompactionTaskTest.java | 10 +- .../db/compaction/CompactionsCQLTest.java | 4 +- .../LeveledCompactionStrategyTest.java | 2 +- .../db/compaction/PendingRepairManagerTest.java | 28 +- .../db/compaction/SingleSSTableLCSTaskTest.java | 6 +- .../db/lifecycle/LogTransactionTest.java | 2 +- .../db/lifecycle/RealTransactionsTest.java | 1 + ...tionManagerGetSSTablesForValidationTest.java | 4 +- .../db/repair/PendingAntiCompactionTest.java | 39 +- .../db/streaming/CassandraOutgoingFileTest.java | 1 + .../streaming/CassandraStreamManagerTest.java | 16 +- .../db/streaming/StreamRequestTest.java | 98 +++ .../apache/cassandra/db/view/ViewUtilsTest.java | 23 +- .../apache/cassandra/dht/BootStrapperTest.java | 34 +- .../dht/RangeFetchMapCalculatorTest.java | 138 ++- .../org/apache/cassandra/dht/RangeTest.java | 12 +- .../org/apache/cassandra/dht/SplitterTest.java | 63 +- .../cassandra/dht/StreamStateStoreTest.java | 5 +- .../gms/PendingRangeCalculatorServiceTest.java | 2 +- 
.../io/sstable/BigTableWriterTest.java | 2 +- .../io/sstable/CQLSSTableWriterTest.java | 2 +- .../cassandra/io/sstable/LegacySSTableTest.java | 9 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 7 +- .../io/sstable/SSTableRewriterTest.java | 2 +- .../cassandra/io/sstable/SSTableUtils.java | 2 +- .../cassandra/io/sstable/SSTableWriterTest.java | 60 ++ .../io/sstable/SSTableWriterTestBase.java | 11 +- .../format/SSTableFlushObserverTest.java | 2 +- .../metadata/MetadataSerializerTest.java | 2 +- .../locator/DynamicEndpointSnitchTest.java | 42 +- .../locator/NetworkTopologyStrategyTest.java | 92 +- .../locator/OldNetworkTopologyStrategyTest.java | 28 +- .../cassandra/locator/PendingRangeMapsTest.java | 49 +- .../locator/ReplicaCollectionTest.java | 468 ++++++++++ .../apache/cassandra/locator/ReplicaUtils.java | 44 + .../locator/ReplicationFactorTest.java | 73 ++ .../ReplicationStrategyEndpointCacheTest.java | 56 +- .../cassandra/locator/SimpleStrategyTest.java | 81 +- .../cassandra/locator/TokenMetadataTest.java | 8 +- .../cassandra/net/WriteCallbackInfoTest.java | 7 +- .../async/OutboundMessagingConnectionTest.java | 3 +- .../cassandra/repair/LocalSyncTaskTest.java | 191 ---- .../cassandra/repair/RepairRunnableTest.java | 12 +- .../cassandra/repair/RepairSessionTest.java | 6 +- .../repair/SymmetricLocalSyncTaskTest.java | 232 +++++ .../repair/SymmetricRemoteSyncTaskTest.java | 71 ++ .../repair/consistent/LocalSessionAccessor.java | 3 +- .../repair/consistent/LocalSessionTest.java | 9 +- .../org/apache/cassandra/schema/MockSchema.java | 2 +- .../service/ActiveRepairServiceTest.java | 50 +- .../service/BootstrapTransientTest.java | 179 ++++ .../service/LeaveAndBootstrapTest.java | 48 +- .../org/apache/cassandra/service/MoveTest.java | 324 +++---- .../cassandra/service/MoveTransientTest.java | 638 ++++++++++++++ .../cassandra/service/StorageServiceTest.java | 148 ++++ .../service/WriteResponseHandlerTest.java | 58 +- .../WriteResponseHandlerTransientTest.java | 224 
+++++ .../service/reads/AbstractReadResponseTest.java | 300 +++++++ .../service/reads/DataResolverTest.java | 486 +++++------ .../reads/DataResolverTransientTest.java | 226 +++++ .../service/reads/DigestResolverTest.java | 144 ++++ .../service/reads/ReadExecutorTest.java | 46 +- .../reads/repair/AbstractReadRepairTest.java | 53 +- .../reads/repair/BlockingReadRepairTest.java | 113 ++- .../DiagEventsBlockingReadRepairTest.java | 45 +- .../reads/repair/InstrumentedReadRepair.java | 4 +- .../reads/repair/ReadOnlyReadRepairTest.java | 30 +- .../service/reads/repair/ReadRepairTest.java | 353 ++++++++ .../reads/repair/TestableReadRepair.java | 50 +- .../streaming/StreamingTransferTest.java | 7 +- .../utils/concurrent/AccumulatorTest.java | 2 +- 300 files changed, 11945 insertions(+), 4347 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e76586..b53b986 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Transient Replication and Cheap Quorums (CASSANDRA-14404) * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675) * Add diagnostic events for read repairs (CASSANDRA-14668) * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index aa8281c..5066378 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool. 
New features ------------ + - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404 + The intended audience for this functionality is expert users of Cassandra who are prepared + to validate every aspect of the database for their application and deployment practices. Future + releases of Cassandra will make this feature suitable for a wider audience. - *Experimental* support for Java 11 has been added. JVM options that differ between or are specific for Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options. IMPORTANT: Running C* on Java 11 is *experimental* and do it at your own risk. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 064ee4f..503a0fa 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1052,6 +1052,10 @@ enable_scripted_user_defined_functions: false # Materialized views are considered experimental and are not recommended for production use. enable_materialized_views: true +# Enables creation of transiently replicated keyspaces on this node. +# Transient replication is experimental and is not recommended for production use. +#enable_transient_replication: true + # The default Windows kernel timer and scheduling resolution is 15.6ms for power conservation. 
# Lowering this value on Windows can provide much tighter latency and better throughput, however # some virtualized environments may see a negative performance impact from changing this setting http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/architecture/dynamo.rst ---------------------------------------------------------------------- diff --git a/doc/source/architecture/dynamo.rst b/doc/source/architecture/dynamo.rst index 365a695..12c586e 100644 --- a/doc/source/architecture/dynamo.rst +++ b/doc/source/architecture/dynamo.rst @@ -74,6 +74,35 @@ nodes in each rack, the data load on the smallest rack may be much higher. Simi into a new rack, it will be considered a replica for the entire ring. For this reason, many operators choose to configure all nodes on a single "rack". +.. _transient-replication: + +Transient Replication +~~~~~~~~~~~~~~~~~~~~~ + +Transient replication allows you to configure a subset of replicas to only replicate data that hasn't been incrementally +repaired. This allows you to decouple data redundancy from availability. For instance, if you have a keyspace replicated +at rf 3, and alter it to rf 5 with 2 transient replicas, you go from being able to tolerate one failed replica to being +able to tolerate two, without a corresponding increase in storage usage. This is because 3 nodes will replicate all the data +for a given token range, and the other 2 will only replicate data that hasn't been incrementally repaired. + +To use transient replication, you first need to enable it in ``cassandra.yaml``. Once enabled, both SimpleStrategy and +NetworkTopologyStrategy can be configured to transiently replicate data. You configure it by specifying replication factor +as ``<total_replicas>/<transient_replicas>``. Both SimpleStrategy and NetworkTopologyStrategy support configuring transient +replication. 
+ +Transiently replicated keyspaces only support tables created with read_repair set to NONE and monotonic reads are not currently supported. +You also can't use LWT, logged batches, and counters in 4.0. You will possibly never be able to use materialized views +with transiently replicated keyspaces and probably never be able to use 2i with them. + +Transient replication is an experimental feature that may not be ready for production use. The expected audience is experienced +users of Cassandra capable of fully validating a deployment of their particular application. That means being able to check +that operations like reads, writes, decommission, remove, rebuild, repair, and replace all work with your queries, data, +configuration, operational practices, and availability requirements. + +It is anticipated that 4.next will support monotonic reads with transient replication as well as LWT, logged batches, and +counters. + + Tunable Consistency ^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/cql/ddl.rst ---------------------------------------------------------------------- diff --git a/doc/source/cql/ddl.rst b/doc/source/cql/ddl.rst index 9afd638..2d9a50a 100644 --- a/doc/source/cql/ddl.rst +++ b/doc/source/cql/ddl.rst @@ -105,6 +105,14 @@ strategy is used. By default, Cassandra support the following ``'class'``: Attempting to create a keyspace that already exists will return an error unless the ``IF NOT EXISTS`` option is used. If it is used, the statement will be a no-op if the keyspace already exists. 
+If :ref:`transient replication <transient-replication>` has been enabled, transient replicas can be configured for both +SimpleStrategy and NetworkTopologyStrategy by defining replication factors in the format ``'<total_replicas>/<transient_replicas>'``. + +For instance, this keyspace will have 3 replicas in DC1, 1 of which is transient, and 5 replicas in DC2, 2 of which are transient:: + + CREATE KEYSPACE some_keyspace + WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : '3/1', 'DC2' : '5/2'}; + .. _use-statement: USE @@ -455,6 +463,9 @@ A table supports the following options: | ``speculative_retry`` | *simple* | 99PERCENTILE| :ref:`Speculative retry options | | | | | <speculative-retry-options>`. | +--------------------------------+----------+-------------+-----------------------------------------------------------+ +| ``speculative_write_threshold``| *simple* | 99PERCENTILE| :ref:`Speculative retry options | +| | | | <speculative-retry-options>`. | ++--------------------------------+----------+-------------+-----------------------------------------------------------+ | ``gc_grace_seconds`` | *simple* | 864000 | Time to wait before garbage collecting tombstones | | | | | (deletion markers). | +--------------------------------+----------+-------------+-----------------------------------------------------------+ @@ -485,7 +496,8 @@ Speculative retry options By default, Cassandra read coordinators only query as many replicas as necessary to satisfy consistency levels: one for consistency level ``ONE``, a quorum for ``QUORUM``, and so on. ``speculative_retry`` determines when coordinators may query additional replicas, which is useful -when replicas are slow or unresponsive. The following are legal values (case-insensitive): +when replicas are slow or unresponsive. ``speculative_write_threshold`` specifies the threshold at which +a cheap quorum write will be upgraded to include transient replicas. 
The following are legal values (case-insensitive): ============================ ======================== ============================================================================= Format Example Description http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip new file mode 100644 index 0000000..8d627a9 Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 5595e2a..405e88e 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -49,6 +49,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): ('max_index_interval', None), ('default_time_to_live', None), ('speculative_retry', None), + ('speculative_write_threshold', None), ('memtable_flush_period_in_ms', None), ('cdc', None), ('read_repair', None), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cqlshhandling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cqlshhandling.py b/pylib/cqlshlib/cqlshhandling.py index 9545876..7abd6ce 100644 --- a/pylib/cqlshlib/cqlshhandling.py +++ b/pylib/cqlshlib/cqlshhandling.py @@ -112,6 +112,7 @@ cqlsh_consistency_level_syntax_rules = r''' | "SERIAL" | "LOCAL_SERIAL" | "LOCAL_ONE" + | "NODE_LOCAL" ; ''' http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_completion.py ---------------------------------------------------------------------- diff --git 
a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py index fa9490d..794c591 100644 --- a/pylib/cqlshlib/test/test_cqlsh_completion.py +++ b/pylib/cqlshlib/test/test_cqlsh_completion.py @@ -594,7 +594,7 @@ class TestCqlshCompletion(CqlshCompletionCase): 'memtable_flush_period_in_ms', 'CLUSTERING', 'COMPACT', 'caching', 'comment', - 'min_index_interval', 'speculative_retry', 'cdc']) + 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc']) self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH ', choices=['bloom_filter_fp_chance', 'compaction', 'compression', @@ -603,7 +603,7 @@ class TestCqlshCompletion(CqlshCompletionCase): 'memtable_flush_period_in_ms', 'CLUSTERING', 'COMPACT', 'caching', 'comment', - 'min_index_interval', 'speculative_retry', 'cdc']) + 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc']) self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance ', immediate='= ') self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance = ', @@ -650,7 +650,7 @@ class TestCqlshCompletion(CqlshCompletionCase): 'memtable_flush_period_in_ms', 'CLUSTERING', 'COMPACT', 'caching', 'comment', - 'min_index_interval', 'speculative_retry', 'cdc']) + 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc']) self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = " + "{'class': 'DateTieredCompactionStrategy', '", choices=['base_time_seconds', 'max_sstable_age_days', http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_output.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py index 2f0d9bf..46546f0 100644 --- a/pylib/cqlshlib/test/test_cqlsh_output.py +++ 
b/pylib/cqlshlib/test/test_cqlsh_output.py @@ -622,7 +622,8 @@ class TestCqlshOutput(BaseTestCase): AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 - AND speculative_retry = '99PERCENTILE'; + AND speculative_retry = '99PERCENTILE' + AND speculative_write_threshold = '99PERCENTILE'; """ % quote_name(get_keyspace())) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 4809bd7..8dda54e 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -26,14 +26,20 @@ import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicates; import com.google.common.collect.*; import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.EndpointsForToken; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.*; @@ -419,7 +425,7 @@ public class BatchlogManager implements BatchlogManagerMBean if (handler != null) { hintedNodes.addAll(handler.undelivered); - HintsService.instance.write(transform(handler.undelivered, 
StorageService.instance::getHostIdForEndpoint), + HintsService.instance.write(Collections2.transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint), Hint.create(undeliveredMutation, writtenAt)); } } @@ -449,35 +455,41 @@ public class BatchlogManager implements BatchlogManagerMBean long writtenAt, Set<InetAddressAndPort> hintedNodes) { - Set<InetAddressAndPort> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); + Keyspace keyspace = Keyspace.open(ks); Token tk = mutation.key().getToken(); - for (InetAddressAndPort endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk)) + EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk); + Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549 + + EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk); + for (Replica replica : replicas) { - if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) + if (replica.isLocal()) { mutation.apply(); } - else if (FailureDetector.instance.isAlive(endpoint)) + else if (FailureDetector.instance.isAlive(replica.endpoint())) { - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. + liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint. 
} else { - hintedNodes.add(endpoint); - HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint), + hintedNodes.add(replica.endpoint()); + HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()), Hint.create(mutation, writtenAt)); } } - if (liveEndpoints.isEmpty()) + EndpointsForToken liveReplicas = liveReplicasBuilder.build(); + if (liveReplicas.isEmpty()) return null; - ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime()); + Replicas.temporaryAssertFull(liveReplicas); + ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime()); MessageOut<Mutation> message = mutation.createMessage(); - for (InetAddressAndPort endpoint : liveEndpoints) - MessagingService.instance().sendRR(message, endpoint, handler, false); + for (Replica replica : liveReplicas) + MessagingService.instance().sendWriteRR(message, replica, handler, false); return handler; } @@ -497,16 +509,17 @@ public class BatchlogManager implements BatchlogManagerMBean { private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); - ReplayWriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, long queryStartNanoTime) + ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime) { - super(writeEndpoints, Collections.<InetAddressAndPort>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); - undelivered.addAll(writeEndpoints); + super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())), + null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); + Iterables.addAll(undelivered, writeReplicas.endpoints()); } @Override protected int totalBlockFor() { - return this.naturalEndpoints.size(); + return 
this.replicaLayout.selected().size(); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a13070c..783dcc1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -339,6 +339,8 @@ public class Config public boolean enable_materialized_views = true; + public boolean enable_transient_replication = false; + /** * Optionally disable asynchronous UDF execution. * Disabling asynchronous UDF execution also implicitly disables the security-manager! http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index af13f9c..75b3fc3 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -59,6 +59,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.EndpointSnitchInfo; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.net.BackPressureStrategy; import org.apache.cassandra.net.RateBasedBackPressure; @@ -122,7 +123,7 @@ public class DatabaseDescriptor private static long indexSummaryCapacityInMB; private static String localDC; - private static Comparator<InetAddressAndPort> localComparator; + private static Comparator<Replica> localComparator; private static EncryptionContext 
encryptionContext; private static boolean hasLoggedConfig; @@ -991,18 +992,14 @@ public class DatabaseDescriptor EndpointSnitchInfo.create(); localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - localComparator = new Comparator<InetAddressAndPort>() - { - public int compare(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } + localComparator = (replica1, replica2) -> { + boolean local1 = localDC.equals(snitch.getDatacenter(replica1)); + boolean local2 = localDC.equals(snitch.getDatacenter(replica2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; }; } @@ -2308,7 +2305,7 @@ public class DatabaseDescriptor return localDC; } - public static Comparator<InetAddressAndPort> getLocalComparator() + public static Comparator<Replica> getLocalComparator() { return localComparator; } @@ -2459,6 +2456,16 @@ public class DatabaseDescriptor return conf.enable_materialized_views; } + public static boolean isTransientReplicationEnabled() + { + return conf.enable_transient_replication; + } + + public static void setTransientReplicationEnabledUnsafe(boolean enabled) + { + conf.enable_transient_replication = enabled; + } + public static long getUserDefinedFunctionFailTimeout() { return conf.user_defined_function_fail_timeout; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 79e19c1..45db947 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ 
-202,7 +202,18 @@ public class QueryProcessor implements QueryHandler statement.authorize(clientState); statement.validate(clientState); - ResultMessage result = statement.execute(queryState, options, queryStartNanoTime); + ResultMessage result; + if (options.getConsistency() == ConsistencyLevel.NODE_LOCAL) + { + assert Boolean.getBoolean("cassandra.enable_nodelocal_queries") : "Node local consistency level is highly dangerous and should be used only for debugging purposes"; + assert statement instanceof SelectStatement : "Only SELECT statements are permitted for node-local execution"; + logger.info("Statement {} executed with NODE_LOCAL consistency level.", statement); + result = statement.executeLocally(queryState, options); + } + else + { + result = statement.execute(queryState, options, queryStartNanoTime); + } return result == null ? new ResultMessage.Void() : result; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index e925735..fa637ef 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -261,7 +261,7 @@ public class BatchStatement implements CQLStatement return statements; } - private Collection<? extends IMutation> getMutations(BatchQueryOptions options, + private List<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long batchTimestamp, int nowInSeconds, @@ -401,7 +401,7 @@ public class BatchStatement implements CQLStatement return new ResultMessage.Void(); } - private void executeWithoutConditions(Collection<? 
extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + private void executeWithoutConditions(List<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { if (mutations.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java index 96d9f5a..8f70ffc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@ -104,7 +104,7 @@ final class BatchUpdatesCollector implements UpdatesCollector * Returns a collection containing all the mutations. * @return a collection containing all the mutations. */ - public Collection<IMutation> toMutations() + public List<IMutation> toMutations() { //TODO: The case where all statement where on the same keyspace is pretty common, optimize for that? 
List<IMutation> ms = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 13fc659..a8367f0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -465,7 +465,7 @@ public abstract class ModificationStatement implements CQLStatement else cl.validateForWrite(metadata.keyspace); - Collection<? extends IMutation> mutations = + List<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), @@ -676,7 +676,7 @@ public abstract class ModificationStatement implements CQLStatement * * @return list of the mutations */ - private Collection<? extends IMutation> getMutations(QueryOptions options, + private List<? extends IMutation> getMutations(QueryOptions options, boolean local, long timestamp, int nowInSeconds, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java index 1def3fd..6ef551d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java @@ -82,7 +82,7 @@ final class SingleTableUpdatesCollector implements UpdatesCollector * Returns a collection containing all the mutations. * @return a collection containing all the mutations. 
*/ - public Collection<IMutation> toMutations() + public List<IMutation> toMutations() { List<IMutation> ms = new ArrayList<>(); for (PartitionUpdate.Builder builder : puBuilders.values()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java index 30db7ca..c3dd334 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java @@ -18,17 +18,16 @@ package org.apache.cassandra.cql3.statements; -import java.util.Collection; +import java.util.List; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IMutation; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableMetadata; public interface UpdatesCollector { PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency); - Collection<IMutation> toMutations(); + List<IMutation> toMutations(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 12e73d0..2f0c188 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -17,16 +17,27 @@ */ package 
org.apache.cassandra.cql3.statements.schema; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.collect.ImmutableSet; import org.apache.cassandra.audit.AuditLogContext; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.locator.ReplicationFactor; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff; import org.apache.cassandra.schema.Keyspaces; @@ -34,9 +45,13 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.utils.FBUtilities; public final class AlterKeyspaceStatement extends AlterSchemaStatement { + private static final boolean allow_alter_rf_during_range_movement = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement"); + private static final boolean allow_unsafe_transient_changes = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_unsafe_transient_changes"); + private final KeyspaceAttributes attrs; public AlterKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs) @@ -60,6 +75,9 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement 
newKeyspace.params.validate(keyspaceName); + validateNoRangeMovements(); + validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy()); + return schema.withAddedOrUpdated(newKeyspace); } @@ -84,11 +102,77 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy(); AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy(); - return before.getReplicationFactor() < after.getReplicationFactor() + return before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas ? ImmutableSet.of("When increasing replication factor you need to run a full (-full) repair to distribute the data.") : ImmutableSet.of(); } + private void validateNoRangeMovements() + { + if (allow_alter_rf_during_range_movement) + return; + + Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(), Gossiper.instance.getUnreachableMembers().stream()); + List<InetAddressAndPort> notNormalEndpoints = endpoints.filter(endpoint -> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) && + !Gossiper.instance.getEndpointStateForEndpoint(endpoint).isNormalState()) + .collect(Collectors.toList()); + if (!notNormalEndpoints.isEmpty()) + { + throw new ConfigurationException("Cannot alter RF while some endpoints are not in normal state (no range movements): " + notNormalEndpoints); + } + } + + private void validateTransientReplication(AbstractReplicationStrategy oldStrategy, AbstractReplicationStrategy newStrategy) + { + //If there is no read traffic there are some extra alterations you can safely make, but this is so atypical + //that a good default is to not allow unsafe changes + if (allow_unsafe_transient_changes) + return; + + ReplicationFactor oldRF = oldStrategy.getReplicationFactor(); + ReplicationFactor newRF = newStrategy.getReplicationFactor(); + + int oldTrans = 
oldRF.transientReplicas(); + int oldFull = oldRF.fullReplicas; + int newTrans = newRF.transientReplicas(); + int newFull = newRF.fullReplicas; + + if (newTrans > 0) + { + if (DatabaseDescriptor.getNumTokens() > 1) + throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet")); + + Keyspace ks = Keyspace.open(keyspaceName); + for (ColumnFamilyStore cfs : ks.getColumnFamilyStores()) + { + if (cfs.viewManager.hasViews()) + { + throw new ConfigurationException("Cannot use transient replication on keyspaces using materialized views"); + } + + if (cfs.indexManager.hasIndexes()) + { + throw new ConfigurationException("Cannot use transient replication on keyspaces using secondary indexes"); + } + } + } + + //This is true right now because the transition from transient -> full lacks the pending state + //necessary for correctness. What would happen if we allowed this is that we would attempt + //to read from a transient replica as if it were a full replica. + if (oldFull > newFull && oldTrans > 0) + throw new ConfigurationException("Can't add full replicas if there are any transient replicas. You must first remove all transient replicas, then change the # of full replicas, then add back the transient replicas"); + + //Don't increase transient replication factor by more than one at a time if changing number of replicas + //Just like with changing full replicas it's not safe to do this as you could read from too many replicas + //that don't have the necessary data. W/O transient replication this alteration was allowed and it's not clear + //if it should be. + //This is structured so you can convert as many full replicas to transient replicas as you want. 
+ boolean numReplicasChanged = oldTrans + oldFull != newTrans + newFull; + if (numReplicasChanged && (newTrans > oldTrans && newTrans != oldTrans + 1)) + throw new ConfigurationException("Can only safely increase number of transients one at a time with incremental repair run in between each time"); + } + @Override public AuditLogContext getAuditLogContext() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index 3ec75b2..5044119 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -360,6 +361,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement "before being replayed."); } + if (keyspace.createReplicationStrategy().hasTransientReplicas() + && params.readRepair != ReadRepairStrategy.NONE) + { + throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces"); + } + return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java index df41358..dbca160 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -28,7 +28,9 @@ import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QualifiedName; import org.apache.cassandra.cql3.statements.schema.IndexTarget.Type; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; @@ -88,6 +90,9 @@ public final class CreateIndexStatement extends AlterSchemaStatement if (table.isView()) throw ire("Secondary indexes on materialized views aren't supported"); + if (Keyspace.open(table.keyspace).getReplicationStrategy().hasTransientReplicas()) + throw new InvalidRequestException("Secondary indexes are not supported on transiently replicated keyspaces"); + List<IndexTarget> indexTargets = Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table))); if (indexTargets.isEmpty() && !attrs.isCustom) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java index 62fcafe..be7907f 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java @@ -27,11 +27,14 @@ import org.apache.cassandra.auth.DataResource; import org.apache.cassandra.auth.IResource; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -98,6 +101,12 @@ public final class CreateTableStatement extends AlterSchemaStatement TableMetadata table = builder(keyspace.types).build(); table.validate(); + if (keyspace.createReplicationStrategy().hasTransientReplicas() + && table.params.readRepair != ReadRepairStrategy.NONE) + { + throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces"); + } + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java index 5f62001..bf6bcff 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java @@ -31,9 +31,11 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.cql3.selection.RawSelector; import org.apache.cassandra.cql3.selection.Selectable; import org.apache.cassandra.cql3.statements.StatementType; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.ReversedType; import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; @@ -107,6 +109,9 @@ public final class CreateViewStatement extends AlterSchemaStatement if (null == keyspace) throw ire("Keyspace '%s' doesn't exist", keyspaceName); + if (keyspace.createReplicationStrategy().hasTransientReplicas()) + throw new InvalidRequestException("Materialized views are not supported on transiently replicated keyspaces"); + TableMetadata table = keyspace.tables.getNullable(tableName); if (null == table) throw ire("Base table '%s' doesn't exist", tableName); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java index c8e464a..4e66307 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java @@ -128,6 +128,9 @@ public final class TableAttributes extends PropertyDefinitions if (hasOption(Option.SPECULATIVE_RETRY)) 
builder.speculativeRetry(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_RETRY))); + if (hasOption(Option.SPECULATIVE_WRITE_THRESHOLD)) + builder.speculativeWriteThreshold(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_WRITE_THRESHOLD))); + if (hasOption(Option.CRC_CHECK_CHANCE)) builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 5e38584..56851e2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,7 +30,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import javax.annotation.Nullable; import javax.management.*; import javax.management.openmbean.*; @@ -68,7 +67,6 @@ import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -81,7 +79,6 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.repair.TableRepairManager; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.streaming.TableStreamManager; @@ -205,7 +202,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private final Directories directories; public final TableMetrics metric; - public volatile long sampleLatencyNanos; + public volatile long sampleReadLatencyNanos; + public volatile long transientWriteLatencyNanos; private final CassandraTableWriteHandler writeHandler; private final CassandraStreamManager streamManager; @@ -384,7 +382,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean viewManager = keyspace.viewManager.forTable(metadata.id); metric = new TableMetrics(this); fileIndexGenerator.set(generation); - sampleLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2); + sampleReadLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2); + transientWriteLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout() / 2); logger.info("Initializing {}.{}", keyspace.getName(), name); @@ -454,7 +453,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { try { - sampleLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency); + sampleReadLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency); + transientWriteLatencyNanos = metadata().params.speculativeWriteThreshold.calculateThreshold(metric.coordinatorWriteLatency); } catch (Throwable e) { @@ -487,15 +487,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return directories; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader 
header, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel); - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, txn); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, txn); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, metadataCollector, header, indexManager.listIndexes(), txn); + return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), txn); } public boolean supportsEarlyOpen() @@ -1402,7 +1402,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // cleanup size estimation only counts bytes for keys local to this node long expectedFileSize = 0; - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges(); for (SSTableReader sstable : sstables) { List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges); @@ -1677,7 +1677,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void cleanupCache() { - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + Collection<Range<Token>> 
ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges(); for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator(); keyIter.hasNext(); ) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index d37da0a..35ba198 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -17,16 +17,18 @@ */ package org.apache.cassandra.db; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import com.google.common.collect.Iterables; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.Replicas; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -47,7 +49,8 @@ public enum ConsistencyLevel EACH_QUORUM (7), SERIAL (8), LOCAL_SERIAL(9), - LOCAL_ONE (10, true); + LOCAL_ONE (10, true), + NODE_LOCAL (11, true); private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class); @@ -89,13 +92,13 @@ public enum ConsistencyLevel private int quorumFor(Keyspace keyspace) { - return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1; + return (keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1; } private int localQuorumFor(Keyspace keyspace, String dc) { return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) - ? 
(((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1 + ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1 : quorumFor(keyspace); } @@ -116,7 +119,7 @@ public enum ConsistencyLevel case SERIAL: return quorumFor(keyspace); case ALL: - return keyspace.getReplicationStrategy().getReplicationFactor(); + return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; case LOCAL_QUORUM: case LOCAL_SERIAL: return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); @@ -138,6 +141,28 @@ public enum ConsistencyLevel } } + public int blockForWrite(Keyspace keyspace, Endpoints<?> pending) + { + assert pending != null; + + int blockFor = blockFor(keyspace); + switch (this) + { + case ANY: + break; + case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL: + // we will only count local replicas towards our response count, as these queries only care about local guarantees + blockFor += countDCLocalReplicas(pending).allReplicas(); + break; + case ONE: case TWO: case THREE: + case QUORUM: case EACH_QUORUM: + case SERIAL: + case ALL: + blockFor += pending.size(); + } + return blockFor; + } + /** * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace */ @@ -156,40 +181,75 @@ public enum ConsistencyLevel return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); } - public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints) + public static boolean isLocal(Replica replica) + { + return isLocal(replica.endpoint()); + } + + private static ReplicaCount countDCLocalReplicas(ReplicaCollection<?> liveReplicas) { - int count = 0; - for (InetAddressAndPort endpoint : liveEndpoints) - if (isLocal(endpoint)) - count++; + ReplicaCount count = new ReplicaCount(); + for (Replica replica : liveReplicas) + if (isLocal(replica)) + 
count.increment(replica); return count; } - private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) + private static class ReplicaCount + { + int fullReplicas; + int transientReplicas; + + int allReplicas() + { + return fullReplicas + transientReplicas; + } + + void increment(Replica replica) + { + if (replica.isFull()) ++fullReplicas; + else ++transientReplicas; + } + + boolean isSufficient(int allReplicas, int fullReplicas) + { + return this.fullReplicas >= fullReplicas + && this.allReplicas() >= allReplicas; + } + } + + private static Map<String, ReplicaCount> countPerDCEndpoints(Keyspace keyspace, Iterable<Replica> liveReplicas) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - Map<String, Integer> dcEndpoints = new HashMap<String, Integer>(); + Map<String, ReplicaCount> dcEndpoints = new HashMap<>(); for (String dc: strategy.getDatacenters()) - dcEndpoints.put(dc, 0); + dcEndpoints.put(dc, new ReplicaCount()); - for (InetAddressAndPort endpoint : liveEndpoints) + for (Replica replica : liveReplicas) { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); - dcEndpoints.put(dc, dcEndpoints.get(dc) + 1); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + dcEndpoints.get(dc).increment(replica); } return dcEndpoints; } - public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints) + public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas) + { + return filterForQuery(keyspace, liveReplicas, false); + } + + public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas, boolean alwaysSpeculate) { /* * If we are doing an each quorum query, we have to make sure that the endpoints we select * provide a quorum for each data center. 
If we are not using a NetworkTopologyStrategy, * we should fall through and grab a quorum in the replication strategy. + * + * We do not speculate for EACH_QUORUM. */ if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) - return filterForEachQuorum(keyspace, liveEndpoints); + return filterForEachQuorum(keyspace, liveReplicas); /* * Endpoints are expected to be restricted to live replicas, sorted by snitch preference. @@ -198,36 +258,34 @@ public enum ConsistencyLevel * the blockFor first ones). */ if (isDCLocal) - liveEndpoints.sort(DatabaseDescriptor.getLocalComparator()); + liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator()); - return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace))); + return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0))); } - private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints) + private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - - Map<String, List<InetAddressAndPort>> dcsEndpoints = new HashMap<>(); - for (String dc: strategy.getDatacenters()) - dcsEndpoints.put(dc, new ArrayList<>()); - - for (InetAddressAndPort add : liveEndpoints) + Map<String, Integer> dcsReplicas = new HashMap<>(); + for (String dc : strategy.getDatacenters()) { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add); - dcsEndpoints.get(dc).add(add); + // we put _up to_ dc replicas only + dcsReplicas.put(dc, localQuorumFor(keyspace, dc)); } - List<InetAddressAndPort> waitSet = new ArrayList<>(); - for (Map.Entry<String, List<InetAddressAndPort>> dcEndpoints : dcsEndpoints.entrySet()) - { - List<InetAddressAndPort> dcEndpoint = dcEndpoints.getValue(); - waitSet.addAll(dcEndpoint.subList(0, 
Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size()))); - } - - return waitSet; + return liveReplicas.filter((replica) -> { + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + int replicas = dcsReplicas.get(dc); + if (replicas > 0) + { + dcsReplicas.put(dc, --replicas); + return true; + } + return false; + }); } - public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) + public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) { switch (this) { @@ -235,75 +293,92 @@ public enum ConsistencyLevel // local hint is acceptable, and local node is always live return true; case LOCAL_ONE: - return countLocalEndpoints(liveEndpoints) >= 1; + return countDCLocalReplicas(liveReplicas).isSufficient(1, 1); case LOCAL_QUORUM: - return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace); + return countDCLocalReplicas(liveReplicas).isSufficient(blockFor(keyspace), 1); case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) + int fullCount = 0; + for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, liveReplicas).entrySet()) { - if (entry.getValue() < localQuorumFor(keyspace, entry.getKey())) + ReplicaCount count = entry.getValue(); + if (!count.isSufficient(localQuorumFor(keyspace, entry.getKey()), 0)) return false; + fullCount += count.fullReplicas; } - return true; + return fullCount > 0; } // Fallthough on purpose for SimpleStrategy default: - return Iterables.size(liveEndpoints) >= blockFor(keyspace); + return liveReplicas.size() >= blockFor(keyspace) + && Replicas.countFull(liveReplicas) > 0; } } - public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException + public void assureSufficientLiveNodesForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas) throws UnavailableException + { + assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1); + } + public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException + { + assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0); + } + public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException { - int blockFor = blockFor(keyspace); switch (this) { case ANY: // local hint is acceptable, and local node is always live break; case LOCAL_ONE: - if (countLocalEndpoints(liveEndpoints) == 0) - throw new UnavailableException(this, 1, 0); + { + ReplicaCount localLive = countDCLocalReplicas(allLive); + if (!localLive.isSufficient(blockFor, blockForFullReplicas)) + throw UnavailableException.create(this, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas); break; + } case LOCAL_QUORUM: - int localLive = countLocalEndpoints(liveEndpoints); - if (localLive < blockFor) + { + ReplicaCount localLive = countDCLocalReplicas(allLive); + if (!localLive.isSufficient(blockFor, blockForFullReplicas)) { if (logger.isTraceEnabled()) { - StringBuilder builder = new StringBuilder("Local replicas ["); - for (InetAddressAndPort endpoint : liveEndpoints) - { - if (isLocal(endpoint)) - builder.append(endpoint).append(","); - } - builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockFor).append(" live nodes in '").append(DatabaseDescriptor.getLocalDataCenter()).append("'"); - logger.trace(builder.toString()); + logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'", + allLive.filter(ConsistencyLevel::isLocal), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter())); } - throw new 
UnavailableException(this, blockFor, localLive); + throw UnavailableException.create(this, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas); } break; + } case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) + int total = 0; + int totalFull = 0; + for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, allLive).entrySet()) { int dcBlockFor = localQuorumFor(keyspace, entry.getKey()); - int dcLive = entry.getValue(); - if (dcLive < dcBlockFor) - throw new UnavailableException(this, entry.getKey(), dcBlockFor, dcLive); + ReplicaCount dcCount = entry.getValue(); + if (!dcCount.isSufficient(dcBlockFor, 0)) + throw UnavailableException.create(this, entry.getKey(), dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas); + totalFull += dcCount.fullReplicas; + total += dcCount.allReplicas(); } + if (totalFull < blockForFullReplicas) + throw UnavailableException.create(this, blockFor, total, blockForFullReplicas, totalFull); break; } // Fallthough on purpose for SimpleStrategy default: - int live = Iterables.size(liveEndpoints); - if (live < blockFor) + int live = allLive.size(); + int full = Replicas.countFull(allLive); + if (live < blockFor || full < blockForFullReplicas) { if (logger.isTraceEnabled()) - logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor); - throw new UnavailableException(this, blockFor, live); + logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor); + throw UnavailableException.create(this, blockFor, blockForFullReplicas, live, full); } break; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: 
commits-h...@cassandra.apache.org