http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ed0cafc..c23eb88 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -29,7 +29,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
@@ -133,18 +132,10 @@ public class StorageProxy implements StorageProxyMBean
         HintsService.instance.registerMBean();
         HintedHandOffManager.instance.registerMBean();
 
-        standardWritePerformer = new WritePerformer()
+        standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            public void apply(IMutation mutation,
-                              Iterable<InetAddressAndPort> targets,
-                              AbstractWriteResponseHandler<IMutation> responseHandler,
-                              String localDataCenter,
-                              ConsistencyLevel consistency_level)
-            throws OverloadedException
-            {
-                assert mutation instanceof Mutation;
-                sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
-            }
+            assert mutation instanceof Mutation;
+            sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION);
         };
 
         /*
@@ -153,29 +144,19 @@ public class StorageProxy implements StorageProxyMBean
          * but on the latter case, the verb handler already run on the COUNTER_MUTATION stage, so we must not execute the
          * underlying on the stage otherwise we risk a deadlock. Hence two different performer.
          */
-        counterWritePerformer = new WritePerformer()
+        counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            public void apply(IMutation mutation,
-                              Iterable<InetAddressAndPort> targets,
-                              AbstractWriteResponseHandler<IMutation> responseHandler,
-                              String localDataCenter,
-                              ConsistencyLevel consistencyLevel)
-            {
-                counterWriteTask(mutation, targets, responseHandler, localDataCenter).run();
-            }
+            EndpointsForToken selected = targets.selected().withoutSelf();
+            Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
+            counterWriteTask(mutation, selected, responseHandler, localDataCenter).run();
         };
 
-        counterWriteOnCoordinatorPerformer = new WritePerformer()
+        counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            public void apply(IMutation mutation,
-                              Iterable<InetAddressAndPort> targets,
-                              AbstractWriteResponseHandler<IMutation> responseHandler,
-                              String localDataCenter,
-                              ConsistencyLevel consistencyLevel)
-            {
-                StageManager.getStage(Stage.COUNTER_MUTATION)
-                            .execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
-            }
+            EndpointsForToken selected = targets.selected().withoutSelf();
+            Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
+            StageManager.getStage(Stage.COUNTER_MUTATION)
+                        .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter));
         };
 
         for(ConsistencyLevel level : ConsistencyLevel.values())
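
The hunks above replace anonymous WritePerformer classes with lambdas; that only compiles because WritePerformer is reduced to a single abstract method (the ConsistencyLevel parameter disappears, since the consistency level now travels with the replica layout). A minimal, self-contained sketch of that before/after shape, using hypothetical stand-in types rather than the real Cassandra interfaces:

    import java.util.List;

    public class LambdaPerformerSketch
    {
        // Hypothetical single-method interface, standing in for WritePerformer.
        interface Performer
        {
            void apply(String mutation, List<String> targets);
        }

        static void send(String mutation, List<String> targets)
        {
            System.out.println(mutation + " -> " + targets);
        }

        public static void main(String[] args)
        {
            // Before: anonymous inner class spelling out the one method.
            Performer before = new Performer()
            {
                public void apply(String mutation, List<String> targets)
                {
                    send(mutation, targets);
                }
            };

            // After: the same behaviour as a lambda.
            Performer after = (mutation, targets) -> send(mutation, targets);

            before.apply("m1", List.of("a", "b"));
            after.apply("m2", List.of("c"));
        }
    }
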
@@ -251,11 +232,9 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyForPaxos);
-                List<InetAddressAndPort> liveEndpoints = p.liveEndpoints;
-                int requiredParticipants = p.participants;
+                ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
 
-                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.ballot;
                 contentions += pair.contentions;
 
@@ -297,7 +276,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
+                if (proposePaxos(proposal, replicaLayout, true, 
queryStartNanoTime))
                 {
                     commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
                     Tracing.trace("CAS successful");
@@ -346,49 +325,6 @@ public class StorageProxy implements StorageProxyMBean
             casWriteMetrics.contention.update(contentions);
     }
 
-    private static Predicate<InetAddressAndPort> sameDCPredicateFor(final String dc)
-    {
-        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        return new Predicate<InetAddressAndPort>()
-        {
-            public boolean apply(InetAddressAndPort host)
-            {
-                return dc.equals(snitch.getDatacenter(host));
-            }
-        };
-    }
-
-    private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
-    {
-        Token tk = key.getToken();
-        List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
-        Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace);
-        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
-        {
-            // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only
-            String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-            Predicate<InetAddressAndPort> isLocalDc = sameDCPredicateFor(localDc);
-            naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
-            pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
-        }
-        int participants = pendingEndpoints.size() + naturalEndpoints.size();
-        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
-        List<InetAddressAndPort> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
-        if (liveEndpoints.size() < requiredParticipants)
-            throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size());
-
-        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
-        // Note that we fake an impossible number of required nodes in the unavailable exception
-        // to nail home the point that it's an impossible operation no matter how many nodes are live.
-        if (pendingEndpoints.size() > 1)
-            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pendingEndpoints.size()),
-                                           consistencyForPaxos,
-                                           participants + 1,
-                                           liveEndpoints.size());
-
-        return new PaxosParticipants(liveEndpoints, requiredParticipants);
-    }
-
     /**
      * begin a Paxos session by sending a prepare request and completing any in-progress requests seen in the replies
      *
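
The deleted getPaxosParticipants above is not lost: its liveness filtering, majority arithmetic, and the at-most-one-pending-endpoint rule move behind ReplicaLayout.forPaxos. A JDK-only sketch of that arithmetic, with plain strings standing in for endpoints (the names here are illustrative, not the real ReplicaLayout API):

    import java.util.List;

    public class PaxosParticipantsSketch
    {
        // Majority of natural plus pending endpoints, as in the removed code.
        static int requiredParticipants(List<String> natural, List<String> pending)
        {
            int participants = natural.size() + pending.size();
            return participants / 2 + 1;
        }

        public static void main(String[] args)
        {
            List<String> natural = List.of("n1", "n2", "n3");
            List<String> pending = List.of("p1");

            // CAS is refused outright with two or more pending endpoints.
            if (pending.size() > 1)
                throw new IllegalStateException("more than one pending range movement");

            // 4 participants -> 3 required promises/accepts.
            System.out.println(requiredParticipants(natural, pending));
        }
    }
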
@@ -396,14 +332,13 @@ public class StorageProxy implements StorageProxyMBean
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
     private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
-                                                           DecoratedKey key,
-                                                           TableMetadata metadata,
-                                                           List<InetAddressAndPort> liveEndpoints,
-                                                           int requiredParticipants,
-                                                           ConsistencyLevel consistencyForPaxos,
-                                                           ConsistencyLevel consistencyForCommit,
-                                                           final boolean isWrite,
-                                                           ClientState state)
+                                                                DecoratedKey key,
+                                                                TableMetadata metadata,
+                                                                ReplicaLayout.ForPaxos replicaLayout,
+                                                                ConsistencyLevel consistencyForPaxos,
+                                                                ConsistencyLevel consistencyForCommit,
+                                                                final boolean isWrite,
+                                                                ClientState state)
     throws WriteTimeoutException, WriteFailureException
     {
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
@@ -425,7 +360,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime);
+            summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher 
ballot than ours; aborting");
@@ -448,7 +383,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
+                if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime))
                 {
                     try
                     {
@@ -505,14 +440,14 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddressAndPort> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime)
+    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica replica: replicaLayout.selected())
         {
-            if (canDoLocalRequest(target))
+            if (replica.isLocal())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable()
                 {
@@ -536,21 +471,21 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
-                MessagingService.instance().sendRR(message, target, callback);
+                MessagingService.instance().sendRR(message, replica.endpoint(), callback);
             }
         }
         callback.await();
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddressAndPort> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
+    private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
+        ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica replica : replicaLayout.selected())
         {
-            if (canDoLocalRequest(target))
+            if (replica.isLocal())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable()
                 {
@@ -574,7 +509,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
-                MessagingService.instance().sendRR(message, target, callback);
+                MessagingService.instance().sendRR(message, replica.endpoint(), callback);
             }
         }
         callback.await();
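
Both preparePaxos and proposePaxos now branch on replica.isLocal() instead of comparing a raw endpoint against the broadcast address via the deleted canDoLocalRequest. A tiny sketch of the equivalence, with a hypothetical Replica holder (the real class lives in org.apache.cassandra.locator and carries more state):

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class IsLocalSketch
    {
        // Hypothetical stand-in: a replica knows its endpoint and the node's broadcast address.
        static class Replica
        {
            final InetAddress endpoint;
            final InetAddress broadcast;

            Replica(InetAddress endpoint, InetAddress broadcast)
            {
                this.endpoint = endpoint;
                this.broadcast = broadcast;
            }

            boolean isLocal()
            {
                return endpoint.equals(broadcast); // replaces canDoLocalRequest(endpoint)
            }
        }

        public static void main(String[] args) throws UnknownHostException
        {
            InetAddress self = InetAddress.getByName("10.0.0.1");
            InetAddress peer = InetAddress.getByName("10.0.0.2");
            System.out.println(new Replica(self, self).isLocal()); // true: execute on the local stage
            System.out.println(new Replica(peer, self).isLocal()); // false: send via messaging
        }
    }
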
@@ -583,7 +518,7 @@ public class StorageProxy implements StorageProxyMBean
             return true;
 
         if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, consistencyLevel, callback.getAcceptCount(), requiredParticipants);
+            throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants());
 
         return false;
     }
@@ -594,30 +529,30 @@ public class StorageProxy implements StorageProxyMBean
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace);
 
         Token tk = proposal.update.partitionKey().getToken();
-        List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
-        Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());

         AbstractWriteResponseHandler<Commit> responseHandler = null;
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime);
             responseHandler.setSupportsBackPressure(false);
         }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
-        for (InetAddressAndPort destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
+        for (Replica replica : replicaLayout.all())
         {
-            checkHintOverload(destination);
+            InetAddressAndPort destination = replica.endpoint();
+            checkHintOverload(replica);
 
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
                 {
-                    if (canDoLocalRequest(destination))
-                        commitPaxosLocal(message, responseHandler);
+                    if (replica.isLocal())
+                        commitPaxosLocal(replica, message, responseHandler);
                     else
-                        MessagingService.instance().sendRR(message, destination, responseHandler, allowHints && shouldHint(destination));
+                        MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && shouldHint(replica));
                 }
                 else
                 {
@@ -630,9 +565,9 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     responseHandler.expired();
                 }
-                if (allowHints && shouldHint(destination))
+                if (allowHints && shouldHint(replica))
                 {
-                    submitHint(proposal.makeMutation(), destination, null);
+                    submitHint(proposal.makeMutation(), replica, null);
                 }
             }
         }
@@ -646,9 +581,9 @@ public class StorageProxy implements StorageProxyMBean
      * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure
      * signal for hints
      */
-    private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler)
+    private static void commitPaxosLocal(Replica localReplica, final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler)
     {
-        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable()
+        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
         {
             public void runMayThrow()
             {
@@ -684,35 +619,37 @@ public class StorageProxy implements StorageProxyMBean
      * @param consistency_level the consistency level for the operation
      * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
+    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         long startTime = System.nanoTime();
+
         List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
+        WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
 
         try
         {
             for (IMutation mutation : mutations)
             {
                 if (mutation instanceof CounterMutation)
-                {
                     responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
-                }
                 else
-                {
-                    WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime));
-                }
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
+            }
+
+            // upgrade to full quorum any failed cheap quorums
+            for (int i = 0 ; i < mutations.size() ; ++i)
+            {
+                if (!(mutations.get(i) instanceof CounterMutation)) // at the moment, only non-counter writes support cheap quorums
+                    responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i), standardWritePerformer, localDataCenter);
             }
 
             // wait for writes.  throws TimeoutException if necessary
             for (AbstractWriteResponseHandler<IMutation> responseHandler : responseHandlers)
-            {
                 responseHandler.get();
-            }
         }
         catch (WriteTimeoutException|WriteFailureException ex)
         {
@@ -786,16 +723,12 @@ public class StorageProxy implements StorageProxyMBean
         String keyspaceName = mutation.getKeyspaceName();
         Token token = mutation.key().getToken();
 
-        Iterable<InetAddressAndPort> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token);
-        ArrayList<InetAddressAndPort> endpointsToHint = new ArrayList<>(Iterables.size(endpoints));
-
         // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
         // so there is no need to hint or retry.
-        for (InetAddressAndPort target : endpoints)
-            if (!target.equals(FBUtilities.getBroadcastAddressAndPort()) && shouldHint(target))
-                endpointsToHint.add(target);
+        EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token)
+                .filter(StorageProxy::shouldHint);
 
-        submitHint(mutation, endpointsToHint, null);
+        submitHint(mutation, replicasToHint, null);
     }
 
     public boolean appliesLocally(Mutation mutation)
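
The hand-rolled hinting loop that skipped the local endpoint and tested shouldHint becomes a single filter over the replica collection (EndpointsForToken.filter in the real code). The same shape in plain JDK streams, with a placeholder predicate:

    import java.util.List;
    import java.util.stream.Collectors;

    public class HintFilterSketch
    {
        // Placeholder for StorageProxy::shouldHint.
        static boolean shouldHint(String replica)
        {
            return !replica.equals("local");
        }

        public static void main(String[] args)
        {
            List<String> naturalAndPending = List.of("local", "r2", "r3");
            List<String> replicasToHint = naturalAndPending.stream()
                                                           .filter(HintFilterSketch::shouldHint)
                                                           .collect(Collectors.toList());
            System.out.println(replicasToHint); // [r2, r3]
        }
    }
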
@@ -804,8 +737,8 @@ public class StorageProxy implements StorageProxyMBean
         Token token = mutation.key().getToken();
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local)
-               || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local);
+        return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local)
+               || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local);
     }
 
     /**
@@ -854,13 +787,13 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
-                    Optional<InetAddressAndPort> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
-                    Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
+                    Optional<Replica> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    EndpointsForToken pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspaceName);
 
                     // if there are no paired endpoints there are probably range movements going on, so we write to the local batchlog to replay later
                     if (!pairedEndpoint.isPresent())
                     {
-                        if (pendingEndpoints.isEmpty())
+                        if (pendingReplicas.isEmpty())
                             logger.warn("Received base materialized view 
mutation for key {} that does not belong " +
                                         "to this node. There is probably a 
range movement happening (move or decommission)," +
                                         "but this node hasn't updated its ring 
metadata yet. Adding mutation to " +
@@ -872,8 +805,8 @@ public class StorageProxy implements StorageProxyMBean
                     // When local node is the endpoint we can just apply the mutation locally,
                     // unless there are pending endpoints, in which case we want to do an ordinary
                     // write so the view mutation is sent to the pending endpoint
-                    if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddressAndPort()) && StorageService.instance.isJoined()
-                        && pendingEndpoints.isEmpty())
+                    if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined()
+                        && pendingReplicas.isEmpty())
                     {
                         try
                         {
@@ -892,7 +825,8 @@ public class StorageProxy implements StorageProxyMBean
                         wrappers.add(wrapViewBatchResponseHandler(mutation,
                                                                   consistencyLevel,
                                                                   consistencyLevel,
-                                                                  Collections.singletonList(pairedEndpoint.get()),
+                                                                  EndpointsForToken.of(tk, pairedEndpoint.get()),
+                                                                  pendingReplicas,
                                                                   baseComplete,
                                                                   WriteType.BATCH,
                                                                   cleanup,
@@ -916,7 +850,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @SuppressWarnings("unchecked")
-    public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+    public static void mutateWithTriggers(List<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
                                           boolean mutateAtomically,
                                           long queryStartNanoTime)
@@ -966,6 +900,9 @@ public class StorageProxy implements StorageProxyMBean
         List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
+        if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
+            throw new AssertionError("Logged batches are unsupported with transient replication");
+
         try
         {
 
@@ -982,7 +919,7 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
                                                                                                           () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
@@ -1037,11 +974,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static boolean canDoLocalRequest(InetAddressAndPort replica)
-    {
-        return replica.equals(FBUtilities.getBroadcastAddressAndPort());
-    }
-
     private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations, long latency)
     {
         if (null == mutations)
@@ -1069,24 +1001,22 @@ public class StorageProxy implements StorageProxyMBean
     private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
-        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints,
-                                                                     Collections.emptyList(),
-                                                                     endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
-                                                                     Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME),
-                                                                     null,
-                                                                     WriteType.BATCH_LOG,
-                                                                     queryStartNanoTime);
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
+        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout,
+                                                                   WriteType.BATCH_LOG,
+                                                                   queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
         MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica replica : replicaLayout.all())
         {
-            logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+            logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
 
-            if (canDoLocalRequest(target))
-                performLocally(Stage.MUTATION, Optional.empty(), () -> BatchlogManager.store(batch), handler);
+            if (replica.isLocal())
+                performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler);
             else
-                MessagingService.instance().sendRR(message, target, handler);
+                MessagingService.instance().sendRR(message, replica.endpoint(), handler);
         }
         handler.get();
     }
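
The removed ternary shows the batchlog consistency rule that ReplicaLayout.forBatchlogWrite now has to preserve: a lone batchlog endpoint is acked at ONE, anything more at TWO. Sketched with a plain enum (names illustrative, not the real API):

    import java.util.List;

    public class BatchlogConsistencySketch
    {
        enum CL { ONE, TWO }

        static CL batchlogConsistency(List<String> batchlogEndpoints)
        {
            // A single-node cluster can only ever produce one ack.
            return batchlogEndpoints.size() == 1 ? CL.ONE : CL.TWO;
        }

        public static void main(String[] args)
        {
            System.out.println(batchlogConsistency(List.of("a")));      // ONE
            System.out.println(batchlogConsistency(List.of("a", "b"))); // TWO
        }
    }
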
@@ -1099,8 +1029,8 @@ public class StorageProxy implements StorageProxyMBean
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, 
target);
 
-            if (canDoLocalRequest(target))
-                performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
+            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
+                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
             else
                 MessagingService.instance().sendOneWay(message, target);
         }
@@ -1110,11 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
+            ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
 
             try
             {
-                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
+                sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage);
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
@@ -1128,8 +1059,8 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
-            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
+            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
+            sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
         }
 
 
@@ -1144,7 +1075,7 @@ public class StorageProxy implements StorageProxyMBean
      * responses based on consistency level.
      *
      * @param mutation the mutation to be applied
-     * @param consistency_level the consistency level for the write operation
+     * @param consistencyLevel the consistency level for the write operation
      * @param performer the WritePerformer in charge of appliying the mutation
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
@@ -1152,33 +1083,32 @@ public class StorageProxy implements StorageProxyMBean
      * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation,
-                                                                       ConsistencyLevel consistency_level,
+                                                                       ConsistencyLevel consistencyLevel,
                                                                        String localDataCenter,
                                                                        WritePerformer performer,
                                                                        Runnable callback,
                                                                        WriteType writeType,
                                                                        long queryStartNanoTime)
-    throws UnavailableException, OverloadedException
     {
         String keyspaceName = mutation.getKeyspaceName();
-        AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
+        Keyspace keyspace = Keyspace.open(keyspaceName);
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
 
         Token tk = mutation.key().getToken();
-        List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-        Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime);
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
 
-        performer.apply(mutation, Iterables.concat(naturalEndpoints, pendingEndpoints), responseHandler, localDataCenter, consistency_level);
+        performer.apply(mutation, replicaLayout, responseHandler, localDataCenter);
         return responseHandler;
     }
 
     // same as performWrites except does not initiate writes (but does perform availability checks).
     private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation,
-                                                                        ConsistencyLevel consistency_level,
+                                                                        ConsistencyLevel consistencyLevel,
                                                                         ConsistencyLevel batchConsistencyLevel,
                                                                         WriteType writeType,
                                                                         BatchlogResponseHandler.BatchlogCleanup cleanup,
@@ -1186,11 +1116,10 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
-        List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-        Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime);
+
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
         BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
@@ -1200,9 +1129,10 @@ public class StorageProxy implements StorageProxyMBean
      * Keeps track of ViewWriteMetrics
      */
     private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation,
-                                                                            ConsistencyLevel consistency_level,
+                                                                            ConsistencyLevel consistencyLevel,
                                                                             ConsistencyLevel batchConsistencyLevel,
-                                                                            List<InetAddressAndPort> naturalEndpoints,
+                                                                            EndpointsForToken naturalEndpoints,
+                                                                            EndpointsForToken pendingEndpoints,
                                                                             AtomicLong baseComplete,
                                                                             WriteType writeType,
                                                                             BatchlogResponseHandler.BatchlogCleanup cleanup,
@@ -1210,10 +1140,10 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
-        Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> {
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints);
+
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
         }, writeType, queryStartNanoTime);
@@ -1241,7 +1171,7 @@ public class StorageProxy implements StorageProxyMBean
      * - choose min(2, number of qualifying candiates above)
      * - allow the local node to be the only replica only if it's a single-node DC
      */
-    private static Collection<InetAddressAndPort> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+    private static Collection<InetAddressAndPort> getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel)
     throws UnavailableException
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1254,7 +1184,7 @@ public class StorageProxy implements StorageProxyMBean
             if (consistencyLevel == ConsistencyLevel.ANY)
                 return Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
 
-            throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
+            throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
         }
 
         return chosenEndpoints;
@@ -1277,36 +1207,36 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
-    public static void sendToHintedEndpoints(final Mutation mutation,
-                                             Iterable<InetAddressAndPort> targets,
-                                             AbstractWriteResponseHandler<IMutation> responseHandler,
-                                             String localDataCenter,
-                                             Stage stage)
+    public static void sendToHintedReplicas(final Mutation mutation,
+                                            EndpointsForToken targets,
+                                            AbstractWriteResponseHandler<IMutation> responseHandler,
+                                            String localDataCenter,
+                                            Stage stage)
     throws OverloadedException
     {
-        int targetsSize = Iterables.size(targets);
-
         // this dc replicas:
-        Collection<InetAddressAndPort> localDc = null;
+        Collection<Replica> localDc = null;
         // extra-datacenter replicas, grouped by dc
-        Map<String, Collection<InetAddressAndPort>> dcGroups = null;
+        Map<String, Collection<Replica>> dcGroups = null;
         // only need to create a Message for non-local writes
         MessageOut<Mutation> message = null;
 
         boolean insertLocal = false;
-        ArrayList<InetAddressAndPort> endpointsToHint = null;
+        Replica localReplica = null;
+        Collection<Replica> endpointsToHint = null;
 
         List<InetAddressAndPort> backPressureHosts = null;
 
-        for (InetAddressAndPort destination : targets)
+        for (Replica destination : targets)
         {
             checkHintOverload(destination);
 
-            if (FailureDetector.instance.isAlive(destination))
+            if (FailureDetector.instance.isAlive(destination.endpoint()))
             {
-                if (canDoLocalRequest(destination))
+                if (destination.isLocal())
                 {
                     insertLocal = true;
+                    localReplica = destination;
                 }
                 else
                 {
@@ -1321,28 +1251,26 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         if (localDc == null)
-                            localDc = new ArrayList<>(targetsSize);
+                            localDc = new ArrayList<>(targets.size());
 
                         localDc.add(destination);
                     }
                     else
                     {
-                        Collection<InetAddressAndPort> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+                        if (dcGroups == null)
+                            dcGroups = new HashMap<>();
+
+                        Collection<Replica> messages = dcGroups.get(dc);
                         if (messages == null)
-                        {
-                            messages = new ArrayList<>(3); // most DCs will have <= 3 replicas
-                            if (dcGroups == null)
-                                dcGroups = new HashMap<>();
-                            dcGroups.put(dc, messages);
-                        }
+                            messages = dcGroups.computeIfAbsent(dc, (v) -> new ArrayList<>(3)); // most DCs will have <= 3 replicas
 
                         messages.add(destination);
                     }
 
                     if (backPressureHosts == null)
-                        backPressureHosts = new ArrayList<>(targetsSize);
+                        backPressureHosts = new ArrayList<>(targets.size());
 
-                    backPressureHosts.add(destination);
+                    backPressureHosts.add(destination.endpoint());
                 }
             }
             else
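
The per-DC grouping loses its create-then-put boilerplate: the new code asks the map once with get and falls back to Map.computeIfAbsent, which allocates the per-DC list on first use. The idiom in isolation:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class DcGroupingSketch
    {
        public static void main(String[] args)
        {
            Map<String, String> dcOf = Map.of("r1", "dc1", "r2", "dc2", "r3", "dc2");
            Map<String, Collection<String>> dcGroups = new HashMap<>();

            for (Map.Entry<String, String> e : dcOf.entrySet())
                // most DCs will have <= 3 replicas, hence the small initial capacity
                dcGroups.computeIfAbsent(e.getValue(), dc -> new ArrayList<>(3)).add(e.getKey());

            System.out.println(dcGroups); // e.g. {dc1=[r1], dc2=[r2, r3]}
        }
    }
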
@@ -1352,7 +1280,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (shouldHint(destination))
                 {
                     if (endpointsToHint == null)
-                        endpointsToHint = new ArrayList<>(targetsSize);
+                        endpointsToHint = new ArrayList<>();
 
                     endpointsToHint.add(destination);
                 }
@@ -1363,25 +1291,28 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeout());
 
         if (endpointsToHint != null)
-            submitHint(mutation, endpointsToHint, responseHandler);
+            submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler);
 
         if (insertLocal)
-            performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
+        {
+            Preconditions.checkNotNull(localReplica);
+            performLocally(stage, localReplica, Optional.of(mutation), mutation::apply, responseHandler);
+        }
 
         if (localDc != null)
         {
-            for (InetAddressAndPort destination : localDc)
-                MessagingService.instance().sendRR(message, destination, responseHandler, true);
+            for (Replica destination : localDc)
+                MessagingService.instance().sendWriteRR(message, destination, responseHandler, true);
         }
         if (dcGroups != null)
         {
             // for each datacenter, send the message to one node to relay the write to other replicas
-            for (Collection<InetAddressAndPort> dcTargets : dcGroups.values())
-                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
+            for (Collection<Replica> dcTargets : dcGroups.values())
+                sendMessagesToNonlocalDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcTargets), responseHandler);
         }
     }
 
-    private static void checkHintOverload(InetAddressAndPort destination)
+    private static void checkHintOverload(Replica destination)
     {
         // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
         // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
@@ -1389,45 +1320,46 @@ public class StorageProxy implements StorageProxyMBean
         // a small number of nodes causing problems, so we should avoid shutting down writes completely to
         // healthy nodes.  Any node with no hintsInProgress is considered healthy.
         if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
-                && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
+                && (getHintsInProgressFor(destination.endpoint()).get() > 0 && shouldHint(destination)))
         {
             throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() +
                                           " destination: " + destination +
-                                          " destination hints: " + getHintsInProgressFor(destination).get());
+                                          " destination hints: " + getHintsInProgressFor(destination.endpoint()).get());
         }
     }
 
     private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
-                                                 Collection<InetAddressAndPort> targets,
+                                                 EndpointsForToken targets,
                                                  AbstractWriteResponseHandler<IMutation> handler)
     {
-        Iterator<InetAddressAndPort> iter = targets.iterator();
+        Iterator<Replica> iter = targets.iterator();
         int[] messageIds = new int[targets.size()];
-        InetAddressAndPort target = iter.next();
+        Replica target = iter.next();
 
         int idIdx = 0;
         // Add the other destinations of the same message as a FORWARD_HEADER entry
         while (iter.hasNext())
         {
-            InetAddressAndPort destination = iter.next();
-            int id = MessagingService.instance().addCallback(handler,
-                                                             message,
-                                                             destination,
-                                                             message.getTimeout(),
-                                                             handler.consistencyLevel,
-                                                             true);
+            Replica destination = iter.next();
+            int id = MessagingService.instance().addWriteCallback(handler,
+                                                                  message,
+                                                                  destination,
+                                                                  message.getTimeout(),
+                                                                  handler.replicaLayout.consistencyLevel(),
+                                                                  true);
             messageIds[idIdx++] = id;
             logger.trace("Adding FWD message to {}@{}", id, destination);
         }
-        message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new ForwardToContainer(targets, messageIds));
+
+        message = message.withParameter(ParameterType.FORWARD_TO, new ForwardToContainer(targets.endpoints(), messageIds));
         // send the combined message + forward headers
-        int id = MessagingService.instance().sendRR(message, target, handler, true);
+        int id = MessagingService.instance().sendWriteRR(message, target, handler, true);
         logger.trace("Sending message to {}@{}", id, target);
     }
 
-    private static void performLocally(Stage stage, final Runnable runnable)
+    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable)
     {
-        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
         {
             public void runMayThrow()
             {
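
sendMessagesToNonlocalDC keeps its cross-DC optimisation: only the first replica in each remote DC receives the message directly, with the rest listed in a FORWARD_TO parameter so the relay can fan out locally. The bookkeeping, reduced to plain JDK types (the callback-id registration is faked):

    import java.util.Iterator;
    import java.util.List;

    public class ForwardToSketch
    {
        public static void main(String[] args)
        {
            List<String> dcTargets = List.of("relay", "peer2", "peer3");
            Iterator<String> iter = dcTargets.iterator();
            String relay = iter.next(); // first target relays the write to the rest

            int[] messageIds = new int[dcTargets.size()];
            int idIdx = 0;
            while (iter.hasNext())
            {
                String destination = iter.next();
                // in the real code this registers a write callback and records its id
                messageIds[idIdx++] = destination.hashCode();
            }

            List<String> forwardTo = dcTargets.subList(1, dcTargets.size());
            System.out.println("send to " + relay + " with FORWARD_TO=" + forwardTo);
        }
    }
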
@@ -1449,9 +1381,9 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    private static void performLocally(Stage stage, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    private static void performLocally(Stage stage, Replica localReplica, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
     {
-        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(localReplica, mutation)
         {
             public void runMayThrow()
             {
@@ -1492,9 +1424,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException
     {
-        InetAddressAndPort endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
+        Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+        if (replica.isLocal())
         {
             return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
         }
@@ -1502,18 +1434,19 @@ public class StorageProxy implements StorageProxyMBean
         {
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
             String keyspaceName = cm.getKeyspaceName();
+            Keyspace keyspace = Keyspace.open(keyspaceName);
             AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = cm.key().getToken();
-            List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-            Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
+            ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
+            rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime);
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+                                                                                                 WriteType.COUNTER, queryStartNanoTime);
 
-            Tracing.trace("Enqueuing counter update to {}", endpoint);
-            MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false);
+            Tracing.trace("Enqueuing counter update to {}", replica);
+            MessagingService.instance().sendWriteRR(cm.makeMutationMessage(), replica, responseHandler, false);
             return responseHandler;
         }
     }
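
For orientation, a JDK-only sketch of the leader-forwarding pattern the hunk above keeps intact; LeaderReplica, applyLocally and forwardTo are illustrative stand-ins, not the Cassandra API:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    final class CounterLeaderSketch
    {
        // Hypothetical stand-in for a Replica: an endpoint plus a locality flag.
        record LeaderReplica(String endpoint, boolean isLocal) {}

        // Apply on the leader when that is this node; otherwise forward the update
        // once and let the response handler wait for the leader's ack.
        static CompletableFuture<Void> mutate(LeaderReplica leader, Runnable applyLocally, Consumer<String> forwardTo)
        {
            if (leader.isLocal())
                return CompletableFuture.runAsync(applyLocally);
            forwardTo.accept(leader.endpoint());
            return CompletableFuture.completedFuture(null);
        }
    }
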
@@ -1528,38 +1461,37 @@ public class StorageProxy implements StorageProxyMBean
      * is unclear we want to mix those latencies with read latencies, so this
      * may be a bit involved.
      */
-    private static InetAddressAndPort findSuitableEndpoint(String 
keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) 
throws UnavailableException
+    private static Replica findSuitableReplica(String keyspaceName, 
DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws 
UnavailableException
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        List<InetAddressAndPort> endpoints = new ArrayList<>();
-        StorageService.instance.getLiveNaturalEndpoints(keyspace, key, 
endpoints);
+        EndpointsForToken replicas = 
StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients 
yet, maybe because still bootstrapping
-        endpoints.removeIf(endpoint -> 
!StorageService.instance.isRpcReady(endpoint));
+        replicas = replicas.filter(replica -> 
StorageService.instance.isRpcReady(replica.endpoint()));
 
         // TODO have a way to compute the consistency level
-        if (endpoints.isEmpty())
-            throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
+        if (replicas.isEmpty())
+            throw UnavailableException.create(cl, cl.blockFor(keyspace), 0);
 
-        List<InetAddressAndPort> localEndpoints = new 
ArrayList<>(endpoints.size());
+        List<Replica> localReplicas = new ArrayList<>(replicas.size());
 
-        for (InetAddressAndPort endpoint : endpoints)
-            if (snitch.getDatacenter(endpoint).equals(localDataCenter))
-                localEndpoints.add(endpoint);
+        for (Replica replica : replicas)
+            if (snitch.getDatacenter(replica).equals(localDataCenter))
+                localReplicas.add(replica);
 
-        if (localEndpoints.isEmpty())
+        if (localReplicas.isEmpty())
         {
             // If the consistency required is local then we should not involve 
other DCs
             if (cl.isDatacenterLocal())
-                throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
+                throw UnavailableException.create(cl, cl.blockFor(keyspace), 
0);
 
             // No endpoint in local DC, pick the closest endpoint according to 
the snitch
-            snitch.sortByProximity(FBUtilities.getBroadcastAddressAndPort(), 
endpoints);
-            return endpoints.get(0);
+            replicas = 
snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+            return replicas.get(0);
         }
 
-        return 
localEndpoints.get(ThreadLocalRandom.current().nextInt(localEndpoints.size()));
+        return 
localReplicas.get(ThreadLocalRandom.current().nextInt(localReplicas.size()));
     }
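
For orientation, a compact restatement of the selection policy above; Node and pick are illustrative stand-ins, the candidate list is assumed already proximity-sorted (the patch sorts lazily, only on the remote-DC fallback path), and the DC-local UnavailableException case is collapsed into a comment:

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.stream.Collectors;

    final class ReplicaPickSketch
    {
        record Node(String endpoint, String dc, boolean rpcReady) {}

        // Same order of concerns as findSuitableReplica: drop nodes not yet
        // accepting clients, prefer a random local-DC node (spreads the counter
        // leader role), then fall back to the closest remote node.
        static Node pick(List<Node> candidates, String localDc)
        {
            List<Node> ready = candidates.stream().filter(Node::rpcReady).collect(Collectors.toList());
            if (ready.isEmpty())
                throw new IllegalStateException("no live replicas"); // UnavailableException stand-in
            List<Node> local = ready.stream().filter(n -> n.dc().equals(localDc)).collect(Collectors.toList());
            if (local.isEmpty())
                return ready.get(0); // a DC-local consistency level would throw here instead
            return local.get(ThreadLocalRandom.current().nextInt(local.size()));
        }
    }
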
 
     // Must be called on a replica of the mutation. This replica becomes the
@@ -1579,7 +1511,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final 
Iterable<InetAddressAndPort> targets,
+                                             final EndpointsForToken targets,
                                              final 
AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
@@ -1592,11 +1524,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Mutation result = ((CounterMutation) 
mutation).applyCounterMutation();
                 responseHandler.response(null);
-
-                Set<InetAddressAndPort> remotes = 
Sets.difference(ImmutableSet.copyOf(targets),
-                                                                  
ImmutableSet.of(FBUtilities.getBroadcastAddressAndPort()));
-                if (!remotes.isEmpty())
-                    sendToHintedEndpoints(result, remotes, responseHandler, 
localDataCenter, Stage.COUNTER_MUTATION);
+                sendToHintedReplicas(result, targets, responseHandler, 
localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
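
The task above no longer strips the local endpoint itself (the Sets.difference step is gone), so its targets are expected to exclude this node already. A minimal sketch of the remaining apply-then-replicate shape, all names hypothetical:

    import java.util.List;
    import java.util.function.BiConsumer;
    import java.util.function.Supplier;

    final class ApplyThenReplicateSketch
    {
        // The leader applies the counter mutation (turning a delta into a concrete
        // value), acks its own response, then ships the materialised mutation to
        // the remaining replicas. `remotes` is assumed to already exclude this node.
        static <M> void run(Supplier<M> applyLocally, Runnable ackLocal, List<String> remotes, BiConsumer<M, String> sendTo)
        {
            M result = applyLocally.get();
            ackLocal.run();
            for (String endpoint : remotes)
                sendTo.accept(result, endpoint);
        }
    }
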
@@ -1664,9 +1592,7 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
-            PaxosParticipants p = getPaxosParticipants(metadata, key, 
consistencyLevel);
-            List<InetAddressAndPort> liveEndpoints = p.liveEndpoints;
-            int requiredParticipants = p.participants;
+            ReplicaLayout.ForPaxos replicaLayout = 
ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
 
             // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
             final ConsistencyLevel consistencyForCommitOrFetch = 
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1675,7 +1601,7 @@ public class StorageProxy implements StorageProxyMBean
 
             try
             {
-                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, 
consistencyLevel, consistencyForCommitOrFetch, false, state);
+                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, 
consistencyForCommitOrFetch, false, state);
                 if (pair.contentions > 0)
                     casReadMetrics.contention.update(pair.contentions);
             }
@@ -1924,28 +1850,19 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace 
keyspace, ByteBuffer key)
+    public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace 
keyspace, RingPosition pos)
     {
-        return getLiveSortedEndpoints(keyspace, 
StorageService.instance.getTokenMetadata().decorateKey(key));
+        return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
     }
 
-    public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace 
keyspace, RingPosition pos)
+    public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, 
RingPosition pos)
     {
-        List<InetAddressAndPort> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
-        
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(),
 liveEndpoints);
-        return liveEndpoints;
-    }
+        EndpointsForRange liveReplicas = 
StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
+        // Replica availability is considered by the query path
+        Preconditions.checkState(liveReplicas.isEmpty() || 
liveReplicas.stream().anyMatch(Replica::isFull),
+                                 "At least one full replica required for 
reads: " + liveReplicas);
 
-    private static List<InetAddressAndPort> 
intersection(List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
-    {
-        // Note: we don't use Guava Sets.intersection() for 3 reasons:
-        //   1) retainAll would be inefficient if l1 and l2 are large but in 
practice both are the replicas for a range and
-        //   so will be very small (< RF). In that case, retainAll is in fact 
more efficient.
-        //   2) we do ultimately need a list so converting everything to sets 
don't make sense
-        //   3) l1 and l2 are sorted by proximity. The use of retainAll  
maintain that sorting in the result, while using sets wouldn't.
-        List<InetAddressAndPort> inter = new ArrayList<>(l1);
-        inter.retainAll(l2);
-        return inter;
+        return 
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
 liveReplicas);
     }
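
The deleted intersection() helper survives as EndpointsForRange.keep(), and like retainAll it must preserve the receiver's proximity order for the merge heuristics to stay meaningful. A JDK-only sketch of such an order-preserving intersection (keep here is a free function for illustration, not the Cassandra method):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class KeepSketch
    {
        // Intersection that keeps the iteration order of `sorted` (e.g. proximity
        // order), unlike a plain Set intersection, which would discard it.
        static <T> List<T> keep(List<T> sorted, Set<T> allowed)
        {
            List<T> out = new ArrayList<>(sorted.size());
            for (T t : sorted)
                if (allowed.contains(t))
                    out.add(t);
            return out;
        }
    }

Called as keep(currentLive, new HashSet<>(nextLive)), this mirrors the current.all().keep(next.all().endpoints()) call in the RangeMerger hunk below.
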
 
     /**
@@ -1963,24 +1880,10 @@ public class StorageProxy implements StorageProxyMBean
                                  : index.getEstimatedResultRows();
 
         // adjust maxExpectedResults by the number of tokens this node has and 
the replication factor for this ks
-        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / 
keyspace.getReplicationStrategy().getReplicationFactor();
+        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
     }
 
-    private static class RangeForQuery
-    {
-        public final AbstractBounds<PartitionPosition> range;
-        public final List<InetAddressAndPort> liveEndpoints;
-        public final List<InetAddressAndPort> filteredEndpoints;
-
-        public RangeForQuery(AbstractBounds<PartitionPosition> range, 
List<InetAddressAndPort> liveEndpoints, List<InetAddressAndPort> 
filteredEndpoints)
-        {
-            this.range = range;
-            this.liveEndpoints = liveEndpoints;
-            this.filteredEndpoints = filteredEndpoints;
-        }
-    }
-
-    private static class RangeIterator extends AbstractIterator<RangeForQuery>
+    private static class RangeIterator extends 
AbstractIterator<ReplicaLayout.ForRange>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -2004,38 +1907,43 @@ public class StorageProxy implements StorageProxyMBean
             return rangeCount;
         }
 
-        protected RangeForQuery computeNext()
+        protected ReplicaLayout.ForRange computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
             AbstractBounds<PartitionPosition> range = ranges.next();
-            List<InetAddressAndPort> liveEndpoints = 
getLiveSortedEndpoints(keyspace, range.right);
-            return new RangeForQuery(range,
-                                     liveEndpoints,
-                                     consistency.filterForQuery(keyspace, 
liveEndpoints));
+            EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, 
range.right);
+
+            int blockFor = consistency.blockFor(keyspace);
+            EndpointsForRange targetReplicas = 
consistency.filterForQuery(keyspace, liveReplicas);
+            int minResponses = Math.min(targetReplicas.size(), blockFor);
+
+            // Endpoints for range here as well
+            return ReplicaLayout.forRangeRead(keyspace, consistency, range,
+                                              liveReplicas, 
targetReplicas.subList(0, minResponses));
         }
     }
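
computeNext() above now performs the min(targets, blockFor) trim that query() used to do at send time. The same trim over plain lists:

    import java.util.List;

    final class ContactTrimSketch
    {
        // With RF=3, CL=QUORUM (blockFor=2) and three filtered targets, only the
        // first two proximity-sorted replicas are selected for the range read.
        static <T> List<T> contacts(List<T> filteredTargets, int blockFor)
        {
            int minResponses = Math.min(filteredTargets.size(), blockFor);
            return filteredTargets.subList(0, minResponses);
        }
    }
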
 
-    private static class RangeMerger extends AbstractIterator<RangeForQuery>
+    private static class RangeMerger extends 
AbstractIterator<ReplicaLayout.ForRange>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
-        private final PeekingIterator<RangeForQuery> ranges;
+        private final PeekingIterator<ReplicaLayout.ForRange> ranges;
 
-        private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace 
keyspace, ConsistencyLevel consistency)
+        private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, 
Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
             this.ranges = Iterators.peekingIterator(iterator);
         }
 
-        protected RangeForQuery computeNext()
+        protected ReplicaLayout.ForRange computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            RangeForQuery current = ranges.next();
+            ReplicaLayout.ForRange current = ranges.next();
 
             // getRestrictedRange has broken the queried range into 
per-[vnode] token ranges, but this doesn't take
             // the replication factor into account. If the intersection of 
live endpoints for 2 consecutive ranges
@@ -2050,22 +1958,22 @@ public class StorageProxy implements StorageProxyMBean
                 if (current.range.right.isMinimum())
                     break;
 
-                RangeForQuery next = ranges.peek();
+                ReplicaLayout.ForRange next = ranges.peek();
 
-                List<InetAddressAndPort> merged = 
intersection(current.liveEndpoints, next.liveEndpoints);
+                EndpointsForRange merged = 
current.all().keep(next.all().endpoints());
 
                // Check if there are enough endpoints for the merge to be possible.
-                if (!consistency.isSufficientLiveNodes(keyspace, merged))
+                if (!consistency.isSufficientLiveNodesForRead(keyspace, 
merged))
                     break;
 
-                List<InetAddressAndPort> filteredMerged = 
consistency.filterForQuery(keyspace, merged);
+                EndpointsForRange filteredMerged = 
consistency.filterForQuery(keyspace, merged);
 
                 // Estimate whether merging will be a win or not
-                if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
 current.filteredEndpoints, next.filteredEndpoints))
+                if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
 current.selected(), next.selected()))
                     break;
 
                 // If we get there, merge this range and the next one
-                current = new 
RangeForQuery(current.range.withNewRight(next.range.right), merged, 
filteredMerged);
+                current = ReplicaLayout.forRangeRead(keyspace, consistency, 
current.range.withNewRight(next.range.right), merged, filteredMerged);
                 ranges.next(); // consume the range we just merged since we've 
only peeked so far
             }
             return current;
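
A reduced sketch of the merge loop above, using Guava's PeekingIterator the same way; Range is an illustrative stand-in, and the sufficiency and isWorthMergingForRangeQuery checks are collapsed into a single predicate:

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Predicate;

    final class RangeMergeSketch
    {
        record Range<T>(int right, List<T> live) {}

        // Widen `current` over consecutive ranges while the shared live replicas
        // still pass `sufficient`; peek first, consume only after deciding to merge.
        static <T> Range<T> merge(Range<T> current, Iterator<Range<T>> rest, Predicate<List<T>> sufficient)
        {
            PeekingIterator<Range<T>> it = Iterators.peekingIterator(rest);
            while (it.hasNext())
            {
                Range<T> next = it.peek();
                List<T> shared = current.live().stream().filter(next.live()::contains).toList();
                if (!sufficient.test(shared))
                    break;
                current = new Range<>(next.right(), shared);
                it.next(); // consume the range we just merged
            }
            return current;
        }
    }
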
@@ -2110,11 +2018,9 @@ public class StorageProxy implements StorageProxyMBean
 
     private static class RangeCommandIterator extends 
AbstractIterator<RowIterator> implements PartitionIterator
     {
-        private final Iterator<RangeForQuery> ranges;
+        private final Iterator<ReplicaLayout.ForRange> ranges;
         private final int totalRangeCount;
         private final PartitionRangeReadCommand command;
-        private final Keyspace keyspace;
-        private final ConsistencyLevel consistency;
         private final boolean enforceStrictLiveness;
 
         private final long startTime;
@@ -2135,8 +2041,6 @@ public class StorageProxy implements StorageProxyMBean
             this.startTime = System.nanoTime();
             this.ranges = new RangeMerger(ranges, keyspace, consistency);
             this.totalRangeCount = ranges.rangeCount();
-            this.consistency = consistency;
-            this.keyspace = keyspace;
             this.queryStartNanoTime = queryStartNanoTime;
             this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
         }
@@ -2204,36 +2108,36 @@ public class StorageProxy implements StorageProxyMBean
         /**
          * Queries the provided sub-range.
          *
-         * @param toQuery the subRange to query.
+         * @param replicaLayout the replica layout for the sub-range to query.
          * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
          * that batch or not. The reason it matters is that when paging queries, the command (more specifically the
          * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
          * that it's the query that "continues" whatever we've previously queried).
          */
-        private SingleRangeResponse query(RangeForQuery toQuery, boolean 
isFirst)
+        private SingleRangeResponse query(ReplicaLayout.ForRange 
replicaLayout, boolean isFirst)
         {
-            PartitionRangeReadCommand rangeCommand = 
command.forSubRange(toQuery.range, isFirst);
+            PartitionRangeReadCommand rangeCommand = 
command.forSubRange(replicaLayout.range, isFirst);
+            ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = 
ReadRepair.create(command, replicaLayout, queryStartNanoTime);
+            DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = 
new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
+            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 
-            ReadRepair readRepair = ReadRepair.create(command, 
queryStartNanoTime, consistency);
-            DataResolver resolver = new DataResolver(keyspace, rangeCommand, 
consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime, readRepair);
-
-            int blockFor = consistency.blockFor(keyspace);
-            int minResponses = Math.min(toQuery.filteredEndpoints.size(), 
blockFor);
-            List<InetAddressAndPort> minimalEndpoints = 
toQuery.filteredEndpoints.subList(0, minResponses);
-            ReadCallback handler = new ReadCallback(resolver, consistency, 
rangeCommand, minimalEndpoints, queryStartNanoTime);
+            ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
+                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
+                                                                                                 rangeCommand,
+                                                                                                 replicaLayout,
+                                                                                                 queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
-
-            if (toQuery.filteredEndpoints.size() == 1 && 
canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+            if (replicaLayout.selected().size() == 1 && 
replicaLayout.selected().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(rangeCommand, handler));
             }
             else
             {
-                for (InetAddressAndPort endpoint : toQuery.filteredEndpoints)
+                for (Replica replica : replicaLayout.selected())
                 {
-                    Tracing.trace("Enqueuing request to {}", endpoint);
-                    
MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), 
endpoint, handler);
+                    Tracing.trace("Enqueuing request to {}", replica);
+                    
MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), 
replica.endpoint(), handler);
                 }
             }
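
A stripped-down sketch of the dispatch branch above: a single local target reads on the local stage and skips messaging entirely, anything else fans out one message per selected replica (Target, localRead and sendTo are stand-ins):

    import java.util.List;
    import java.util.function.Consumer;

    final class ReadDispatchSketch
    {
        record Target(String endpoint, boolean isLocal) {}

        // One local target: run the read in-process on the READ stage; otherwise
        // enqueue a request to every selected replica.
        static void dispatch(List<Target> selected, Runnable localRead, Consumer<String> sendTo)
        {
            if (selected.size() == 1 && selected.get(0).isLocal())
                localRead.run();
            else
                selected.forEach(t -> sendTo.accept(t.endpoint()));
        }
    }
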
 
@@ -2486,32 +2390,30 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setMaxHintWindow(ms);
     }
 
-    public static boolean shouldHint(InetAddressAndPort ep)
+    public static boolean shouldHint(Replica replica)
     {
-        if (DatabaseDescriptor.hintedHandoffEnabled())
+        if (!DatabaseDescriptor.hintedHandoffEnabled())
+            return false;
+        if (replica.isTransient() || replica.isLocal())
+            return false;
+
+        Set<String> disabledDCs = 
DatabaseDescriptor.hintedHandoffDisabledDCs();
+        if (!disabledDCs.isEmpty())
         {
-            Set<String> disabledDCs = 
DatabaseDescriptor.hintedHandoffDisabledDCs();
-            if (!disabledDCs.isEmpty())
-            {
-                final String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
-                if (disabledDCs.contains(dc))
-                {
-                    Tracing.trace("Not hinting {} since its data center {} has 
been disabled {}", ep, dc, disabledDCs);
-                    return false;
-                }
-            }
-            boolean hintWindowExpired = 
Gossiper.instance.getEndpointDowntime(ep) > 
DatabaseDescriptor.getMaxHintWindow();
-            if (hintWindowExpired)
+            final String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            if (disabledDCs.contains(dc))
             {
-                HintsService.instance.metrics.incrPastWindow(ep);
-                Tracing.trace("Not hinting {} which has been down {} ms", ep, 
Gossiper.instance.getEndpointDowntime(ep));
+                Tracing.trace("Not hinting {} since its data center {} has 
been disabled {}", replica, dc, disabledDCs);
+                return false;
             }
-            return !hintWindowExpired;
         }
-        else
+        boolean hintWindowExpired = 
Gossiper.instance.getEndpointDowntime(replica.endpoint()) > 
DatabaseDescriptor.getMaxHintWindow();
+        if (hintWindowExpired)
         {
-            return false;
+            HintsService.instance.metrics.incrPastWindow(replica.endpoint());
+            Tracing.trace("Not hinting {} which has been down {} ms", replica, 
Gossiper.instance.getEndpointDowntime(replica.endpoint()));
         }
+        return !hintWindowExpired;
     }
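
The rewrite flattens the nested conditionals into guard clauses. A stand-alone sketch with the same decision order (Peer and all inputs are illustrative stand-ins):

    import java.util.Set;

    final class ShouldHintSketch
    {
        record Peer(String dc, boolean isTransient, boolean isLocal, long downMillis) {}

        // Guard order as in the rewritten method: global switch first, then the
        // replica kind (transient replicas never get hints), then the per-DC
        // disable list, and finally the hint window.
        static boolean shouldHint(Peer p, boolean handoffEnabled, Set<String> disabledDcs, long maxWindowMillis)
        {
            if (!handoffEnabled)
                return false;
            if (p.isTransient() || p.isLocal())
                return false;
            if (disabledDcs.contains(p.dc()))
                return false;
            return p.downMillis() <= maxWindowMillis; // expired window: no hint
        }
    }
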
 
     /**
@@ -2532,7 +2434,7 @@ public class StorageProxy implements StorageProxyMBean
             // invoked by an admin, for simplicity we require that all nodes 
are up
             // to perform the operation.
             int liveMembers = Gossiper.instance.getLiveMembers().size();
-            throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + 
Gossiper.instance.getUnreachableMembers().size(), liveMembers);
+            throw UnavailableException.create(ConsistencyLevel.ALL, 
liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers);
         }
 
         Set<InetAddressAndPort> allEndpoints = 
StorageService.instance.getLiveRingMembers(true);
@@ -2571,10 +2473,9 @@ public class StorageProxy implements StorageProxyMBean
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
-                          Iterable<InetAddressAndPort> targets,
+                          ReplicaLayout.ForToken targets,
                           AbstractWriteResponseHandler<IMutation> 
responseHandler,
-                          String localDataCenter,
-                          ConsistencyLevel consistencyLevel) throws 
OverloadedException;
+                          String localDataCenter) throws OverloadedException;
     }
 
     /**
@@ -2638,15 +2539,18 @@ public class StorageProxy implements StorageProxyMBean
     {
         private final long constructionTime = System.currentTimeMillis();
 
+        private final Replica localReplica;
         private final Optional<IMutation> mutationOpt;
 
-        public LocalMutationRunnable(Optional<IMutation> mutationOpt)
+        public LocalMutationRunnable(Replica localReplica, Optional<IMutation> 
mutationOpt)
         {
+            this.localReplica = localReplica;
             this.mutationOpt = mutationOpt;
         }
 
-        public LocalMutationRunnable()
+        public LocalMutationRunnable(Replica localReplica)
         {
+            this.localReplica = localReplica;
             this.mutationOpt = Optional.empty();
         }
 
@@ -2659,7 +2563,8 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (MessagingService.DROPPABLE_VERBS.contains(verb))
                     
MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken);
-                HintRunnable runnable = new 
HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddressAndPort()))
+
+                HintRunnable runnable = new 
HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica))
                 {
                     protected void runMayThrow() throws Exception
                     {
@@ -2690,9 +2595,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     private abstract static class HintRunnable implements Runnable
     {
-        public final Collection<InetAddressAndPort> targets;
+        public final EndpointsForToken targets;
 
-        protected HintRunnable(Collection<InetAddressAndPort> targets)
+        protected HintRunnable(EndpointsForToken targets)
         {
             this.targets = targets;
         }
@@ -2710,7 +2615,7 @@ public class StorageProxy implements StorageProxyMBean
             finally
             {
                 StorageMetrics.totalHintsInProgress.dec(targets.size());
-                for (InetAddressAndPort target : targets)
+                for (InetAddressAndPort target : targets.endpoints())
                     getHintsInProgressFor(target).decrementAndGet();
             }
         }
@@ -2756,22 +2661,23 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static Future<Void> submitHint(Mutation mutation, 
InetAddressAndPort target, AbstractWriteResponseHandler<IMutation> 
responseHandler)
+    public static Future<Void> submitHint(Mutation mutation, Replica target, 
AbstractWriteResponseHandler<IMutation> responseHandler)
     {
-        return submitHint(mutation, Collections.singleton(target), 
responseHandler);
+        return submitHint(mutation, EndpointsForToken.of(target.range().right, 
target), responseHandler);
     }
 
     public static Future<Void> submitHint(Mutation mutation,
-                                          Collection<InetAddressAndPort> 
targets,
+                                          EndpointsForToken targets,
                                           
AbstractWriteResponseHandler<IMutation> responseHandler)
     {
+        Replicas.assertFull(targets); // hints should not be written for 
transient replicas
         HintRunnable runnable = new HintRunnable(targets)
         {
             public void runMayThrow()
             {
                 Set<InetAddressAndPort> validTargets = new 
HashSet<>(targets.size());
                 Set<UUID> hostIds = new HashSet<>(targets.size());
-                for (InetAddressAndPort target : targets)
+                for (InetAddressAndPort target : targets.endpoints())
                 {
                     UUID hostId = 
StorageService.instance.getHostIdForEndpoint(target);
                     if (hostId != null)
@@ -2786,7 +2692,7 @@ public class StorageProxy implements StorageProxyMBean
                 HintsService.instance.write(hostIds, Hint.create(mutation, 
System.currentTimeMillis()));
                 
validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
                 // Notify the handler only for CL == ANY
-                if (responseHandler != null && 
responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+                if (responseHandler != null && 
responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
                     responseHandler.response(null);
             }
         };
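
A sketch of the endpoint-to-host-id resolution the runnable performs; hintableHosts and its inputs are stand-ins, and the branch for endpoints with no known id is elided by the hunk, so it is simply skipped here:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    final class HintFanoutSketch
    {
        // Hints are keyed by host id, not address, so each target endpoint is
        // resolved first. Transient replicas never reach this point, per the
        // Replicas.assertFull guard above.
        static Set<UUID> hintableHosts(Set<String> endpoints, Map<String, UUID> hostIdByEndpoint)
        {
            Set<UUID> hostIds = new HashSet<>(endpoints.size());
            for (String endpoint : endpoints)
            {
                UUID hostId = hostIdByEndpoint.get(endpoint);
                if (hostId != null)
                    hostIds.add(hostId);
            }
            return hostIds;
        }
    }
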
@@ -2797,8 +2703,8 @@ public class StorageProxy implements StorageProxyMBean
     private static Future<Void> submitHint(HintRunnable runnable)
     {
         StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
-        for (InetAddressAndPort target : runnable.targets)
-            getHintsInProgressFor(target).incrementAndGet();
+        for (Replica target : runnable.targets)
+            getHintsInProgressFor(target.endpoint()).incrementAndGet();
         return (Future<Void>) 
StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
@@ -2892,36 +2798,6 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
     }
 
-
-    static class PaxosParticipants
-    {
-        final List<InetAddressAndPort> liveEndpoints;
-        final int participants;
-
-        PaxosParticipants(List<InetAddressAndPort> liveEndpoints, int 
participants)
-        {
-            this.liveEndpoints = liveEndpoints;
-            this.participants = participants;
-        }
-
-        @Override
-        public final int hashCode()
-        {
-            int hashCode = 31 + (liveEndpoints == null ? 0 : 
liveEndpoints.hashCode());
-            return 31 * hashCode * this.participants;
-        }
-
-        @Override
-        public final boolean equals(Object o)
-        {
-            if(!(o instanceof PaxosParticipants))
-                return false;
-            PaxosParticipants that = (PaxosParticipants)o;
-            // handles nulls properly
-            return Objects.equals(liveEndpoints, that.liveEndpoints) && 
participants == that.participants;
-        }
-    }
-
     static class PaxosBallotAndContention
     {
         final UUID ballot;

