This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a3ca008bad2a7bfa887a8ba2d119fdc4369ba08
Merge: c26269b 080280d
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Fri Nov 27 17:07:15 2020 +0100

    Merge commit '080280dc0177da6176dd4ba970e5a35aa7e2a729' into trunk

 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../org/apache/cassandra/service/StorageProxy.java | 352 +++++++----
 .../org/apache/cassandra/service/paxos/Commit.java |   6 +
 .../cassandra/service/paxos/PrepareCallback.java   |  12 +-
 .../cassandra/distributed/impl/Instance.java       |  83 +++
 .../apache/cassandra/distributed/test/CASTest.java | 688 +++++++++++++++++++++
 7 files changed, 1034 insertions(+), 116 deletions(-)

diff --cc CHANGES.txt
index 92094dc,c3c5f02..56843d7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,8 +1,21 @@@
 -3.11.10
 - * Rate limit validation compactions using compaction_throughput_mb_per_sec 
(CASSANDRA-16161)
 +4.0-beta4
 + * Update jctools dependency to 3.1.0 (CASSANDRA-16255)
 + * 'SSLEngine closed already' exception on failed outbound connection 
(CASSANDRA-16277)
 + * Drain and/or shutdown might throw because of slow messaging service 
shutdown (CASSANDRA-16276)
 + * Upgrade JNA to 5.6.0, dropping support for <=glibc-2.6 systems 
(CASSANDRA-16212)
 + * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246)
 + * Ensure that CacheMetrics.requests is picked up by the metric reporter 
(CASSANDRA-16228)
 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
 + * Produce consistent tombstone for reads to avoid digest mismatch 
(CASSANDRA-15369)
 + * Fix SSTableloader issue when restoring a table named backups 
(CASSANDRA-16235)
 + * Invalid serialized size for responses caused by increasing message time by 
1ms which caused extra bytes in size calculation (CASSANDRA-16103)
 + * Throw BufferOverflowException from DataOutputBuffer for better visibility 
(CASSANDRA-16214)
 + * TLS connections to the storage port on a node without server encryption 
configured causes java.io.IOException accessing missing keystore 
(CASSANDRA-16144)
 + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
 +Merged from 3.11:
   * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
  Merged from 3.0:
+  * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
   * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
   * Improved check of num_tokens against the length of initial_token 
(CASSANDRA-14477)
   * Fix a race condition on ColumnFamilyStore and TableMetrics 
(CASSANDRA-16228)
diff --cc NEWS.txt
index 9774d00,c5a3439..d02f2f0
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -33,256 -42,21 +33,264 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +4.0
 +===
 +
 +New features
 +------------
 +    - Nodes will now bootstrap all intra-cluster connections at startup by 
default and wait
 +      10 seconds for all but one node in the local data center to be 
connected and marked
 +      UP in gossip. This prevents nodes from coordinating requests and 
failing because they
 +      aren't able to connect to the cluster fast enough. 
block_for_peers_timeout_in_secs in
 +      cassandra.yaml can be used to configure how long to wait (or whether to 
wait at all)
 +      and block_for_peers_in_remote_dcs can be used to also block on all but 
one node in
 +      each remote DC as well. See CASSANDRA-14297 and CASSANDRA-13993 for 
more information.
 +    - *Experimental* support for Transient Replication and Cheap Quorums 
introduced by CASSANDRA-14404
 +      The intended audience for this functionality is expert users of 
Cassandra who are prepared
 +      to validate every aspect of the database for their application and 
deployment practices. Future
 +      releases of Cassandra will make this feature suitable for a wider 
audience.
 +    - *Experimental* support for Java 11 has been added. JVM options that 
differ between or are
 +      specific to Java 8 and 11 have been moved from jvm.options into 
jvm8.options and jvm11.options.
 +      IMPORTANT: Running C* on Java 11 is *experimental*; run it at your 
own risk.
 +    - LCS now respects the max_threshold parameter when compacting - this was 
hard coded to 32
 +      before, but now it is possible to do bigger compactions when compacting 
from L0 to L1.
 +      This also applies to STCS-compactions in L0 - if there are more than 32 
sstables in L0
 +      we will compact at most max_threshold sstables in an L0 STCS 
compaction. See CASSANDRA-14388
 +      for more information.
 +    - There is now an option to automatically upgrade sstables after 
Cassandra upgrade; enable it
 +      either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during 
runtime. See
 +      CASSANDRA-14197.
 +    - `nodetool refresh` has been deprecated in favour of `nodetool import` - 
see CASSANDRA-6719
 +      for details
 +    - An experimental option to compare all merkle trees together has been 
added - for example, in
 +      a 3 node cluster with 2 replicas identical and 1 out-of-date, with this 
option enabled, the
 +      out-of-date replica will only stream a single copy from an up-to-date 
replica. Enable it by adding
 +      "-os" to nodetool repair. See CASSANDRA-3200.
 +    - The currentTimestamp, currentDate, currentTime and currentTimeUUID 
functions have been added.
 +      See CASSANDRA-13132
 +    - Support for arithmetic operations between `timestamp`/`date` and 
`duration` has been added.
 +      See CASSANDRA-11936
 +    - Support for arithmetic operations on numbers has been added; a brief 
CQL sketch follows this list. See CASSANDRA-11935
 +    - Preview expected streaming required for a repair (nodetool repair 
--preview), and validate the
 +      consistency of repaired data between nodes (nodetool repair 
--validate). See CASSANDRA-13257
 +    - Support for selecting Map values and Set elements has been added for 
SELECT queries. See CASSANDRA-7396
 +    - Change-Data-Capture has been modified to make CommitLogSegments 
available
 +      immediately upon creation via hard-linking the files. This means that 
incomplete
 +      segments will be available in cdc_raw rather than fully flushed. See 
documentation
 +      and CASSANDRA-12148 for more detail.
 +    - The initial build of materialized views can be parallelized. The number 
of concurrent builder
 +      threads is specified by the property 
`cassandra.yaml:concurrent_materialized_view_builders`.
 +      This property can be modified at runtime through both JMX and the new 
`setconcurrentviewbuilders`
 +      and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 
for more details.
 +    - There is now a binary full query log based on Chronicle Queue that can 
be controlled using
 +      nodetool enablefullquerylog, disablefullquerylog, and 
resetfullquerylog. The log
 +      contains all queries invoked, approximate time they were invoked, any 
parameters necessary
 +      to bind wildcard values, and all query options. A human readable 
version of the log can be
 +      dumped or tailed using the new bin/fqltool utility. The full query log 
is designed to be safe
 +      to use in production and limits utilization of heap memory and disk 
space with limits
 +      you can specify when enabling the log.
 +      See nodetool and fqltool help text for more information.
 +    - SSTableDump now supports the -l option to output each partition as its own JSON object.
 +      See CASSANDRA-13848 for more detail
 +    - Metric for coordinator writes per table has been added. See 
CASSANDRA-14232
 +    - Nodetool cfstats now has options to sort by various metrics as well as 
limit results.
 +    - Operators can restrict login user activity to one or more datacenters. 
See `network_authorizer`
 +      in cassandra.yaml, and the docs for create and alter role statements. 
CASSANDRA-13985
 +    - Roles altered from login=true to login=false will prevent existing 
connections from executing any
 +      statements after the cache has been refreshed. CASSANDRA-13985
 +    - Support for audit logging of database activity. If enabled, every 
incoming
 +      CQL command request and authentication attempt (successful as well as 
unsuccessful login) is logged on the node.
 +    - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, 
Cassandra will stream
 +      entire SSTables, significantly speeding up transfers. Any streaming 
related operations will see
 +      corresponding improvement. See CASSANDRA-14556.
 +    - NetworkTopologyStrategy now supports auto-expanding the 
replication_factor
 +      option into all available datacenters at CREATE or ALTER time. For 
example,
 +      specifying replication_factor: 3 translates to three replicas in every
 +      datacenter. This auto-expansion will _only add_ datacenters for safety.
 +      See CASSANDRA-14303 for more details.
 +    - Added Python 3 support so cqlsh and cqlshlib are now compatible with 
Python 2.7 and Python 3.6.
 +      Added --python option to cqlsh so users can specify the path to their 
chosen Python interpreter.
 +      See CASSANDRA-10190 for details.
 +    - Support for server side DESCRIBE statements has been added. See 
CASSANDRA-14825
 +    - It is now possible to rate limit snapshot creation/clearing. See 
CASSANDRA-13019
 +
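      A minimal CQL sketch of a few of the features above (the current-time
      functions, timestamp/duration and numeric arithmetic, and map-element
      selection). The `ks.events` table and its columns are hypothetical and
      are used only for illustration:

          -- hypothetical table, for illustration only
          CREATE TABLE ks.events (
              user_id    int,
              created_at timestamp,
              settings   map<text, text>,
              PRIMARY KEY (user_id, created_at)
          );

          -- current-time functions (CASSANDRA-13132)
          SELECT currentTimestamp(), currentDate() FROM system.local;

          -- timestamp/duration and numeric arithmetic (CASSANDRA-11936, CASSANDRA-11935)
          -- and selection of a single map element (CASSANDRA-7396)
          SELECT created_at + 2d AS two_days_later,
                 user_id + 100   AS shifted_id,
                 settings['theme']
          FROM ks.events
          WHERE user_id = 1;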
 +Upgrading
 +---------
 +    - Cassandra removed support for the OldNetworkTopologyStrategy. Before 
upgrading you will need to change the 
 +      replication strategy for the keyspaces using this strategy to the 
NetworkTopologyStrategy. (CASSANDRA-13990)
 +    - Sstables for tables with a frozen UDT written by C* 3.0 appear as 
corrupted.
 +
 +      Background: The serialization-header in the -Statistics.db sstable 
component contains the type information
 +      of the table columns. C* 3.0 wrote incorrect type information for 
frozen UDTs by omitting the
 +      "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 
in C* 3.6. Since then, the missing
 +      "frozen" information leads to deserialization issues that result in 
CorruptSSTableExceptions, potentially other
 +      exceptions as well.
 +
 +      As a mitigation, the sstable serialization-headers are rewritten to 
contain the missing "frozen" information for
 +      UDTs once, when an upgrade from C* 3.0 is detected. This migration does 
not touch snapshots or backups.
 +
 +      The sstablescrub tool now performs a check of the sstable 
serialization-header against the schema. A mismatch of
 +      the types in the serialization-header and the schema will cause 
sstablescrub to error out and stop by default.
 +      See the new `-e` option. `-e off` disables the new validation code. `-e 
fix` or `-e fix-only`, e.g.
 +      `sstablescrub -e fix keyspace table`, will validate the 
serialization-header, rewrite the non-frozen UDTs
 +      in the serialization-header to frozen UDTs, if that matches the schema, 
and continue with scrub.
 +      See `sstablescrub -h`.
 +      (CASSANDRA-15035)
 +    - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed 
tables from
 +      64kb to 16kb. For highly compressible data this can have a noticeable 
impact
 +      on space utilization. You may want to consider manually specifying this 
value.
 +    - Additional columns have been added to system_distributed.repair_history,
 +      system_traces.sessions and system_traces.events. As a result select 
queries
 +      against these tables - including queries against tracing tables 
performed
 +      automatically by the drivers and cqlsh - will fail and generate an 
error in the log
 +      during upgrade when the cluster is mixed version. On the 3.x side this will 
also lead
 +      to broken internode connections and lost messages.
 +      Cassandra versions 3.0.20 and 3.11.6 pre-add these columns (see 
CASSANDRA-15385),
 +      so please make sure to upgrade to those versions or higher before 
upgrading to
 +      4.0, so that query tracing does not cause any issues during the upgrade.
 +    - Timestamp ties between values resolve differently: if either value has 
a TTL,
 +      this value always wins. This is to provide consistent reconciliation 
before
 +      and after the value expires into a tombstone.
 +    - Support for legacy auth tables in the system_auth keyspace (users,
 +      permissions, credentials) and the migration code has been removed. 
Migration
 +      of these legacy auth tables must have been completed before the upgrade 
to
 +      4.0 and the legacy tables must have been removed. See the 'Upgrading' 
section
 +      for version 2.2 for migration instructions.
 +    - Cassandra 4.0 removed support for the deprecated Thrift interface. 
Amongst
 +      other things, this implies the removal of all yaml options related to 
thrift
 +      ('start_rpc', rpc_port, ...).
 +    - Cassandra 4.0 removed support for any pre-3.0 format. This means you
 +      cannot upgrade from a 2.x version to 4.0 directly; you have to upgrade to
 +      a 3.0.x/3.x version first (and run upgradesstables). In particular, this
 +      means Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you
 +      will need to upgrade those sstables in 3.0.x/3.x first.
 +    - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0; previous
 +      versions will cause issues during rolling upgrades (CASSANDRA-13274).
 +    - Cassandra will no longer allow invalid keyspace replication options, 
such
 +      as invalid datacenter names for NetworkTopologyStrategy. Operators MUST
 +      add new nodes to a datacenter before they can set ALTER or CREATE
 +      keyspace replication policies using that datacenter. Existing keyspaces
 +      will continue to operate, but CREATE and ALTER will validate that all
 +      datacenters specified exist in the cluster.
 +    - Cassandra 4.0 fixes a problem with incremental repair which caused 
repaired
 +      data to be inconsistent between nodes. The fix changes the behavior of 
both
 +      full and incremental repairs. For full repairs, data is no longer marked
 +      repaired. For incremental repairs, anticompaction is run at the 
beginning
 +      of the repair, instead of at the end. If incremental repair was being 
used
 +      prior to upgrading, a full repair should be run after upgrading to 
resolve
 +      any inconsistencies.
 +    - Config option index_interval has been removed (it was deprecated since 
2.0)
 +    - Deprecated repair JMX APIs are removed.
 +    - The version of snappy-java has been upgraded to 1.1.2.6
 +    - The minimum value for internode message timeouts is 10ms. Previously, 
any
 +      positive value was allowed. See cassandra.yaml entries like
 +      read_request_timeout_in_ms for more details.
 +    - Cassandra 4.0 allows a single port to be used for both secure and 
insecure
 +      connections between cassandra nodes (CASSANDRA-10404). See the yaml for
 +      specific property changes, and see the security doc for full details.
 +    - Due to the parallelization of the initial build of materialized views,
 +      the per token range view building status is stored in the new table
 +      `system.view_builds_in_progress`. The old table 
`system.views_builds_in_progress`
 +      is no longer used and can be removed. See CASSANDRA-12245 for more 
details.
 +    - Config option commitlog_sync_batch_window_in_ms has been deprecated as 
its
 +      documentation has been incorrect and the setting itself is near useless.
 +      Batch mode remains a valid commit log mode, however.
 +    - There is a new commit log mode, group, which is similar to batch mode
 +      but blocks for up to a configurable number of milliseconds between disk 
flushes.
 +    - nodetool clearsnapshot now requires the --all flag to remove all 
snapshots.
 +      Previous behavior would delete all snapshots by default.
 +    - Nodes are now identified by a combination of IP and storage port.
 +      Existing JMX APIs, nodetool, and system tables continue to work
 +      and accept/return just an IP, but there is a new
 +      version of each that works with the full unambiguous identifier.
 +      You should prefer these over the deprecated ambiguous versions that only
 +      work with an IP. This was done to support multiple instances per IP.
 +      Additionally we are moving to only using a single port for encrypted and
 +      unencrypted traffic and if you want multiple instances per IP you must
 +      first switch encrypted traffic to the storage port and not a separate
 +      encrypted port. If you want to use multiple instances per IP
 +      with SSL you will need to use StartTLS on storage_port and set
 +      outgoing_encrypted_port_source to gossip so outbound connections
 +      know what port to connect to for each instance. Before changing
 +      storage port or native port at nodes you must first upgrade the entire 
cluster
 +      and clients to 4.0 so they can handle the port not being consistent 
across
 +      the cluster.
 +    - Names of AWS regions/availability zones have been cleaned up to more 
correctly
 +      match the Amazon names. There is now a new option in 
conf/cassandra-rackdc.properties
 +      that lets users enable the correct names for new clusters, or use the 
legacy
 +      names for existing clusters. See conf/cassandra-rackdc.properties for 
details.
 +    - Background repair has been removed. dclocal_read_repair_chance and
 +      read_repair_chance table options have been removed and are now rejected.
 +      See CASSANDRA-13910 for details.
 +    - Internode TCP connections that do not ack segments for 30s will now
 +      be automatically detected and closed via the Linux TCP_USER_TIMEOUT
 +      socket option. This should be exceedingly rare, but AWS networks (and
 +      other stateful firewalls) apparently suffer from this issue. You can
 +      tune the timeouts on TCP connection and segment ack via the
 +      `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and
 +      `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively.
 +      See CASSANDRA-14358 for details.
 +    - repair_session_space_in_mb setting has been added to cassandra.yaml to 
allow operators to reduce
 +      merkle tree size if repair is creating too much heap pressure. The 
repair_session_max_tree_depth
 +      setting added in 3.0.19 and 3.11.5 is deprecated in favor of this 
setting. See CASSANDRA-14096
 +    - The flags 'enable_materialized_views' and 'enable_sasi_indexes' in 
cassandra.yaml
 +      have been set as false by default. Operators should modify them to 
allow the
 +      creation of new views and SASI indexes; the existing ones will continue 
working.
 +      See CASSANDRA-14866 for details.
 +    - CASSANDRA-15216 - The flag 'cross_node_timeout' has been set as true by 
default.
 +      This change is done under the assumption that users have set up NTP on
 +      their clusters or otherwise synchronize their clocks, and that clocks 
are
 +      mostly in sync, since this is a requirement for general correctness of
 +      last write wins.
 +    - CASSANDRA-15257 removed the joda time dependency.  Any time formats
 +      passed will now need to conform to java.time.format.DateTimeFormatter.
 +      Most notably, days and months must be two digits, and years exceeding
 +      four digits need to be prefixed with a plus or minus sign.
 +    - cqlsh now returns a non-zero code in case of errors. This is a backward 
incompatible change so it may
 +      break existing scripts that rely on the current behavior. See 
CASSANDRA-15623 for more details.
 +    - Updated the default compaction_throughput_mb_per_sec to 64. The 
original
 +      default (16) was meant for spinning disk volumes.  See CASSANDRA-14902 
for details.
 +    - Custom compaction strategies must now handle getting sstables 
added/removed notifications for
 +      sstables already added/removed - see CASSANDRA-14103 for details.
 +    - Support for JNA with glibc 2.6 and earlier has been removed. Centos 5, 
Debian 4, and Ubuntu 7.10 operating systems
 +      must first be upgraded. See CASSANDRA-16212 for more.
 +    - In cassandra.yaml, when using vnodes num_tokens must be defined if 
initial_token is defined.
 +      If it is not defined, or not equal to the number of tokens defined in 
initial_token,
 +      the node will not start. See CASSANDRA-14477 for details.
 +
 +
 +Deprecation
 +-----------
 +
 +    - The JMX MBean org.apache.cassandra.db:type=BlacklistedDirectories has 
been
 +      deprecated in favor of 
org.apache.cassandra.db:type=DisallowedDirectories
 +      and will be removed in a subsequent major version.
 +
 +
 +Materialized Views
 +-------------------
 +    - Following a discussion regarding concerns about the design and safety 
of Materialized Views, the C* development
 +      community no longer recommends them for production use, and considers 
them experimental. Warning messages will
 +      now be logged when they are created. (See 
https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html)
 +    - An 'enable_materialized_views' flag has been added to cassandra.yaml to 
allow operators to prevent creation of
 +      views
 +    - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key 
columns are no longer implicitly considered
 +      to be NOT NULL, and no base primary key columns get automatically 
included in the view definition. You have to
 +      specify them explicitly now.
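      For instance, under the stricter syntax every view primary key column,
      including columns inherited from the base table's primary key, must carry
      an explicit IS NOT NULL restriction and be listed in the view's PRIMARY KEY.
      The `ks.users` table below is hypothetical and used only for illustration:

          CREATE TABLE ks.users (
              user_id uuid PRIMARY KEY,
              email   text,
              name    text
          );

          -- both view primary key columns (email, user_id) are restricted and listed explicitly
          CREATE MATERIALIZED VIEW ks.users_by_email AS
              SELECT email, user_id, name
              FROM ks.users
              WHERE email IS NOT NULL AND user_id IS NOT NULL
              PRIMARY KEY (email, user_id);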
 +
  3.11.10
 -=====
 +======
 +
  Upgrading
  ---------
+     - This release fixes a correctness issue with SERIAL reads, and LWT writes 
that do not apply.
+       Unfortunately, this fix has an impact on read performance at 
the SERIAL or
+       LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, 
the performance
+       impact may be noticeable and may also result in an increase in 
timeouts. For that
+       reason, an opt-in system property has been added to disable the fix:
+         -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+       Use this flag at your own risk as it reverts SERIAL reads to the 
incorrect behavior of
 -      previous versions. See CASSANDRA-12126 for details.
 -    - In cassandra.yaml, when using vnodes num_tokens must be defined if 
initial_token is defined.
 -      If it is not defined, or not equal to the numbers of tokens defined in 
initial_tokens,
 -      the node will not start. See CASSANDRA-14477 for details.
++      previous versions. See CASSANDRA-12126 for details. 
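       For reference, a "SERIAL read" is any SELECT executed at the SERIAL or
       LOCAL_SERIAL consistency level, and a non-applying LWT write is a conditional
       update whose IF clause does not match. A minimal cqlsh sketch, assuming a
       hypothetical `ks.users` table:

           -- a conditional (LWT) write; [applied] = False when the IF clause does not match
           UPDATE ks.users SET name = 'alice' WHERE user_id = 1 IF name = 'bob';

           -- a serial read of the same partition (cqlsh directive sets the read consistency)
           CONSISTENCY SERIAL;
           SELECT name FROM ks.users WHERE user_id = 1;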
      - SASI's `max_compaction_flush_memory_in_mb` setting was previously 
getting interpreted in bytes. From 3.11.8
        it is correctly interpreted in megabytes, but prior to 3.11.10 previous 
configurations of this setting will
        lead to nodes OOM during compaction. From 3.11.10 previous 
configurations will be detected as incorrect,
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index c3715dd,d6f713e..df3a6f5
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -17,40 -17,24 +17,40 @@@
   */
  package org.apache.cassandra.service;
  
 -import java.io.IOException;
 -import java.net.InetAddress;
  import java.nio.ByteBuffer;
- import java.nio.file.Paths;
 -import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
+ import java.util.function.Supplier;
  
 -import com.google.common.base.Predicate;
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import com.google.common.cache.CacheLoader;
 -import com.google.common.collect.*;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
  import com.google.common.primitives.Ints;
  import com.google.common.util.concurrent.Uninterruptibles;
 -
  import org.apache.commons.lang3.StringUtils;
 -
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -241,10 -177,18 +245,20 @@@ public class StorageProxy implements St
          for(ConsistencyLevel level : ConsistencyLevel.values())
          {
              readMetricsMap.put(level, new ClientRequestMetrics("Read-" + 
level.name()));
 -            writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + 
level.name()));
 +            writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" 
+ level.name()));
          }
  
 +        ReadRepairMetrics.init();
++
+         if (disableSerialReadLinearizability)
+         {
+             logger.warn("This node was started with -D{}. SERIAL (and 
LOCAL_SERIAL) reads coordinated by this node " +
+                         "will not offer linearizability (see CASSANDRA-12126 
for details on what this means) with " +
+                         "respect to other SERIAL operations. Please note 
that, with this flag, SERIAL reads will be " +
+                         "slower than QUORUM reads, yet offer no more 
guarantee. This flag should only be used in " +
+                         "the restricted case of upgrading from a 
pre-CASSANDRA-12126 version, and only if you " +
+                         "understand the tradeoff.", 
DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+         }
      }
  
      /**
@@@ -295,31 -239,19 +309,20 @@@
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit,
                                    ClientState state,
 +                                  int nowInSeconds,
                                    long queryStartNanoTime)
 -    throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
 +    throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException, 
CasWriteUnknownResultException
      {
          final long startTimeForMetrics = System.nanoTime();
-         TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspaceName, cfName);
-         int contentions = 0;
          try
          {
-             consistencyForPaxos.validateForCas();
-             consistencyForCommit.validateForCasCommit(keyspaceName);
 -            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
++            TableMetadata metadata = 
Schema.instance.validateTable(keyspaceName, cfName);
  
-             long timeoutNanos = 
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
-             while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
+             Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () 
->
              {
-                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
-                 ReplicaPlan.ForPaxosWrite replicaPlan = 
ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
- 
-                 final PaxosBallotAndContention pair = 
beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, 
consistencyForPaxos, consistencyForCommit, true, state);
-                 final UUID ballot = pair.ballot;
-                 contentions += pair.contentions;
- 
                  // read the current values and check they validate the 
conditions
                  Tracing.trace("Reading existing values for CAS precondition");
 -                SinglePartitionReadCommand readCommand = 
request.readCommand(FBUtilities.nowInSeconds());
 +                SinglePartitionReadCommand readCommand = 
(SinglePartitionReadCommand) request.readCommand(nowInSeconds);
                  ConsistencyLevel readConsistency = consistencyForPaxos == 
ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : 
ConsistencyLevel.QUORUM;
  
                  FilteredPartition current;
@@@ -332,17 -264,12 +335,16 @@@
                  {
                      Tracing.trace("CAS precondition does not match current 
values {}", current);
                      casWriteMetrics.conditionNotMet.inc();
-                     return current.rowIterator();
+                     return Pair.create(PartitionUpdate.emptyUpdate(metadata, 
key), current.rowIterator());
                  }
  
-                 // finish the paxos round w/ the desired updates
-                 // TODO turn null updates into delete?
+                 // Create the desired updates
                  PartitionUpdate updates = request.makeUpdates(current);
  
 +                long size = updates.dataSize();
 +                casWriteMetrics.mutationSize.update(size);
 +                
writeMetricsMap.get(consistencyForPaxos).mutationSize.update(size);
 +
                  // Apply triggers to cas updates. A consideration here is that
                  // triggers emit Mutations, and so a given trigger 
implementation
                  // may generate mutations for partitions other than the one 
this
@@@ -352,36 -279,21 +354,32 @@@
                  // InvalidRequestException) any which aren't.
                  updates = TriggerExecutor.instance.execute(updates);
  
+                 return Pair.create(updates, null);
+             };
  
-                 Commit proposal = Commit.newProposal(ballot, updates);
-                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
-                 if (proposePaxos(proposal, replicaPlan, true, 
queryStartNanoTime))
-                 {
-                     commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
-                     Tracing.trace("CAS successful");
-                     return null;
-                 }
- 
-                 Tracing.trace("Paxos proposal not accepted (pre-empted by a 
higher ballot)");
-                 contentions++;
-                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
MILLISECONDS);
-                 // continue to retry
-             }
+             return doPaxos(metadata,
+                            key,
+                            consistencyForPaxos,
+                            consistencyForCommit,
+                            consistencyForCommit,
+                            state,
+                            queryStartNanoTime,
+                            casWriteMetrics,
+                            updateProposer);
  
-             throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
          }
 -        catch (WriteTimeoutException | ReadTimeoutException e)
 +        catch (CasWriteUnknownResultException e)
 +        {
 +            casWriteMetrics.unknownResult.mark();
 +            throw e;
 +        }
-         catch (WriteTimeoutException wte)
++        catch (CasWriteTimeoutException wte)
 +        {
 +            casWriteMetrics.timeouts.mark();
 +            writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
-             throw new CasWriteTimeoutException(wte.writeType, 
wte.consistency, wte.received, wte.blockFor, contentions);
++            throw new CasWriteTimeoutException(wte.writeType, 
wte.consistency, wte.received, wte.blockFor, wte.contentions);
 +        }
 +        catch (ReadTimeoutException e)
          {
              casWriteMetrics.timeouts.mark();
              writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
@@@ -409,10 -319,154 +405,131 @@@
          }
      }
  
-     private static void recordCasContention(int contentions)
 -    private static void recordCasContention(CASClientRequestMetrics 
casMetrics, int contentions)
++    private static void recordCasContention(TableMetadata table,
++                                            DecoratedKey key,
++                                            CASClientRequestMetrics 
casMetrics,
++                                            int contentions)
      {
--        if(contentions > 0)
-             casWriteMetrics.contention.update(contentions);
 -            casMetrics.contention.update(contentions);
++        if (contentions == 0)
++            return;
++
++        casMetrics.contention.update(contentions);
++        Keyspace.open(table.keyspace)
++                .getColumnFamilyStore(table.name)
++                .metric
++                .topCasPartitionContention
++                .addSample(key.getKey(), contentions);
+     }
+ 
+     /**
+      * Performs the Paxos rounds for a given proposal, retrying when 
preempted until the timeout.
+      *
+      * <p>The main 'configurable' of this method is the {@code 
createUpdateProposal} method: it is called by the method
+      * once a ballot has been successfully 'prepared' to generate the update 
to 'propose' (and commit if the proposal is
+      * successful). That method also generates the result that the whole 
method will return. Note that due to retrying,
+      * this method may be called multiple times and does not have to return 
the same results.
+      *
+      * @param metadata the table to update with Paxos.
+      * @param key the partition updated.
+      * @param consistencyForPaxos the serial consistency of the operation 
(either {@link ConsistencyLevel#SERIAL} or
+      *     {@link ConsistencyLevel#LOCAL_SERIAL}).
+      * @param consistencyForReplayCommits the consistency for the commit 
phase of "replayed" in-progress operations.
+      * @param consistencyForCommit the consistency for the commit phase of 
_this_ operation update.
+      * @param state the client state.
+      * @param queryStartNanoTime the nano time for the start of the query 
this is part of. This is the base time for
+      *     timeouts.
+      * @param casMetrics the metrics to update for this operation.
+      * @param createUpdateProposal method called after a successful 'prepare' 
phase to obtain 1) the actual update of
+      *     this operation and 2) the result that the whole method should 
return. This can return {@code null} in the
+      *     special where, after having "prepared" (and thus potentially 
replayed in-progress upgdates), we don't want
+      *     to propose anything (the whole method then return {@code null}).
+      * @return the second element of the pair returned by {@code 
createUpdateProposal} (for the last call of that method
+      *     if that method is called multiple times due to retries).
+      */
 -    private static RowIterator doPaxos(CFMetaData metadata,
++    private static RowIterator doPaxos(TableMetadata metadata,
+                                        DecoratedKey key,
+                                        ConsistencyLevel consistencyForPaxos,
+                                        ConsistencyLevel 
consistencyForReplayCommits,
+                                        ConsistencyLevel consistencyForCommit,
+                                        ClientState state,
+                                        long queryStartNanoTime,
+                                        CASClientRequestMetrics casMetrics,
+                                        Supplier<Pair<PartitionUpdate, 
RowIterator>> createUpdateProposal)
+     throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
+     {
+         int contentions = 0;
++        Keyspace keyspace = Keyspace.open(metadata.keyspace);
+         try
+         {
+             consistencyForPaxos.validateForCas();
 -            consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
 -            consistencyForCommit.validateForCasCommit(metadata.ksName);
++            
consistencyForReplayCommits.validateForCasCommit(metadata.keyspace);
++            consistencyForCommit.validateForCasCommit(metadata.keyspace);
+ 
 -            long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 -            while (System.nanoTime() - queryStartNanoTime < timeout)
++            long timeoutNanos = 
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
++            while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
+             {
+                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
 -                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyForPaxos);
 -                List<InetAddress> liveEndpoints = p.left;
 -                int requiredParticipants = p.right;
 -
 -                final Pair<UUID, Integer> pair = 
beginAndRepairPaxos(queryStartNanoTime,
 -                                                                     key,
 -                                                                     metadata,
 -                                                                     
liveEndpoints,
 -                                                                     
requiredParticipants,
 -                                                                     
consistencyForPaxos,
 -                                                                     
consistencyForReplayCommits,
 -                                                                     
casMetrics,
 -                                                                     state);
 -                final UUID ballot = pair.left;
 -                contentions += pair.right;
++                ReplicaPlan.ForPaxosWrite replicaPlan = 
ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos);
++                PaxosBallotAndContention pair = 
beginAndRepairPaxos(queryStartNanoTime,
++                                                                    key,
++                                                                    metadata,
++                                                                    
replicaPlan,
++                                                                    
consistencyForPaxos,
++                                                                    
consistencyForReplayCommits,
++                                                                    
casMetrics,
++                                                                    state);
++
++                final UUID ballot = pair.ballot;
++                contentions += pair.contentions;
+ 
+                 Pair<PartitionUpdate, RowIterator> proposalPair = 
createUpdateProposal.get();
+                 // See method javadoc: null here is code for "stop here and 
return null".
+                 if (proposalPair == null)
+                     return null;
+ 
+                 Commit proposal = Commit.newProposal(ballot, 
proposalPair.left);
+                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
 -                if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
++                if (proposePaxos(proposal, replicaPlan, true, 
queryStartNanoTime))
+                 {
+                     // We skip committing accepted updates when they are 
empty. This is an optimization which works
+                     // because we also skip replaying those same empty update 
in beginAndRepairPaxos (see the longer
+                     // comment there). As empty update are somewhat common 
(serial reads and non-applying CAS propose
+                     // them), this is worth bothering.
+                     if (!proposal.update.isEmpty())
+                         commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
+                     RowIterator result = proposalPair.right;
+                     if (result != null)
+                         Tracing.trace("CAS did not apply");
+                     else
+                         Tracing.trace("CAS applied successfully");
+                     return result;
+                 }
+ 
+                 Tracing.trace("Paxos proposal not accepted (pre-empted by a 
higher ballot)");
+                 contentions++;
+                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
+                 // continue to retry
+             }
 -
 -            throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+         }
 -        finally
++        catch (CasWriteTimeoutException e)
+         {
 -            if(contentions > 0)
 -                casMetrics.contention.update(contentions);
++            // Might be thrown by beginAndRepairPaxos. In that case, any 
contention that happened within the method and
++            // led up to the timeout was not accounted for in our local 
'contentions' variable and we add it now so that the
++            // contention recorded in the finally is correct.
++            contentions += e.contentions;
++            throw e;
+         }
 -    }
 -
 -    private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
 -    {
 -        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 -        return new Predicate<InetAddress>()
++        catch (WriteTimeoutException e)
+         {
 -            public boolean apply(InetAddress host)
 -            {
 -                return dc.equals(snitch.getDatacenter(host));
 -            }
 -        };
 -    }
 -
 -    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
 -    {
 -        Token tk = key.getToken();
 -        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(cfm.ksName, tk);
 -        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName);
 -        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
++            // Might be thrown by proposePaxos or commitPaxos
++            throw new CasWriteTimeoutException(e.writeType, e.consistency, 
e.received, e.blockFor, contentions);
++        }
++        finally
+         {
 -            // Restrict naturalEndpoints and pendingEndpoints to node in the 
local DC only
 -            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 -            Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc);
 -            naturalEndpoints = 
ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
 -            pendingEndpoints = 
ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
++            recordCasContention(metadata, key, casMetrics, contentions);
+         }
 -        int participants = pendingEndpoints.size() + naturalEndpoints.size();
 -        int requiredParticipants = participants / 2 + 1; // See 
CASSANDRA-8346, CASSANDRA-833
 -        List<InetAddress> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
 -        if (liveEndpoints.size() < requiredParticipants)
 -            throw new UnavailableException(consistencyForPaxos, 
requiredParticipants, liveEndpoints.size());
 -
 -        // We cannot allow CAS operations with 2 or more pending endpoints, 
see #8346.
 -        // Note that we fake an impossible number of required nodes in the 
unavailable exception
 -        // to nail home the point that it's an impossible operation no matter 
how many nodes are live.
 -        if (pendingEndpoints.size() > 1)
 -            throw new UnavailableException(String.format("Cannot perform LWT 
operation as there is more than one (%d) pending range movement", 
pendingEndpoints.size()),
 -                                           consistencyForPaxos,
 -                                           participants + 1,
 -                                           liveEndpoints.size());
+ 
 -        return Pair.create(liveEndpoints, requiredParticipants);
++        throw new CasWriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, consistencyForPaxos.blockFor(keyspace), contentions);
      }
  
      /**
@@@ -421,17 -475,18 +538,17 @@@
       * @return the Paxos ballot promised by the replicas if no in-progress 
requests were seen and a quorum of
       * nodes have seen the mostRecentCommit.  Otherwise, return null.
       */
 -    private static Pair<UUID, Integer> beginAndRepairPaxos(long 
queryStartNanoTime,
 -                                                           DecoratedKey key,
 -                                                           CFMetaData 
metadata,
 -                                                           List<InetAddress> 
liveEndpoints,
 -                                                           int 
requiredParticipants,
 -                                                           ConsistencyLevel 
consistencyForPaxos,
 -                                                           ConsistencyLevel 
consistencyForCommit,
 -                                                           
CASClientRequestMetrics casMetrics,
 -                                                           ClientState state)
 +    private static PaxosBallotAndContention beginAndRepairPaxos(long 
queryStartNanoTime,
 +                                                                DecoratedKey 
key,
 +                                                                TableMetadata 
metadata,
 +                                                                
ReplicaPlan.ForPaxosWrite paxosPlan,
 +                                                                
ConsistencyLevel consistencyForPaxos,
 +                                                                
ConsistencyLevel consistencyForCommit,
-                                                                 final boolean 
isWrite,
++                                                                
CASClientRequestMetrics casMetrics,
 +                                                                ClientState 
state)
      throws WriteTimeoutException, WriteFailureException
      {
 -        long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 +        long timeoutNanos = 
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
  
          PrepareCallback summary = null;
          int contentions = 0;
@@@ -448,76 -503,89 +565,87 @@@
              UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(ballotMicros);
  
              // prepare
--            Tracing.trace("Preparing {}", ballot);
--            Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-             summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
 -            summary = preparePaxos(toPrepare, liveEndpoints, 
requiredParticipants, consistencyForPaxos, queryStartNanoTime);
--            if (!summary.promised)
++            try
              {
--                Tracing.trace("Some replicas have already promised a higher 
ballot than ours; aborting");
--                contentions++;
--                // sleep a random amount to give the other proposer a chance 
to finish
-                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
MILLISECONDS);
 -                
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
--                continue;
--            }
- 
-             Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
-             Commit mostRecent = summary.mostRecentCommit;
++                Tracing.trace("Preparing {}", ballot);
++                Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
++                summary = preparePaxos(toPrepare, paxosPlan, 
queryStartNanoTime);
++                if (!summary.promised)
++                {
++                    Tracing.trace("Some replicas have already promised a 
higher ballot than ours; aborting");
++                    contentions++;
++                    // sleep a random amount to give the other proposer a 
chance to finish
++                    
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
MILLISECONDS);
++                    continue;
++                }
  
 -            Commit inProgress = summary.mostRecentInProgressCommit;
 -            Commit mostRecent = summary.mostRecentCommit;
 -
--            // If we have an in-progress ballot greater than the MRC we know, 
then it's an in-progress round that
--            // needs to be completed, so do it.
 -            // One special case we make is for update that are empty (which 
are proposed by serial reads and
 -            // non-applying CAS). While we could handle those as any other 
updates, we can optimize this somewhat by
 -            // neither committing those empty updates, nor replaying 
in-progress ones. The reasoning is this: as the
 -            // update is empty, we have nothing to apply to storage in the 
commit phase, so the only reason to commit
 -            // would be to update the MRC. However, if we skip replaying 
those empty updates, then we don't need to
 -            // update the MRC for following updates to make progress (that 
is, if we didn't had the empty update skip
 -            // below _but_ skipped updating the MRC on empty updates, then 
we'd be stuck always proposing that same
 -            // empty update). And the reason skipping that replay is safe is 
that when an operation tries to propose
 -            // an empty value, there can be only 2 cases:
 -            //  1) the propose succeed, meaning a quorum of nodes accept it, 
in which case we are guaranteed no earlier
 -            //     pending operation can ever be replayed (which is what we 
want to guarantee with the empty update).
 -            //  2) the propose does not succeed. But then the operation 
proposing the empty update will not succeed
 -            //     either (it will retry or ultimately timeout), and we're 
actually ok if earlier pending operation gets
 -            //     replayed in that case.
 -            // Tl;dr, it is safe to skip committing empty updates _as long 
as_ we also skip replying them below. And
 -            // doing is more efficient, so we do so.
--            if (!inProgress.update.isEmpty() && 
inProgress.isAfter(mostRecent))
--            {
--                Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
-                 if(isWrite)
-                     casWriteMetrics.unfinishedCommit.inc();
-                 else
-                     casReadMetrics.unfinishedCommit.inc();
 -                casMetrics.unfinishedCommit.inc();
--                Commit refreshedInProgress = Commit.newProposal(ballot, 
inProgress.update);
-                 if (proposePaxos(refreshedInProgress, paxosPlan, false, 
queryStartNanoTime))
 -                if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
++                Commit inProgress = summary.mostRecentInProgressCommit;
++                Commit mostRecent = summary.mostRecentCommit;
++
++                // If we have an in-progress ballot greater than the MRC we 
know, then it's an in-progress round that
++                // needs to be completed, so do it.
++                // One special case we make is for updates that are empty 
(which are proposed by serial reads and
++                // non-applying CAS). While we could handle those as any 
other updates, we can optimize this somewhat by
++                // neither committing those empty updates, nor replaying 
in-progress ones. The reasoning is this: as the
++                // update is empty, we have nothing to apply to storage in 
the commit phase, so the only reason to commit
++                // would be to update the MRC. However, if we skip replaying 
those empty updates, then we don't need to
++                // update the MRC for following updates to make progress 
(that is, if we didn't have the empty update skip
++                // below _but_ skipped updating the MRC on empty updates, 
then we'd be stuck always proposing that same
++                // empty update). And the reason skipping that replay is safe 
is that when an operation tries to propose
++                // an empty value, there can be only 2 cases:
++                //  1) the propose succeeds, meaning a quorum of nodes accept 
it, in which case we are guaranteed no earlier
++                //     pending operation can ever be replayed (which is what 
we want to guarantee with the empty update).
++                //  2) the propose does not succeed. But then the operation 
proposing the empty update will not succeed
++                //     either (it will retry or ultimately timeout), and 
we're actually ok if earlier pending operation gets
++                //     replayed in that case.
++                // Tl;dr, it is safe to skip committing empty updates _as 
long as_ we also skip replaying them below. And
++                // doing so is more efficient, so we do so.
++                if (!inProgress.update.isEmpty() && 
inProgress.isAfter(mostRecent))
                  {
--                    try
++                    Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
++                    casMetrics.unfinishedCommit.inc();
++                    Commit refreshedInProgress = Commit.newProposal(ballot, 
inProgress.update);
++                    if (proposePaxos(refreshedInProgress, paxosPlan, false, 
queryStartNanoTime))
                      {
                          commitPaxos(refreshedInProgress, 
consistencyForCommit, false, queryStartNanoTime);
                      }
--                    catch (WriteTimeoutException e)
++                    else
                      {
-                         recordCasContention(contentions);
 -                        recordCasContention(casMetrics, contentions);
--                        // We're still doing preparation for the paxos 
rounds, so we want to use the CAS (see CASSANDRA-8672)
--                        throw new WriteTimeoutException(WriteType.CAS, 
e.consistency, e.received, e.blockFor);
++                        Tracing.trace("Some replicas have already promised a 
higher ballot than ours; aborting");
++                        // sleep a random amount to give the other proposer a 
chance to finish
++                        contentions++;
++                        
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
MILLISECONDS);
                      }
++                    continue;
                  }
--                else
++
++                // To be able to propose our value on a new round, we need a 
quorum of replicas to have learned the previous one. Why is explained at:
++                // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
++                // Since we waited for quorum nodes, if some of them haven't 
seen the last commit (which may just be a timing issue, but may also
++                // mean we lost messages), we pro-actively "repair" those 
nodes, and retry.
++                int nowInSec = 
Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
++                Iterable<InetAddressAndPort> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, nowInSec);
++                if (Iterables.size(missingMRC) > 0)
                  {
--                    Tracing.trace("Some replicas have already promised a 
higher ballot than ours; aborting");
--                    // sleep a random amount to give the other proposer a 
chance to finish
--                    contentions++;
-                     
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
MILLISECONDS);
 -                    
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
++                    Tracing.trace("Repairing replicas that missed the most 
recent commit");
++                    sendCommit(mostRecent, missingMRC);
++                    // TODO: provided commits don't invalidate the prepare we just did above (which they don't), we could just wait
++                    // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means
++                    // adding the ability to have commitPaxos block, which is exactly what CASSANDRA-5442 will do. So once we have that
++                    // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'.
++                    continue;
                  }
--                continue;
--            }
  
--            // To be able to propose our value on a new round, we need a 
quorum of replica to have learn the previous one. Why is explained at:
--            // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
--            // Since we waited for quorum nodes, if some of them haven't seen 
the last commit (which may just be a timing issue, but may also
--            // mean we lost messages), we pro-actively "repair" those nodes, 
and retry.
--            int nowInSec = 
Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
-             Iterable<InetAddressAndPort> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, nowInSec);
 -            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, nowInSec);
--            if (Iterables.size(missingMRC) > 0)
++                return new PaxosBallotAndContention(ballot, contentions);
++            }
++            catch (WriteTimeoutException e)
              {
--                Tracing.trace("Repairing replicas that missed the most recent 
commit");
--                sendCommit(mostRecent, missingMRC);
--                // TODO: provided commits don't invalid the prepare we just 
did above (which they don't), we could just wait
--                // for all the missingMRC to acknowledge this commit and then 
move on with proposing our value. But that means
--                // adding the ability to have commitPaxos block, which is 
exactly CASSANDRA-5442 will do. So once we have that
--                // latter ticket, we can pass CL.ALL to the commit above and 
remove the 'continue'.
--                continue;
++                // We're still doing preparation for the paxos rounds, so we want to use the CAS write type (see CASSANDRA-8672)
++                throw new CasWriteTimeoutException(WriteType.CAS, 
e.consistency, e.received, e.blockFor, contentions);
              }
--
-             return new PaxosBallotAndContention(ballot, contentions);
 -            return Pair.create(ballot, contentions);
          }
  
-         recordCasContention(contentions);
-         throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)));
 -        recordCasContention(casMetrics, contentions);
 -        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
++        throw new CasWriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)), contentions);
      }
  
      /**
@@@ -1666,7 -1755,7 +1806,7 @@@
                  throw new ReadFailureException(consistencyLevel, e.received, 
e.blockFor, false, e.failureReasonByEndpoint);
              }
  
-             result = fetchRows(group.queries, consistencyForCommitOrFetch, 
queryStartNanoTime);
 -            result = fetchRows(group.commands, 
consistencyForReplayCommitsOrFetch, queryStartNanoTime);
++            result = fetchRows(group.queries, 
consistencyForReplayCommitsOrFetch, queryStartNanoTime);
          }
          catch (UnavailableException e)
          {
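
As a reading aid for the StorageProxy hunk above, here is a minimal sketch of the decisions the prepare loop makes once it holds a promise: replay a non-empty in-progress proposal, repair replicas that missed the most recent commit, or hand the ballot back so our own value can be proposed. The class, enum and parameter names below are illustrative stand-ins, not part of this patch.

    final class PreparePhaseSketch
    {
        enum NextStep { REPLAY_IN_PROGRESS, REPAIR_MOST_RECENT_COMMIT, PROPOSE_OUR_VALUE }

        static NextStep nextStep(boolean inProgressIsEmpty,
                                 boolean inProgressIsAfterMostRecentCommit,
                                 int replicasMissingMostRecentCommit)
        {
            // A non-empty in-progress update newer than the most recent commit must be finished first;
            // empty updates (proposed by serial reads) are neither replayed nor committed, per the comment above.
            if (!inProgressIsEmpty && inProgressIsAfterMostRecentCommit)
                return NextStep.REPLAY_IN_PROGRESS;

            // Before proposing, replicas that missed the most recent commit are "repaired" and the loop retries.
            if (replicasMissingMostRecentCommit > 0)
                return NextStep.REPAIR_MOST_RECENT_COMMIT;

            return NextStep.PROPOSE_OUR_VALUE;
        }
    }
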
diff --cc src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 26890a9,ea069f7..93941e9
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@@ -46,27 -45,24 +46,24 @@@ public class PrepareCallback extends Ab
      public boolean promised = true;
      public Commit mostRecentCommit;
      public Commit mostRecentInProgressCommit;
-     public Commit mostRecentInProgressCommitWithUpdate;
  
 -    private final Map<InetAddress, Commit> commitsByReplica = new 
ConcurrentHashMap<InetAddress, Commit>();
 +    private final Map<InetAddressAndPort, Commit> commitsByReplica = new 
ConcurrentHashMap<>();
  
 -    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int 
targets, ConsistencyLevel consistency, long queryStartNanoTime)
 +    public PrepareCallback(DecoratedKey key, TableMetadata metadata, int 
targets, ConsistencyLevel consistency, long queryStartNanoTime)
      {
          super(targets, consistency, queryStartNanoTime);
 -        // need to inject the right key in the empty commit so comparing with 
empty commits in the reply works as expected
 +        // need to inject the right key in the empty commit so comparing with 
empty commits in the response works as expected
          mostRecentCommit = Commit.emptyCommit(key, metadata);
          mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
-         mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, 
metadata);
      }
  
 -    public synchronized void response(MessageIn<PrepareResponse> message)
 +    public synchronized void onResponse(Message<PrepareResponse> message)
      {
          PrepareResponse response = message.payload;
 -        logger.trace("Prepare response {} from {}", response, message.from);
 +        logger.trace("Prepare response {} from {}", response, message.from());
  
-         // In case of clock skew, another node could be proposing with ballot 
that are quite a bit
-         // older than our own. In that case, we record the more recent commit 
we've received to make
-         // sure we re-prepare on an older ballot.
+         // We set the mostRecentInProgressCommit even if we're not promised because, in that case, the ballot of that commit
+         // will be used to avoid generating a ballot that has no chance to win on retry (think clock skew).
          if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
              mostRecentInProgressCommit = response.inProgressCommit;
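
The comment above on mostRecentInProgressCommit is about ballot selection on retry: the next ballot's timestamp must land strictly after the highest in-progress ballot observed, even if the local clock lags behind it. Below is a minimal sketch of that idea, assuming the UUIDGen time-UUID helpers used elsewhere in the codebase; the BallotSketch class and method names are illustrative only, and the real path additionally keeps ballot timestamps monotonic per coordinator.

    import java.util.UUID;

    import org.apache.cassandra.utils.UUIDGen;

    final class BallotSketch
    {
        // Never pick a timestamp at or below the highest in-progress ballot we have seen,
        // even if our local clock is behind it (clock skew between coordinators).
        static long nextBallotMicros(UUID mostRecentInProgressBallot, long localClockMicros)
        {
            long minToUse = 1 + UUIDGen.microsTimestamp(mostRecentInProgressBallot);
            return Math.max(localClockMicros, minToUse);
        }

        static UUID nextBallot(UUID mostRecentInProgressBallot, long localClockMicros)
        {
            return UUIDGen.getRandomTimeUUIDFromMicros(nextBallotMicros(mostRecentInProgressBallot, localClockMicros));
        }
    }
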
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 546f318,dc8c604..2a71ec2
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -59,14 -59,14 +59,16 @@@ import org.apache.cassandra.cql3.QueryP
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.ReadResponse;
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.SystemKeyspaceMigrator40;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.monitoring.ApproximateTime;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.Constants;
 +import org.apache.cassandra.distributed.action.GossipHelper;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.ICoordinator;
  import org.apache.cassandra.distributed.api.IInstance;
@@@ -79,10 -79,12 +81,12 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.SimpleQueryResult;
  import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
  import 
org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
 -import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.gms.VersionedValue;
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
  import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.DataInputBuffer;
@@@ -538,6 -602,88 +542,85 @@@ public class Instance extends IsolatedE
          return YamlConfigurationLoader.fromMap(params, check, Config.class);
      }
  
+     public static void addToRing(boolean bootstrapping, IInstance peer)
+     {
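+         // Test-only helper: make this instance see 'peer' as a ring member (gossip state, tokens,
+         // liveness and messaging version) without the peer actually joining the cluster.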
+         try
+         {
+             IInstanceConfig config = peer.config();
+             IPartitioner partitioner = 
FBUtilities.newPartitioner(config.getString("partitioner"));
+             Token token = 
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
 -            InetAddress address = config.broadcastAddress().getAddress();
++            InetAddressAndPort addressAndPort = 
toCassandraInetAddressAndPort(peer.broadcastAddress());
+ 
+             UUID hostId = config.hostId();
+             Gossiper.runInGossipStageBlocking(() -> {
 -                Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
 -                Gossiper.instance.injectApplicationState(address,
 -                        ApplicationState.TOKENS,
 -                        new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
 -                StorageService.instance.onChange(address,
 -                        ApplicationState.STATUS,
 -                        bootstrapping
 -                                ? new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
 -                                : new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
 -                Gossiper.instance.realMarkAlive(address, 
Gossiper.instance.getEndpointStateForEndpoint(address));
++                Gossiper.instance.initializeNodeUnsafe(addressAndPort, 
hostId, 1);
++                Gossiper.instance.injectApplicationState(addressAndPort,
++                                                         
ApplicationState.TOKENS,
++                                                         new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
++                StorageService.instance.onChange(addressAndPort,
++                                                 ApplicationState.STATUS,
++                                                 bootstrapping
++                                                 ? new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
++                                                 : new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
++                Gossiper.instance.realMarkAlive(addressAndPort, 
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
+             });
+             int messagingVersion = peer.isShutdown()
+                     ? MessagingService.current_version
+                     : Math.min(MessagingService.current_version, 
peer.getMessagingVersion());
 -            MessagingService.instance().setVersion(address, messagingVersion);
++            MessagingService.instance().versions.set(addressAndPort, 
messagingVersion);
+ 
 -            if (!bootstrapping)
 -                assert 
StorageService.instance.getTokenMetadata().isMember(address);
++            assert bootstrapping || 
StorageService.instance.getTokenMetadata().isMember(addressAndPort);
+             PendingRangeCalculatorService.instance.blockUntilFinished();
+         }
+         catch (Throwable e) // UnknownHostException
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static void removeFromRing(IInstance peer)
+     {
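+         // Test-only helper: make this instance see 'peer' as having left the ring (gossip STATUS 'left',
+         // endpoint removed from gossip), then wait for pending ranges to be recalculated.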
+         try
+         {
+             IInstanceConfig config = peer.config();
+             IPartitioner partitioner = 
FBUtilities.newPartitioner(config.getString("partitioner"));
+             Token token = 
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
 -            InetAddress address = config.broadcastAddress().getAddress();
++            InetAddressAndPort addressAndPort = 
toCassandraInetAddressAndPort(peer.broadcastAddress());
+ 
+             Gossiper.runInGossipStageBlocking(() -> {
 -                StorageService.instance.onChange(address,
++                StorageService.instance.onChange(addressAndPort,
+                         ApplicationState.STATUS,
+                         new 
VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token),
 0L));
 -                Gossiper.instance.removeEndpoint(address);
++                Gossiper.instance.removeEndpoint(addressAndPort);
+             });
+             PendingRangeCalculatorService.instance.blockUntilFinished();
+         }
+         catch (Throwable e) // UnknownHostException
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static void addToRingNormal(IInstance peer)
+     {
+         addToRing(false, peer);
 -        assert 
StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress());
++        assert 
StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
+     }
+ 
+     public static void addToRingBootstrapping(IInstance peer)
+     {
+         addToRing(true, peer);
+     }
+ 
+     private static void initializeRing(ICluster cluster)
+     {
+         for (int i = 1 ; i <= cluster.size() ; ++i)
+             addToRing(false, cluster.get(i));
+ 
+         for (int i = 1; i <= cluster.size(); ++i)
 -            assert 
StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress());
 -
 -        StorageService.instance.setNormalModeUnsafe();
++            assert 
StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(cluster.get(i).broadcastAddress()));
+     }
+ 
      public Future<Void> shutdown()
      {
          return shutdown(true);
diff --cc test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 0000000,473f56c..4cefbf0
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@@ -1,0 -1,684 +1,688 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.UUID;
+ import java.util.function.BiConsumer;
+ 
++
+ import org.junit.Assert;
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
 -import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
 -import static org.apache.cassandra.net.MessagingService.Verb.READ;
++import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
++import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
++import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
++import static org.apache.cassandra.net.Verb.READ_REQ;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class CASTest extends TestBaseImpl
+ {
++    private static final Logger logger = 
LoggerFactory.getLogger(CASTest.class);
++
+     @Test
+     public void simpleUpdate() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompletePrepare() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop = 
cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop = 
cluster.filters().verbs(PAXOS_PREPARE_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered 
contentions: 0", e.getMessage());
+             }
+             drop.off();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+         }
+     }
+ 
+     @Test
+     public void incompletePropose() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered 
contentions: 0", e.getMessage());
+             }
+             drop1.off();
+             // make sure we encounter one of the in-progress proposals so we 
complete it
 -            
cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
++            
cluster.filters().verbs(PAXOS_PREPARE_REQ.id).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompleteCommit() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered 
contentions: 0", e.getMessage());
+             }
+             drop1.off();
+             // make sure we see one of the successful commits
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     private int[] paxosAndReadVerbs() {
 -        return new int[] {
 -            MessagingService.Verb.PAXOS_PREPARE.ordinal(),
 -            MessagingService.Verb.PAXOS_PROPOSE.ordinal(),
 -            MessagingService.Verb.PAXOS_COMMIT.ordinal(),
 -            MessagingService.Verb.READ.ordinal()
 -        };
++        return new int[] { PAXOS_PREPARE_REQ.id, PAXOS_PROPOSE_REQ.id, 
PAXOS_COMMIT_REQ.id, READ_REQ.id };
+     }
+ 
+     /**
+      * Base test to ensure that if a write times out but with a proposal accepted by some nodes (less than a quorum), and
+      * a following SERIAL operation does not observe that write (the nodes having accepted it do not participate in that
+      * following operation), then that write is never applied, even when the nodes having accepted the original proposal
+      * participate.
+      *
+      * <p>In other words, if an operation times out, it may or may not be applied, but that "fate" is persistently decided
+      * by the first SERIAL operation that "succeeds" (in the sense of 'not timing out or throwing some other exception').
+      *
+      * @param postTimeoutOperation1 a SERIAL operation executed after an initial write inserting the row [0, 0] has timed
+      *                              out. It is executed with a QUORUM of nodes that have _not_ seen the timed out
+      *                              proposal, and so that operation should expect that the [0, 0] write has not taken
+      *                              place.
+      * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no
+      *                              write executed between the 2 operations. Contrary to the 1st operation, the QUORUM
+      *                              for this operation _will_ include the node that got the proposal for the [0, 0]
+      *                              insert but didn't participate in {@code postTimeoutOperation1}. That operation
+      *                              should also not witness that [0, 0] write (since {@code postTimeoutOperation1}
+      *                              didn't).
+      * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commit" messages for
+      *                               {@code postTimeoutOperation1}. In general, the test should behave the same with or
+      *                               without that flag since a value is decided as soon as it has been "accepted by
+      *                               quorum" and the commits should always be properly replayed.
+      */
+     private void consistencyAfterWriteTimeoutTest(BiConsumer<String, 
ICoordinator> postTimeoutOperation1,
+                                                   BiConsumer<String, 
ICoordinator> postTimeoutOperation2,
+                                                   boolean 
loseCommitOfOperation1) throws IOException
+     {
+         // It's unclear why (we haven't dug), but in some instances of this test method there is a consistent 2+
+         // second pause between the prepare and propose phases during the execution of 'postTimeoutOperation2'. This
+         // does not happen on 3.0 and there is no report of such long pauses otherwise, so a hypothesis is that this
+         // is due to the in-jvm dtest framework. This is why we use a 4 second timeout here. Given this test is
+         // not about performance, this is probably ok, even if we ideally should dig into the underlying reason.
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 4000L)
+                                                                       
.set("cas_contention_timeout_in_ms", 200L))))
+         {
+             String table = KEYSPACE + ".t";
+             cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY 
KEY, v int)");
+ 
+             // We do a CAS insertion, but have the PROPOSE message dropped on nodes 1 and 2. The CAS will not get
+             // through and should time out. Importantly, node 3 does receive and answer the PROPOSE.
+             IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                               .inbound()
 -                                                              
.verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
++                                                              
.verbs(PAXOS_PROPOSE_REQ.id)
++                                                              .from(3)
+                                                               .to(1, 2)
+                                                               .drop();
+             try
+             {
+                 // NOTE: the consistency below is the "commit" one, so it 
doesn't matter at all here.
 -                cluster.coordinator(1)
++                // NOTE 2: we use node 3 as coordinator because message 
filters don't currently work for locally
++                //   delivered messages and as we want to drop messages to 1 
and 2, we can't use them.
++                cluster.coordinator(3)
+                        .execute("INSERT INTO " + table + "(k, v) VALUES (0, 
0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                 fail("The insertion should have timed-out");
+             }
+             catch (Exception e)
+             {
+                 // We expect a write timeout. If we get one, the test can continue, otherwise we rethrow. Note that we
+                 // look at the root cause because the dtest framework effectively wraps the exception in a RuntimeException
+                 // (we could just look at the immediate cause, but this feels a bit more resilient this way).
+                 // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different class
+                 //  loader than the one the test runs under, and that's our poor-man's work-around. This kind of thing should
+                 //  be improved at the dtest API level.
 -                if 
(!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException"))
++                if 
(!e.getClass().getSimpleName().equals("CasWriteTimeoutException"))
+                     throw e;
+             }
+             finally
+             {
+                 dropProposeFilter.off();
+             }
+ 
+             // Isolate node 3 and execute the SERIAL operation. As neither node 1 nor 2 got the initial insert proposal,
+             // there is nothing to "replay" and the operation should assert the table is still empty.
+             IMessageFilters.Filter ignoreNode3Filter = 
cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+             IMessageFilters.Filter dropCommitFilter = null;
+             if (loseCommitOfOperation1)
+             {
 -                dropCommitFilter = 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
++                dropCommitFilter = 
cluster.filters().verbs(PAXOS_COMMIT_REQ.id).to(1, 2).drop();
+             }
+             try
+             {
+                 postTimeoutOperation1.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode3Filter.off();
+                 if (dropCommitFilter != null)
+                     dropCommitFilter.off();
+             }
+ 
+             // Node 3 is now back and we isolate node 2 to ensure the next 
read hits node 1 and 3.
+             // What we want to ensure is that despite node 3 having the 
initial insert in its paxos state in a position of
+             // being replayed, that insert is _not_ replayed (it would 
contradict serializability since the previous
+             // operation asserted nothing was inserted). It is this execution 
that failed before CASSANDRA-12126.
+             IMessageFilters.Filter ignoreNode2Filter = 
cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+             try
+             {
+                 postTimeoutOperation2.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode2Filter.off();
+             }
+         }
+     }
+ 
+     /**
+      * Tests that if a write times out and a following serial read does not see that write, then no following read sees
+      * it, even if some nodes still have the write in their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void readConsistencyAfterWriteTimeoutTest() throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+ 
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write times out, and a following CAS succeeds but does not apply in a way that indicates the write
+      * has not applied, then no following CAS can see that initial insert, even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void nonApplyingCasConsistencyAfterWriteTimeout() throws 
IOException
+     {
+         // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.ANY));
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write times out and a following serial read does not see that write, then no following CAS sees
+      * that initial insert, even if some nodes still have the write in their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() 
throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.QUORUM));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     /**
+      * Tests that if a write times out, and a following CAS succeeds but does not apply in a way that indicates the write
+      * has not applied, then following serial reads do not see that write, even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() 
throws IOException
+     {
+         // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.ANY));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     // TODO: this should probably be moved into the dtest API.
+     private void assertCasNotApplied(Object[][] resultSet)
+     {
+         assertFalse("Expected a CAS resultSet (with at least application 
result) but got an empty one.",
+                     resultSet.length == 0);
+         assertFalse("Invalid empty first row in CAS resultSet.", 
resultSet[0].length == 0);
+         Object wasApplied = resultSet[0][0];
+         assertTrue("Expected 1st column of CAS resultSet to be a boolean, but 
got a " + wasApplied.getClass(),
+                    wasApplied instanceof Boolean);
+         assertFalse("Expected CAS to not be applied, but was applied.", 
(Boolean)wasApplied);
+     }
+ 
+     /**
+      * A failed write (by a node that did not yet witness a range movement via gossip) is witnessed later as successful,
+      * conflicting with another successful write performed by a node that did witness the range movement.
+      *  - Prepare, Propose and Commit A to {1, 2}
+      *  - Range moves to {2, 3, 4}
+      *  - Prepare and Propose B (=> !A) to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the 
token
+             
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} 
=> {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(1).to(3).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             for (int i = 1 ; i <= 3 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {4} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(2).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(4).to(2).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * A failed write (by a node that did not yet witness a range movement via gossip) is witnessed later as successful,
+      * conflicting with another successful write performed by a node that did witness the range movement.
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+      *  -  X: Prepare, Propose and Commit A to {3, 4}
+      *  - !X: Prepare and Propose B (=>!A) to {1, 2}
+      */
+     @Ignore
+     @Test
+     public void testConflictingWritesWithStaleRingInformation() throws 
Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the 
token
+             
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             // {4} promises, accepts and commits on !{2} => {3, 4}
+             int pk = pk(cluster, 1, 2);
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(2).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
 -            
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(4).to(2).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // {1} promises, accepts and commits on !{3} => {1, 2}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(1).to(3).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement, not witnessed by read after 
range movement.
+      * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSucccessfulWriteDuringRangeMovementFollowedByRead() 
throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(1).to(3).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, 1));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement not witnessed by write after 
range movement
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() 
throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} 
=> {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(1).to(3).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+ 
+             // TODO: repair and verify base table state
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are 
not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information 
sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving 
of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void 
testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead()
 throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 
2, 3} => {4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(1).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(4).to(1).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(1, 2, 
3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, 
pk);
+                 Assert.assertTrue(false);
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} 
=> {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(1).to(3).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
 -            
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(3).to(2).drop();
++            
cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, null, 2));
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are 
not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information 
sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving 
of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void 
testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite()
 throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 
2, 3} => {4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(1).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 
3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, 
READ_REQ.id).from(4).to(1).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(1, 2, 
3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, 
pk);
+                 Assert.assertTrue(false);
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(false, 5, 1, null, 2));
+         }
+     }
+ 
+     private static int pk(Cluster cluster, int lb, int ub)
+     {
+         return pk(cluster.get(lb), cluster.get(ub));
+     }
+ 
+     private static int pk(IInstance lb, IInstance ub)
+     {
+         return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
+                 Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
+     }
+ 
+     private static int pk(Token lb, Token ub)
+     {
+         int pk = 0;
+         Token pkt;
+         while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0)
+             ++pk;
+         return pk;
+     }
+ 
+     private static void debugOwnership(Cluster cluster, int pk)
+     {
+         for (int i = 1 ; i <= cluster.size() ; ++i)
 -            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
++            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalEndpointsWithPort(KEYSPACE, Int32Type.instance.decompose(v)))
+                     .apply(pk));
+     }
+ 
+     private static void debugPaxosState(Cluster cluster, int pk)
+     {
 -        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
++        TableId tableId = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.id);
+         for (int i = 1 ; i <= cluster.size() ; ++i)
 -            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
++            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), tableId))
+                 System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2])));
+     }
+ 
+ }

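Editor's note: the tests in this commit exercise Paxos linearizability by dropping specific internode verbs between chosen node pairs, so that prepare/read, propose, and commit rounds only reach a subset of replicas. The following is a minimal, self-contained sketch of that filtering pattern, not part of the commit; the keyspace name, the three-node cluster size, and the static imports from org.apache.cassandra.net.Verb are illustrative assumptions based on the usage visible in CASTest above.

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.distributed.api.ConsistencyLevel;

    import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
    import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
    import static org.apache.cassandra.net.Verb.READ_REQ;

    public class PaxosVerbFilterSketch
    {
        public static void main(String[] args) throws Throwable
        {
            // Three-node in-jvm cluster with RF = 3, mirroring the tests above (sketch only).
            try (Cluster cluster = Cluster.create(3))
            {
                cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
                cluster.schemaChange("CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");

                // Hide node 3 from node 1's Paxos prepare/read and propose messages,
                // so any round coordinated by node 1 can only complete against {1, 2}.
                cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
                cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();

                // {1, 2} is still a quorum of three replicas, so this CAS should apply.
                cluster.coordinator(1).execute("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS",
                                               ConsistencyLevel.QUORUM);

                // Clearing the filters restores normal message delivery for later operations.
                cluster.filters().reset();
            }
        }
    }

The CASTest methods in this commit build on this primitive and combine it with the Instance ring-manipulation helpers (addToRingNormal, addToRingBootstrapping, removeFromRing) to emulate stale ring information during a range movement.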
