http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ed0cafc..c23eb88 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -29,7 +29,6 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.primitives.Ints; @@ -133,18 +132,10 @@ public class StorageProxy implements StorageProxyMBean HintsService.instance.registerMBean(); HintedHandOffManager.instance.registerMBean(); - standardWritePerformer = new WritePerformer() + standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> { - public void apply(IMutation mutation, - Iterable<InetAddressAndPort> targets, - AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - ConsistencyLevel consistency_level) - throws OverloadedException - { - assert mutation instanceof Mutation; - sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION); - } + assert mutation instanceof Mutation; + sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION); }; /* @@ -153,29 +144,19 @@ public class StorageProxy implements StorageProxyMBean * but on the latter case, the verb handler already run on the COUNTER_MUTATION stage, so we must not execute the * underlying on the stage otherwise we risk a deadlock. Hence two different performer. */ - counterWritePerformer = new WritePerformer() + counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> { - public void apply(IMutation mutation, - Iterable<InetAddressAndPort> targets, - AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - ConsistencyLevel consistencyLevel) - { - counterWriteTask(mutation, targets, responseHandler, localDataCenter).run(); - } + EndpointsForToken selected = targets.selected().withoutSelf(); + Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 + counterWriteTask(mutation, selected, responseHandler, localDataCenter).run(); }; - counterWriteOnCoordinatorPerformer = new WritePerformer() + counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) -> { - public void apply(IMutation mutation, - Iterable<InetAddressAndPort> targets, - AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - ConsistencyLevel consistencyLevel) - { - StageManager.getStage(Stage.COUNTER_MUTATION) - .execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter)); - } + EndpointsForToken selected = targets.selected().withoutSelf(); + Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 + StageManager.getStage(Stage.COUNTER_MUTATION) + .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter)); }; for(ConsistencyLevel level : ConsistencyLevel.values()) @@ -251,11 +232,9 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - queryStartNanoTime < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyForPaxos); - List<InetAddressAndPort> liveEndpoints = p.liveEndpoints; - int requiredParticipants = p.participants; + ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos); - final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); + final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state); final UUID ballot = pair.ballot; contentions += pair.contentions; @@ -297,7 +276,7 @@ public class StorageProxy implements StorageProxyMBean Commit proposal = Commit.newProposal(ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime)) + if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime)) { commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); Tracing.trace("CAS successful"); @@ -346,49 +325,6 @@ public class StorageProxy implements StorageProxyMBean casWriteMetrics.contention.update(contentions); } - private static Predicate<InetAddressAndPort> sameDCPredicateFor(final String dc) - { - final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - return new Predicate<InetAddressAndPort>() - { - public boolean apply(InetAddressAndPort host) - { - return dc.equals(snitch.getDatacenter(host)); - } - }; - } - - private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException - { - Token tk = key.getToken(); - List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace); - if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) - { - // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only - String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - Predicate<InetAddressAndPort> isLocalDc = sameDCPredicateFor(localDc); - naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc)); - pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc)); - } - int participants = pendingEndpoints.size() + naturalEndpoints.size(); - int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833 - List<InetAddressAndPort> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); - if (liveEndpoints.size() < requiredParticipants) - throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size()); - - // We cannot allow CAS operations with 2 or more pending endpoints, see #8346. - // Note that we fake an impossible number of required nodes in the unavailable exception - // to nail home the point that it's an impossible operation no matter how many nodes are live. - if (pendingEndpoints.size() > 1) - throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pendingEndpoints.size()), - consistencyForPaxos, - participants + 1, - liveEndpoints.size()); - - return new PaxosParticipants(liveEndpoints, requiredParticipants); - } - /** * begin a Paxos session by sending a prepare request and completing any in-progress requests seen in the replies * @@ -396,14 +332,13 @@ public class StorageProxy implements StorageProxyMBean * nodes have seen the mostRecentCommit. Otherwise, return null. */ private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, - DecoratedKey key, - TableMetadata metadata, - List<InetAddressAndPort> liveEndpoints, - int requiredParticipants, - ConsistencyLevel consistencyForPaxos, - ConsistencyLevel consistencyForCommit, - final boolean isWrite, - ClientState state) + DecoratedKey key, + TableMetadata metadata, + ReplicaLayout.ForPaxos replicaLayout, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit, + final boolean isWrite, + ClientState state) throws WriteTimeoutException, WriteFailureException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); @@ -425,7 +360,7 @@ public class StorageProxy implements StorageProxyMBean // prepare Tracing.trace("Preparing {}", ballot); Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime); + summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime); if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); @@ -448,7 +383,7 @@ public class StorageProxy implements StorageProxyMBean else casReadMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); - if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime)) + if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime)) { try { @@ -505,14 +440,14 @@ public class StorageProxy implements StorageProxyMBean MessagingService.instance().sendOneWay(message, target); } - private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddressAndPort> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime) + private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime) throws WriteTimeoutException { - PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime); + PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); - for (InetAddressAndPort target : endpoints) + for (Replica replica: replicaLayout.selected()) { - if (canDoLocalRequest(target)) + if (replica.isLocal()) { StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable() { @@ -536,21 +471,21 @@ public class StorageProxy implements StorageProxyMBean } else { - MessagingService.instance().sendRR(message, target, callback); + MessagingService.instance().sendRR(message, replica.endpoint(), callback); } } callback.await(); return callback; } - private static boolean proposePaxos(Commit proposal, List<InetAddressAndPort> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime) + private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime) throws WriteTimeoutException { - ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime); + ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); - for (InetAddressAndPort target : endpoints) + for (Replica replica : replicaLayout.selected()) { - if (canDoLocalRequest(target)) + if (replica.isLocal()) { StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable() { @@ -574,7 +509,7 @@ public class StorageProxy implements StorageProxyMBean } else { - MessagingService.instance().sendRR(message, target, callback); + MessagingService.instance().sendRR(message, replica.endpoint(), callback); } } callback.await(); @@ -583,7 +518,7 @@ public class StorageProxy implements StorageProxyMBean return true; if (timeoutIfPartial && !callback.isFullyRefused()) - throw new WriteTimeoutException(WriteType.CAS, consistencyLevel, callback.getAcceptCount(), requiredParticipants); + throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants()); return false; } @@ -594,30 +529,30 @@ public class StorageProxy implements StorageProxyMBean Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace); Token tk = proposal.update.partitionKey().getToken(); - List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); AbstractWriteResponseHandler<Commit> responseHandler = null; + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive); if (shouldBlock) { AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime); + responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime); responseHandler.setSupportsBackPressure(false); } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); - for (InetAddressAndPort destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) + for (Replica replica : replicaLayout.all()) { - checkHintOverload(destination); + InetAddressAndPort destination = replica.endpoint(); + checkHintOverload(replica); if (FailureDetector.instance.isAlive(destination)) { if (shouldBlock) { - if (canDoLocalRequest(destination)) - commitPaxosLocal(message, responseHandler); + if (replica.isLocal()) + commitPaxosLocal(replica, message, responseHandler); else - MessagingService.instance().sendRR(message, destination, responseHandler, allowHints && shouldHint(destination)); + MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && shouldHint(replica)); } else { @@ -630,9 +565,9 @@ public class StorageProxy implements StorageProxyMBean { responseHandler.expired(); } - if (allowHints && shouldHint(destination)) + if (allowHints && shouldHint(replica)) { - submitHint(proposal.makeMutation(), destination, null); + submitHint(proposal.makeMutation(), replica, null); } } } @@ -646,9 +581,9 @@ public class StorageProxy implements StorageProxyMBean * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure * signal for hints */ - private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler) + private static void commitPaxosLocal(Replica localReplica, final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler) { - StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable() + StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable(localReplica) { public void runMayThrow() { @@ -684,35 +619,37 @@ public class StorageProxy implements StorageProxyMBean * @param consistency_level the consistency level for the operation * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ - public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime) + public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); long startTime = System.nanoTime(); + List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size()); + WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH; try { for (IMutation mutation : mutations) { if (mutation instanceof CounterMutation) - { responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime)); - } else - { - WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH; - responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime)); - } + responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); + } + + // upgrade to full quorum any failed cheap quorums + for (int i = 0 ; i < mutations.size() ; ++i) + { + if (!(mutations.get(i) instanceof CounterMutation)) // at the moment, only non-counter writes support cheap quorums + responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i), standardWritePerformer, localDataCenter); } // wait for writes. throws TimeoutException if necessary for (AbstractWriteResponseHandler<IMutation> responseHandler : responseHandlers) - { responseHandler.get(); - } } catch (WriteTimeoutException|WriteFailureException ex) { @@ -786,16 +723,12 @@ public class StorageProxy implements StorageProxyMBean String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - Iterable<InetAddressAndPort> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token); - ArrayList<InetAddressAndPort> endpointsToHint = new ArrayList<>(Iterables.size(endpoints)); - // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510), // so there is no need to hint or retry. - for (InetAddressAndPort target : endpoints) - if (!target.equals(FBUtilities.getBroadcastAddressAndPort()) && shouldHint(target)) - endpointsToHint.add(target); + EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token) + .filter(StorageProxy::shouldHint); - submitHint(mutation, endpointsToHint, null); + submitHint(mutation, replicasToHint, null); } public boolean appliesLocally(Mutation mutation) @@ -804,8 +737,8 @@ public class StorageProxy implements StorageProxyMBean Token token = mutation.key().getToken(); InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local) - || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local); + return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local) + || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local); } /** @@ -854,13 +787,13 @@ public class StorageProxy implements StorageProxyMBean { String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - Optional<InetAddressAndPort> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + Optional<Replica> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); + EndpointsForToken pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspaceName); // if there are no paired endpoints there are probably range movements going on, so we write to the local batchlog to replay later if (!pairedEndpoint.isPresent()) { - if (pendingEndpoints.isEmpty()) + if (pendingReplicas.isEmpty()) logger.warn("Received base materialized view mutation for key {} that does not belong " + "to this node. There is probably a range movement happening (move or decommission)," + "but this node hasn't updated its ring metadata yet. Adding mutation to " + @@ -872,8 +805,8 @@ public class StorageProxy implements StorageProxyMBean // When local node is the endpoint we can just apply the mutation locally, // unless there are pending endpoints, in which case we want to do an ordinary // write so the view mutation is sent to the pending endpoint - if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddressAndPort()) && StorageService.instance.isJoined() - && pendingEndpoints.isEmpty()) + if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined() + && pendingReplicas.isEmpty()) { try { @@ -892,7 +825,8 @@ public class StorageProxy implements StorageProxyMBean wrappers.add(wrapViewBatchResponseHandler(mutation, consistencyLevel, consistencyLevel, - Collections.singletonList(pairedEndpoint.get()), + EndpointsForToken.of(tk, pairedEndpoint.get()), + pendingReplicas, baseComplete, WriteType.BATCH, cleanup, @@ -916,7 +850,7 @@ public class StorageProxy implements StorageProxyMBean } @SuppressWarnings("unchecked") - public static void mutateWithTriggers(Collection<? extends IMutation> mutations, + public static void mutateWithTriggers(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically, long queryStartNanoTime) @@ -966,6 +900,9 @@ public class StorageProxy implements StorageProxyMBean List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size()); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas())) + throw new AssertionError("Logged batches are unsupported with transient replication"); + try { @@ -982,7 +919,7 @@ public class StorageProxy implements StorageProxyMBean batchConsistencyLevel = consistency_level; } - final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); + final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); @@ -1037,11 +974,6 @@ public class StorageProxy implements StorageProxyMBean } } - public static boolean canDoLocalRequest(InetAddressAndPort replica) - { - return replica.equals(FBUtilities.getBroadcastAddressAndPort()); - } - private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations, long latency) { if (null == mutations) @@ -1069,24 +1001,22 @@ public class StorageProxy implements StorageProxyMBean private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { - WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints, - Collections.emptyList(), - endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, - Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME), - null, - WriteType.BATCH_LOG, - queryStartNanoTime); + Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints); + WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout, + WriteType.BATCH_LOG, + queryStartNanoTime); Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - for (InetAddressAndPort target : endpoints) + for (Replica replica : replicaLayout.all()) { - logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); + logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); - if (canDoLocalRequest(target)) - performLocally(Stage.MUTATION, Optional.empty(), () -> BatchlogManager.store(batch), handler); + if (replica.isLocal()) + performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler); else - MessagingService.instance().sendRR(message, target, handler); + MessagingService.instance().sendRR(message, replica.endpoint(), handler); } handler.get(); } @@ -1099,8 +1029,8 @@ public class StorageProxy implements StorageProxyMBean if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); - if (canDoLocalRequest(target)) - performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid)); + if (target.equals(FBUtilities.getBroadcastAddressAndPort())) + performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid)); else MessagingService.instance().sendOneWay(message, target); } @@ -1110,11 +1040,12 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549 + ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all()); try { - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); + sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage); } catch (OverloadedException | WriteTimeoutException e) { @@ -1128,8 +1059,8 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); + Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549 + sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage); } @@ -1144,7 +1075,7 @@ public class StorageProxy implements StorageProxyMBean * responses based on consistency level. * * @param mutation the mutation to be applied - * @param consistency_level the consistency level for the write operation + * @param consistencyLevel the consistency level for the write operation * @param performer the WritePerformer in charge of appliying the mutation * given the list of write endpoints (either standardWritePerformer for * standard writes or counterWritePerformer for counter writes). @@ -1152,33 +1083,32 @@ public class StorageProxy implements StorageProxyMBean * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation, - ConsistencyLevel consistency_level, + ConsistencyLevel consistencyLevel, String localDataCenter, WritePerformer performer, Runnable callback, WriteType writeType, long queryStartNanoTime) - throws UnavailableException, OverloadedException { String keyspaceName = mutation.getKeyspaceName(); - AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); + Keyspace keyspace = Keyspace.open(keyspaceName); + AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); Token tk = mutation.key().getToken(); - List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime); + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); - performer.apply(mutation, Iterables.concat(naturalEndpoints, pendingEndpoints), responseHandler, localDataCenter, consistency_level); + performer.apply(mutation, replicaLayout, responseHandler, localDataCenter); return responseHandler; } // same as performWrites except does not initiate writes (but does perform availability checks). private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation, - ConsistencyLevel consistency_level, + ConsistencyLevel consistencyLevel, ConsistencyLevel batchConsistencyLevel, WriteType writeType, BatchlogResponseHandler.BatchlogCleanup cleanup, @@ -1186,11 +1116,10 @@ public class StorageProxy implements StorageProxyMBean { Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime); + + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive); + AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime); BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime); return new WriteResponseHandlerWrapper(batchHandler, mutation); } @@ -1200,9 +1129,10 @@ public class StorageProxy implements StorageProxyMBean * Keeps track of ViewWriteMetrics */ private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation, - ConsistencyLevel consistency_level, + ConsistencyLevel consistencyLevel, ConsistencyLevel batchConsistencyLevel, - List<InetAddressAndPort> naturalEndpoints, + EndpointsForToken naturalEndpoints, + EndpointsForToken pendingEndpoints, AtomicLong baseComplete, WriteType writeType, BatchlogResponseHandler.BatchlogCleanup cleanup, @@ -1210,10 +1140,10 @@ public class StorageProxy implements StorageProxyMBean { Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> { + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints); + + AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> { long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get()); viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS); }, writeType, queryStartNanoTime); @@ -1241,7 +1171,7 @@ public class StorageProxy implements StorageProxyMBean * - choose min(2, number of qualifying candiates above) * - allow the local node to be the only replica only if it's a single-node DC */ - private static Collection<InetAddressAndPort> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) + private static Collection<InetAddressAndPort> getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); @@ -1254,7 +1184,7 @@ public class StorageProxy implements StorageProxyMBean if (consistencyLevel == ConsistencyLevel.ANY) return Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); + throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); } return chosenEndpoints; @@ -1277,36 +1207,36 @@ public class StorageProxy implements StorageProxyMBean * * @throws OverloadedException if the hints cannot be written/enqueued */ - public static void sendToHintedEndpoints(final Mutation mutation, - Iterable<InetAddressAndPort> targets, - AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - Stage stage) + public static void sendToHintedReplicas(final Mutation mutation, + EndpointsForToken targets, + AbstractWriteResponseHandler<IMutation> responseHandler, + String localDataCenter, + Stage stage) throws OverloadedException { - int targetsSize = Iterables.size(targets); - // this dc replicas: - Collection<InetAddressAndPort> localDc = null; + Collection<Replica> localDc = null; // extra-datacenter replicas, grouped by dc - Map<String, Collection<InetAddressAndPort>> dcGroups = null; + Map<String, Collection<Replica>> dcGroups = null; // only need to create a Message for non-local writes MessageOut<Mutation> message = null; boolean insertLocal = false; - ArrayList<InetAddressAndPort> endpointsToHint = null; + Replica localReplica = null; + Collection<Replica> endpointsToHint = null; List<InetAddressAndPort> backPressureHosts = null; - for (InetAddressAndPort destination : targets) + for (Replica destination : targets) { checkHintOverload(destination); - if (FailureDetector.instance.isAlive(destination)) + if (FailureDetector.instance.isAlive(destination.endpoint())) { - if (canDoLocalRequest(destination)) + if (destination.isLocal()) { insertLocal = true; + localReplica = destination; } else { @@ -1321,28 +1251,26 @@ public class StorageProxy implements StorageProxyMBean if (localDataCenter.equals(dc)) { if (localDc == null) - localDc = new ArrayList<>(targetsSize); + localDc = new ArrayList<>(targets.size()); localDc.add(destination); } else { - Collection<InetAddressAndPort> messages = (dcGroups != null) ? dcGroups.get(dc) : null; + if (dcGroups == null) + dcGroups = new HashMap<>(); + + Collection<Replica> messages = dcGroups.get(dc); if (messages == null) - { - messages = new ArrayList<>(3); // most DCs will have <= 3 replicas - if (dcGroups == null) - dcGroups = new HashMap<>(); - dcGroups.put(dc, messages); - } + messages = dcGroups.computeIfAbsent(dc, (v) -> new ArrayList<>(3)); // most DCs will have <= 3 replicas messages.add(destination); } if (backPressureHosts == null) - backPressureHosts = new ArrayList<>(targetsSize); + backPressureHosts = new ArrayList<>(targets.size()); - backPressureHosts.add(destination); + backPressureHosts.add(destination.endpoint()); } } else @@ -1352,7 +1280,7 @@ public class StorageProxy implements StorageProxyMBean if (shouldHint(destination)) { if (endpointsToHint == null) - endpointsToHint = new ArrayList<>(targetsSize); + endpointsToHint = new ArrayList<>(); endpointsToHint.add(destination); } @@ -1363,25 +1291,28 @@ public class StorageProxy implements StorageProxyMBean MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeout()); if (endpointsToHint != null) - submitHint(mutation, endpointsToHint, responseHandler); + submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler); if (insertLocal) - performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler); + { + Preconditions.checkNotNull(localReplica); + performLocally(stage, localReplica, Optional.of(mutation), mutation::apply, responseHandler); + } if (localDc != null) { - for (InetAddressAndPort destination : localDc) - MessagingService.instance().sendRR(message, destination, responseHandler, true); + for (Replica destination : localDc) + MessagingService.instance().sendWriteRR(message, destination, responseHandler, true); } if (dcGroups != null) { // for each datacenter, send the message to one node to relay the write to other replicas - for (Collection<InetAddressAndPort> dcTargets : dcGroups.values()) - sendMessagesToNonlocalDC(message, dcTargets, responseHandler); + for (Collection<Replica> dcTargets : dcGroups.values()) + sendMessagesToNonlocalDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcTargets), responseHandler); } } - private static void checkHintOverload(InetAddressAndPort destination) + private static void checkHintOverload(Replica destination) { // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. @@ -1389,45 +1320,46 @@ public class StorageProxy implements StorageProxyMBean // a small number of nodes causing problems, so we should avoid shutting down writes completely to // healthy nodes. Any node with no hintsInProgress is considered healthy. if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress - && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination))) + && (getHintsInProgressFor(destination.endpoint()).get() > 0 && shouldHint(destination))) { throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() + " destination: " + destination + - " destination hints: " + getHintsInProgressFor(destination).get()); + " destination hints: " + getHintsInProgressFor(destination.endpoint()).get()); } } private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, - Collection<InetAddressAndPort> targets, + EndpointsForToken targets, AbstractWriteResponseHandler<IMutation> handler) { - Iterator<InetAddressAndPort> iter = targets.iterator(); + Iterator<Replica> iter = targets.iterator(); int[] messageIds = new int[targets.size()]; - InetAddressAndPort target = iter.next(); + Replica target = iter.next(); int idIdx = 0; // Add the other destinations of the same message as a FORWARD_HEADER entry while (iter.hasNext()) { - InetAddressAndPort destination = iter.next(); - int id = MessagingService.instance().addCallback(handler, - message, - destination, - message.getTimeout(), - handler.consistencyLevel, - true); + Replica destination = iter.next(); + int id = MessagingService.instance().addWriteCallback(handler, + message, + destination, + message.getTimeout(), + handler.replicaLayout.consistencyLevel(), + true); messageIds[idIdx++] = id; logger.trace("Adding FWD message to {}@{}", id, destination); } - message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new ForwardToContainer(targets, messageIds)); + + message = message.withParameter(ParameterType.FORWARD_TO, new ForwardToContainer(targets.endpoints(), messageIds)); // send the combined message + forward headers - int id = MessagingService.instance().sendRR(message, target, handler, true); + int id = MessagingService.instance().sendWriteRR(message, target, handler, true); logger.trace("Sending message to {}@{}", id, target); } - private static void performLocally(Stage stage, final Runnable runnable) + private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable) { - StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable() + StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(localReplica) { public void runMayThrow() { @@ -1449,9 +1381,9 @@ public class StorageProxy implements StorageProxyMBean }); } - private static void performLocally(Stage stage, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) + private static void performLocally(Stage stage, Replica localReplica, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) { - StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation) + StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(localReplica, mutation) { public void runMayThrow() { @@ -1492,9 +1424,9 @@ public class StorageProxy implements StorageProxyMBean */ public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException { - InetAddressAndPort endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); + Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); - if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) + if (replica.isLocal()) { return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); } @@ -1502,18 +1434,19 @@ public class StorageProxy implements StorageProxyMBean { // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica String keyspaceName = cm.getKeyspaceName(); + Keyspace keyspace = Keyspace.open(keyspaceName); AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); Token tk = cm.key().getToken(); - List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes(); + ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk); + rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes(); // Forward the actual update to the chosen leader replica - AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime); + AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica), + WriteType.COUNTER, queryStartNanoTime); - Tracing.trace("Enqueuing counter update to {}", endpoint); - MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false); + Tracing.trace("Enqueuing counter update to {}", replica); + MessagingService.instance().sendWriteRR(cm.makeMutationMessage(), replica, responseHandler, false); return responseHandler; } } @@ -1528,38 +1461,37 @@ public class StorageProxy implements StorageProxyMBean * is unclear we want to mix those latencies with read latencies, so this * may be a bit involved. */ - private static InetAddressAndPort findSuitableEndpoint(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException + private static Replica findSuitableReplica(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException { Keyspace keyspace = Keyspace.open(keyspaceName); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - List<InetAddressAndPort> endpoints = new ArrayList<>(); - StorageService.instance.getLiveNaturalEndpoints(keyspace, key, endpoints); + EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key); // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping - endpoints.removeIf(endpoint -> !StorageService.instance.isRpcReady(endpoint)); + replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint())); // TODO have a way to compute the consistency level - if (endpoints.isEmpty()) - throw new UnavailableException(cl, cl.blockFor(keyspace), 0); + if (replicas.isEmpty()) + throw UnavailableException.create(cl, cl.blockFor(keyspace), 0); - List<InetAddressAndPort> localEndpoints = new ArrayList<>(endpoints.size()); + List<Replica> localReplicas = new ArrayList<>(replicas.size()); - for (InetAddressAndPort endpoint : endpoints) - if (snitch.getDatacenter(endpoint).equals(localDataCenter)) - localEndpoints.add(endpoint); + for (Replica replica : replicas) + if (snitch.getDatacenter(replica).equals(localDataCenter)) + localReplicas.add(replica); - if (localEndpoints.isEmpty()) + if (localReplicas.isEmpty()) { // If the consistency required is local then we should not involve other DCs if (cl.isDatacenterLocal()) - throw new UnavailableException(cl, cl.blockFor(keyspace), 0); + throw UnavailableException.create(cl, cl.blockFor(keyspace), 0); // No endpoint in local DC, pick the closest endpoint according to the snitch - snitch.sortByProximity(FBUtilities.getBroadcastAddressAndPort(), endpoints); - return endpoints.get(0); + replicas = snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); + return replicas.get(0); } - return localEndpoints.get(ThreadLocalRandom.current().nextInt(localEndpoints.size())); + return localReplicas.get(ThreadLocalRandom.current().nextInt(localReplicas.size())); } // Must be called on a replica of the mutation. This replica becomes the @@ -1579,7 +1511,7 @@ public class StorageProxy implements StorageProxyMBean } private static Runnable counterWriteTask(final IMutation mutation, - final Iterable<InetAddressAndPort> targets, + final EndpointsForToken targets, final AbstractWriteResponseHandler<IMutation> responseHandler, final String localDataCenter) { @@ -1592,11 +1524,7 @@ public class StorageProxy implements StorageProxyMBean Mutation result = ((CounterMutation) mutation).applyCounterMutation(); responseHandler.response(null); - - Set<InetAddressAndPort> remotes = Sets.difference(ImmutableSet.copyOf(targets), - ImmutableSet.of(FBUtilities.getBroadcastAddressAndPort())); - if (!remotes.isEmpty()) - sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); + sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); } }; } @@ -1664,9 +1592,7 @@ public class StorageProxy implements StorageProxyMBean try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyLevel); - List<InetAddressAndPort> liveEndpoints = p.liveEndpoints; - int requiredParticipants = p.participants; + ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel); // does the work of applying in-progress writes; throws UAE or timeout if it can't final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL @@ -1675,7 +1601,7 @@ public class StorageProxy implements StorageProxyMBean try { - final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); + final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state); if (pair.contentions > 0) casReadMetrics.contention.update(pair.contentions); } @@ -1924,28 +1850,19 @@ public class StorageProxy implements StorageProxyMBean } } - public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key) + public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos) { - return getLiveSortedEndpoints(keyspace, StorageService.instance.getTokenMetadata().decorateKey(key)); + return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken()); } - public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) + public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos) { - List<InetAddressAndPort> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos); - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(), liveEndpoints); - return liveEndpoints; - } + EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos); + // Replica availability is considered by the query path + Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull), + "At least one full replica required for reads: " + liveReplicas); - private static List<InetAddressAndPort> intersection(List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) - { - // Note: we don't use Guava Sets.intersection() for 3 reasons: - // 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and - // so will be very small (< RF). In that case, retainAll is in fact more efficient. - // 2) we do ultimately need a list so converting everything to sets don't make sense - // 3) l1 and l2 are sorted by proximity. The use of retainAll maintain that sorting in the result, while using sets wouldn't. - List<InetAddressAndPort> inter = new ArrayList<>(l1); - inter.retainAll(l2); - return inter; + return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas); } /** @@ -1963,24 +1880,10 @@ public class StorageProxy implements StorageProxyMBean : index.getEstimatedResultRows(); // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks - return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor(); + return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; } - private static class RangeForQuery - { - public final AbstractBounds<PartitionPosition> range; - public final List<InetAddressAndPort> liveEndpoints; - public final List<InetAddressAndPort> filteredEndpoints; - - public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddressAndPort> liveEndpoints, List<InetAddressAndPort> filteredEndpoints) - { - this.range = range; - this.liveEndpoints = liveEndpoints; - this.filteredEndpoints = filteredEndpoints; - } - } - - private static class RangeIterator extends AbstractIterator<RangeForQuery> + private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange> { private final Keyspace keyspace; private final ConsistencyLevel consistency; @@ -2004,38 +1907,43 @@ public class StorageProxy implements StorageProxyMBean return rangeCount; } - protected RangeForQuery computeNext() + protected ReplicaLayout.ForRange computeNext() { if (!ranges.hasNext()) return endOfData(); AbstractBounds<PartitionPosition> range = ranges.next(); - List<InetAddressAndPort> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right); - return new RangeForQuery(range, - liveEndpoints, - consistency.filterForQuery(keyspace, liveEndpoints)); + EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right); + + int blockFor = consistency.blockFor(keyspace); + EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas); + int minResponses = Math.min(targetReplicas.size(), blockFor); + + // Endpoitns for range here as well + return ReplicaLayout.forRangeRead(keyspace, consistency, range, + liveReplicas, targetReplicas.subList(0, minResponses)); } } - private static class RangeMerger extends AbstractIterator<RangeForQuery> + private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange> { private final Keyspace keyspace; private final ConsistencyLevel consistency; - private final PeekingIterator<RangeForQuery> ranges; + private final PeekingIterator<ReplicaLayout.ForRange> ranges; - private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency) + private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency) { this.keyspace = keyspace; this.consistency = consistency; this.ranges = Iterators.peekingIterator(iterator); } - protected RangeForQuery computeNext() + protected ReplicaLayout.ForRange computeNext() { if (!ranges.hasNext()) return endOfData(); - RangeForQuery current = ranges.next(); + ReplicaLayout.ForRange current = ranges.next(); // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges @@ -2050,22 +1958,22 @@ public class StorageProxy implements StorageProxyMBean if (current.range.right.isMinimum()) break; - RangeForQuery next = ranges.peek(); + ReplicaLayout.ForRange next = ranges.peek(); - List<InetAddressAndPort> merged = intersection(current.liveEndpoints, next.liveEndpoints); + EndpointsForRange merged = current.all().keep(next.all().endpoints()); // Check if there is enough endpoint for the merge to be possible. - if (!consistency.isSufficientLiveNodes(keyspace, merged)) + if (!consistency.isSufficientLiveNodesForRead(keyspace, merged)) break; - List<InetAddressAndPort> filteredMerged = consistency.filterForQuery(keyspace, merged); + EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged); // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.filteredEndpoints, next.filteredEndpoints)) + if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected())) break; // If we get there, merge this range and the next one - current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged); + current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged); ranges.next(); // consume the range we just merged since we've only peeked so far } return current; @@ -2110,11 +2018,9 @@ public class StorageProxy implements StorageProxyMBean private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator { - private final Iterator<RangeForQuery> ranges; + private final Iterator<ReplicaLayout.ForRange> ranges; private final int totalRangeCount; private final PartitionRangeReadCommand command; - private final Keyspace keyspace; - private final ConsistencyLevel consistency; private final boolean enforceStrictLiveness; private final long startTime; @@ -2135,8 +2041,6 @@ public class StorageProxy implements StorageProxyMBean this.startTime = System.nanoTime(); this.ranges = new RangeMerger(ranges, keyspace, consistency); this.totalRangeCount = ranges.rangeCount(); - this.consistency = consistency; - this.keyspace = keyspace; this.queryStartNanoTime = queryStartNanoTime; this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } @@ -2204,36 +2108,36 @@ public class StorageProxy implements StorageProxyMBean /** * Queries the provided sub-range. * - * @param toQuery the subRange to query. + * @param replicaLayout the subRange to query. * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in * that it's the query that "continues" whatever we're previously queried). */ - private SingleRangeResponse query(RangeForQuery toQuery, boolean isFirst) + private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst) { - PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst); + PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst); + ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime); + DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime); + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - ReadRepair readRepair = ReadRepair.create(command, queryStartNanoTime, consistency); - DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime, readRepair); - - int blockFor = consistency.blockFor(keyspace); - int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); - List<InetAddressAndPort> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); - ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime); + ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver, + replicaLayout.consistencyLevel().blockFor(keyspace), + rangeCommand, + replicaLayout, + queryStartNanoTime); handler.assureSufficientLiveNodes(); - - if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0))) + if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal()) { StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler)); } else { - for (InetAddressAndPort endpoint : toQuery.filteredEndpoints) + for (Replica replica : replicaLayout.selected()) { - Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), endpoint, handler); + Tracing.trace("Enqueuing request to {}", replica); + MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), replica.endpoint(), handler); } } @@ -2486,32 +2390,30 @@ public class StorageProxy implements StorageProxyMBean DatabaseDescriptor.setMaxHintWindow(ms); } - public static boolean shouldHint(InetAddressAndPort ep) + public static boolean shouldHint(Replica replica) { - if (DatabaseDescriptor.hintedHandoffEnabled()) + if (!DatabaseDescriptor.hintedHandoffEnabled()) + return false; + if (replica.isTransient() || replica.isLocal()) + return false; + + Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs(); + if (!disabledDCs.isEmpty()) { - Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs(); - if (!disabledDCs.isEmpty()) - { - final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep); - if (disabledDCs.contains(dc)) - { - Tracing.trace("Not hinting {} since its data center {} has been disabled {}", ep, dc, disabledDCs); - return false; - } - } - boolean hintWindowExpired = Gossiper.instance.getEndpointDowntime(ep) > DatabaseDescriptor.getMaxHintWindow(); - if (hintWindowExpired) + final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + if (disabledDCs.contains(dc)) { - HintsService.instance.metrics.incrPastWindow(ep); - Tracing.trace("Not hinting {} which has been down {} ms", ep, Gossiper.instance.getEndpointDowntime(ep)); + Tracing.trace("Not hinting {} since its data center {} has been disabled {}", replica, dc, disabledDCs); + return false; } - return !hintWindowExpired; } - else + boolean hintWindowExpired = Gossiper.instance.getEndpointDowntime(replica.endpoint()) > DatabaseDescriptor.getMaxHintWindow(); + if (hintWindowExpired) { - return false; + HintsService.instance.metrics.incrPastWindow(replica.endpoint()); + Tracing.trace("Not hinting {} which has been down {} ms", replica, Gossiper.instance.getEndpointDowntime(replica.endpoint())); } + return !hintWindowExpired; } /** @@ -2532,7 +2434,7 @@ public class StorageProxy implements StorageProxyMBean // invoked by an admin, for simplicity we require that all nodes are up // to perform the operation. int liveMembers = Gossiper.instance.getLiveMembers().size(); - throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers); + throw UnavailableException.create(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers); } Set<InetAddressAndPort> allEndpoints = StorageService.instance.getLiveRingMembers(true); @@ -2571,10 +2473,9 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { public void apply(IMutation mutation, - Iterable<InetAddressAndPort> targets, + ReplicaLayout.ForToken targets, AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - ConsistencyLevel consistencyLevel) throws OverloadedException; + String localDataCenter) throws OverloadedException; } /** @@ -2638,15 +2539,18 @@ public class StorageProxy implements StorageProxyMBean { private final long constructionTime = System.currentTimeMillis(); + private final Replica localReplica; private final Optional<IMutation> mutationOpt; - public LocalMutationRunnable(Optional<IMutation> mutationOpt) + public LocalMutationRunnable(Replica localReplica, Optional<IMutation> mutationOpt) { + this.localReplica = localReplica; this.mutationOpt = mutationOpt; } - public LocalMutationRunnable() + public LocalMutationRunnable(Replica localReplica) { + this.localReplica = localReplica; this.mutationOpt = Optional.empty(); } @@ -2659,7 +2563,8 @@ public class StorageProxy implements StorageProxyMBean { if (MessagingService.DROPPABLE_VERBS.contains(verb)) MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken); - HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddressAndPort())) + + HintRunnable runnable = new HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica)) { protected void runMayThrow() throws Exception { @@ -2690,9 +2595,9 @@ public class StorageProxy implements StorageProxyMBean */ private abstract static class HintRunnable implements Runnable { - public final Collection<InetAddressAndPort> targets; + public final EndpointsForToken targets; - protected HintRunnable(Collection<InetAddressAndPort> targets) + protected HintRunnable(EndpointsForToken targets) { this.targets = targets; } @@ -2710,7 +2615,7 @@ public class StorageProxy implements StorageProxyMBean finally { StorageMetrics.totalHintsInProgress.dec(targets.size()); - for (InetAddressAndPort target : targets) + for (InetAddressAndPort target : targets.endpoints()) getHintsInProgressFor(target).decrementAndGet(); } } @@ -2756,22 +2661,23 @@ public class StorageProxy implements StorageProxyMBean } } - public static Future<Void> submitHint(Mutation mutation, InetAddressAndPort target, AbstractWriteResponseHandler<IMutation> responseHandler) + public static Future<Void> submitHint(Mutation mutation, Replica target, AbstractWriteResponseHandler<IMutation> responseHandler) { - return submitHint(mutation, Collections.singleton(target), responseHandler); + return submitHint(mutation, EndpointsForToken.of(target.range().right, target), responseHandler); } public static Future<Void> submitHint(Mutation mutation, - Collection<InetAddressAndPort> targets, + EndpointsForToken targets, AbstractWriteResponseHandler<IMutation> responseHandler) { + Replicas.assertFull(targets); // hints should not be written for transient replicas HintRunnable runnable = new HintRunnable(targets) { public void runMayThrow() { Set<InetAddressAndPort> validTargets = new HashSet<>(targets.size()); Set<UUID> hostIds = new HashSet<>(targets.size()); - for (InetAddressAndPort target : targets) + for (InetAddressAndPort target : targets.endpoints()) { UUID hostId = StorageService.instance.getHostIdForEndpoint(target); if (hostId != null) @@ -2786,7 +2692,7 @@ public class StorageProxy implements StorageProxyMBean HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis())); validTargets.forEach(HintsService.instance.metrics::incrCreatedHints); // Notify the handler only for CL == ANY - if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY) + if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY) responseHandler.response(null); } }; @@ -2797,8 +2703,8 @@ public class StorageProxy implements StorageProxyMBean private static Future<Void> submitHint(HintRunnable runnable) { StorageMetrics.totalHintsInProgress.inc(runnable.targets.size()); - for (InetAddressAndPort target : runnable.targets) - getHintsInProgressFor(target).incrementAndGet(); + for (Replica target : runnable.targets) + getHintsInProgressFor(target.endpoint()).incrementAndGet(); return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); } @@ -2892,36 +2798,6 @@ public class StorageProxy implements StorageProxyMBean DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); } - - static class PaxosParticipants - { - final List<InetAddressAndPort> liveEndpoints; - final int participants; - - PaxosParticipants(List<InetAddressAndPort> liveEndpoints, int participants) - { - this.liveEndpoints = liveEndpoints; - this.participants = participants; - } - - @Override - public final int hashCode() - { - int hashCode = 31 + (liveEndpoints == null ? 0 : liveEndpoints.hashCode()); - return 31 * hashCode * this.participants; - } - - @Override - public final boolean equals(Object o) - { - if(!(o instanceof PaxosParticipants)) - return false; - PaxosParticipants that = (PaxosParticipants)o; - // handles nulls properly - return Objects.equals(liveEndpoints, that.liveEndpoints) && participants == that.participants; - } - } - static class PaxosBallotAndContention { final UUID ballot;
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org