This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 67139d5c33 Raise priority of TCM internode messages during critical operations 67139d5c33 is described below commit 67139d5c334e58fdc8d9f09f9288155448666cd3 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Tue Mar 26 11:52:17 2024 +0100 Raise priority of TCM internode messages during critical operations Patch by Alex Petrov; reviewed by Sam Tunnicliffe and marcuse for CASSANDRA-19517. --- CHANGES.txt | 1 + src/java/org/apache/cassandra/net/Message.java | 21 +++++- src/java/org/apache/cassandra/net/MessageFlag.java | 4 +- .../org/apache/cassandra/net/MessagingService.java | 8 +++ .../apache/cassandra/net/OutboundConnections.java | 7 +- src/java/org/apache/cassandra/net/Verb.java | 30 ++++----- .../org/apache/cassandra/schema/Keyspaces.java | 9 +++ .../org/apache/cassandra/service/paxos/Paxos.java | 5 ++ .../cassandra/service/paxos/PaxosCommit.java | 8 ++- .../service/paxos/PaxosCommitAndPrepare.java | 4 +- .../cassandra/service/paxos/PaxosPrepare.java | 2 +- .../service/paxos/PaxosPrepareRefresh.java | 2 +- .../cassandra/service/paxos/PaxosPropose.java | 2 +- .../cassandra/service/paxos/PaxosRepair.java | 3 +- .../service/paxos/cleanup/PaxosCleanup.java | 11 ++-- .../paxos/cleanup/PaxosCleanupComplete.java | 8 ++- .../service/paxos/cleanup/PaxosCleanupRequest.java | 12 ++-- .../service/paxos/cleanup/PaxosCleanupSession.java | 6 +- .../paxos/cleanup/PaxosFinishPrepareCleanup.java | 4 +- .../paxos/cleanup/PaxosStartPrepareCleanup.java | 4 +- .../cassandra/distributed/impl/Instance.java | 13 ++++ .../test/ring/CMSUrgentMessagesTest.java | 74 ++++++++++++++++++++++ 22 files changed, 191 insertions(+), 47 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 12787c4fb3..fb1ae2f8bd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Raise priority of TCM internode messages during critical operations (CASSANDRA-19517) * Add nodetool command to unregister LEFT nodes (CASSANDRA-19581) * Add cluster metadata id to gossip syn messages (CASSANDRA-19613) * Reduce heap usage occupied by the metrics (CASSANDRA-19567) diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index b0a0b9c48d..7d7799a186 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -217,6 +217,15 @@ public class Message<T> return outWithParam(nextId(), verb, expiresAtNanos, payload, 0, null, null); } + public static <T> Message<T> out(Verb verb, T payload, boolean isUrgent) + { + assert !verb.isResponse(); + if (isUrgent) + return outWithFlag(verb, payload, MessageFlag.URGENT); + else + return out(verb, payload); + } + public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag flag) { assert !verb.isResponse(); @@ -305,7 +314,10 @@ public class Message<T> /** Builds a response Message with provided payload, and all the right fields inferred from request Message */ public <T> Message<T> responseWith(T payload) { - return outWithParam(id(), verb().responseVerb, expiresAtNanos(), payload, null, null); + Message<T> msg = outWithParam(id(), verb().responseVerb, expiresAtNanos(), payload, null, null); + if (header.hasFlag(MessageFlag.URGENT)) + msg = msg.withFlag(MessageFlag.URGENT); + return msg; } /** Builds a response Message with no payload, and all the right fields inferred from request Message */ @@ -485,6 +497,11 @@ public class Message<T> this.params = params; } + public boolean hasFlag(MessageFlag messageFlag) + { + return messageFlag.isIn(flags); + } + Header withFrom(InetAddressAndPort from) { return new Header(id, epoch, verb, from, createdAtNanos, expiresAtNanos, flags, params); @@ -936,7 +953,7 @@ public class Message<T> serializeParams(header.params, out, version); } - private Header deserializeHeader(DataInputPlus in, InetAddressAndPort peer, int version) throws IOException + public Header deserializeHeader(DataInputPlus in, InetAddressAndPort peer, int version) throws IOException { long id = in.readUnsignedVInt(); Epoch epoch = Epoch.EMPTY; diff --git a/src/java/org/apache/cassandra/net/MessageFlag.java b/src/java/org/apache/cassandra/net/MessageFlag.java index 441b06b6a3..1c2db557c3 100644 --- a/src/java/org/apache/cassandra/net/MessageFlag.java +++ b/src/java/org/apache/cassandra/net/MessageFlag.java @@ -29,7 +29,9 @@ public enum MessageFlag /** track repaired data - see CASSANDRA-14145 */ TRACK_REPAIRED_DATA (1), /** allow creating warnings or aborting queries based off query - see CASSANDRA-16850 */ - TRACK_WARNINGS(2); + TRACK_WARNINGS(2), + /** whether this message should be sent on an URGENT channel despite its Verb default priority */ + URGENT(3); private final int id; diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index d1e2f7b260..47c4830d20 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -494,6 +494,14 @@ public class MessagingService extends MessagingServiceMBeanImpl implements Messa return future; } + public void respondWithFailure(RequestFailureReason reason, Message<?> message) + { + Message<?> r = Message.failureResponse(message.id(), message.expiresAtNanos(), reason); + if (r.header.hasFlag(MessageFlag.URGENT)) + r = r.withFlag(MessageFlag.URGENT); + send(r, message.respondTo()); + } + public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection) { if (isShuttingDown) diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java b/src/java/org/apache/cassandra/net/OutboundConnections.java index aacc2b4473..80c66e0595 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnections.java +++ b/src/java/org/apache/cassandra/net/OutboundConnections.java @@ -229,9 +229,10 @@ public class OutboundConnections return LARGE_MESSAGES; } - return msg.verb().priority == Verb.Priority.P0 - ? URGENT_MESSAGES - : SMALL_MESSAGES; + if (msg.verb().priority == Verb.Priority.P0 || msg.header.hasFlag(MessageFlag.URGENT)) + return URGENT_MESSAGES; + else + return SMALL_MESSAGES; } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index a307185ecc..17c4550fc0 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -224,21 +224,21 @@ public enum Verb PAXOS2_CLEANUP_COMPLETE_REQ (48, P2, repairTimeout, PAXOS_REPAIR, () -> PaxosCleanupComplete.serializer, () -> PaxosCleanupComplete.verbHandler, PAXOS2_CLEANUP_COMPLETE_RSP ), // transactional cluster metadata - TCM_COMMIT_RSP (801, P1, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, () -> ResponseVerbHandler.instance ), - TCM_COMMIT_REQ (802, P1, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), - TCM_FETCH_CMS_LOG_RSP (803, P1, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), - TCM_FETCH_CMS_LOG_REQ (804, P1, rpcTimeout, FETCH_LOG, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), - TCM_REPLICATION (805, P1, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> replicationHandler() ), - TCM_NOTIFY_RSP (806, P1, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> ResponseVerbHandler.instance ), - TCM_NOTIFY_REQ (807, P1, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ), - TCM_CURRENT_EPOCH_REQ (808, P1, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ), - TCM_INIT_MIG_RSP (809, P1, rpcTimeout, INTERNAL_METADATA, MessageSerializers::metadataHolderSerializer, () -> ResponseVerbHandler.instance ), - TCM_INIT_MIG_REQ (810, P1, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ), - TCM_ABORT_MIG (811, P1, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), - TCM_DISCOVER_RSP (812, P1, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ), - TCM_DISCOVER_REQ (813, P1, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), - TCM_FETCH_PEER_LOG_RSP (818, P1, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), - TCM_FETCH_PEER_LOG_REQ (819, P1, rpcTimeout, FETCH_LOG, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), + TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, () -> ResponseVerbHandler.instance ), + TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), + TCM_FETCH_CMS_LOG_RSP (803, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), + TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_LOG, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), + TCM_REPLICATION (805, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> replicationHandler() ), + TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> ResponseVerbHandler.instance ), + TCM_NOTIFY_REQ (807, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ), + TCM_CURRENT_EPOCH_REQ (808, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ), + TCM_INIT_MIG_RSP (809, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::metadataHolderSerializer, () -> ResponseVerbHandler.instance ), + TCM_INIT_MIG_REQ (810, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ), + TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), + TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ), + TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), + TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), + TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_LOG, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> DataMovement.serializer, () -> DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ), diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java b/src/java/org/apache/cassandra/schema/Keyspaces.java index 3f5431da1a..151d5a5db0 100644 --- a/src/java/org/apache/cassandra/schema/Keyspaces.java +++ b/src/java/org/apache/cassandra/schema/Keyspaces.java @@ -121,6 +121,15 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> return tables.get(id); } + public KeyspaceMetadata getContainingKeyspaceMetadata(TableId tableId) + { + TableMetadata tableMetadata = getTableOrViewNullable(tableId); + if (tableMetadata == null) + throw new IllegalStateException("Can't find table " + tableId); + + return keyspaces.get(tableMetadata.keyspace); + } + public boolean isEmpty() { return keyspaces.isEmpty(); diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index fed3cc0785..1a76cfd972 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -535,6 +535,11 @@ public class Paxos { throw new UnsupportedOperationException(); } + + public boolean isUrgent() + { + return keyspace.getMetadata().params.replication.isMeta(); + } } /** diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java b/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java index 1821ec4004..943b04c30c 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java @@ -175,9 +175,11 @@ public class PaxosCommit<OnDone extends Consumer<? super PaxosCommit.Status>> ex void start(Participants participants, boolean async) { boolean executeOnSelf = false; - Message<Agreed> commitMessage = Message.out(PAXOS_COMMIT_REQ, commit); - Message<Mutation> mutationMessage = ENABLE_DC_LOCAL_COMMIT && consistencyForConsensus.isDatacenterLocal() - ? Message.out(PAXOS2_COMMIT_REMOTE_REQ, commit.makeMutation()) : null; + Message<Agreed> commitMessage = Message.out(PAXOS_COMMIT_REQ, commit, participants.isUrgent()); + + Message<Mutation> mutationMessage = null; + if (ENABLE_DC_LOCAL_COMMIT && consistencyForConsensus.isDatacenterLocal()) + mutationMessage = Message.out(PAXOS2_COMMIT_REMOTE_REQ, commit.makeMutation(), participants.isUrgent()); for (int i = 0, mi = participants.allLive.size(); i < mi ; ++i) executeOnSelf |= isSelfOrSend(commitMessage, mutationMessage, participants.allLive.endpoint(i)); diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java b/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java index 7a9dba7b5a..7046dfbb37 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java @@ -49,8 +49,8 @@ public class PaxosCommitAndPrepare PaxosPrepare prepare = new PaxosPrepare(participants, request, acceptEarlyReadSuccess, null); Tracing.trace("Committing {}; Preparing {}", commit.ballot, ballot); - Message<Request> message = Message.out(PAXOS2_COMMIT_AND_PREPARE_REQ, request); -// .permitsArtificialDelay(participants.consistencyForConsensus); + Message<Request> message = Message.out(PAXOS2_COMMIT_AND_PREPARE_REQ, request, participants.isUrgent()); + start(prepare, participants, message, RequestHandler::execute); return prepare; } diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java index 7d0c114299..78f8a7a903 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java @@ -358,7 +358,7 @@ public class PaxosPrepare extends PaxosRequestCallback<PaxosPrepare.Response> im private static PaxosPrepare prepareWithBallotInternal(Participants participants, Request request, boolean acceptEarlyReadPermission, Consumer<Status> onDone) { PaxosPrepare prepare = new PaxosPrepare(participants, request, acceptEarlyReadPermission, onDone); - Message<Request> message = Message.out(PAXOS2_PREPARE_REQ, request); + Message<Request> message = Message.out(PAXOS2_PREPARE_REQ, request, participants.isUrgent()); start(prepare, participants, message, RequestHandler::execute); return prepare; } diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java index 19aff7428a..925daaf9dd 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java @@ -75,7 +75,7 @@ public class PaxosPrepareRefresh implements RequestCallbackWithFailure<PaxosPrep public PaxosPrepareRefresh(Ballot prepared, Paxos.Participants participants, Committed latestCommitted, Callbacks callbacks) { this.callbacks = callbacks; - this.send = Message.out(PAXOS2_PREPARE_REFRESH_REQ, new Request(prepared, latestCommitted)); + this.send = Message.out(PAXOS2_PREPARE_REFRESH_REQ, new Request(prepared, latestCommitted), participants.isUrgent()); } void refresh(List<InetAddressAndPort> refresh) diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java b/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java index 57d3459f40..db702af7d4 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java @@ -207,7 +207,7 @@ public class PaxosPropose<OnDone extends Consumer<? super PaxosPropose.Status>> void start(Paxos.Participants participants) { - Message<Request> message = Message.out(PAXOS2_PROPOSE_REQ, new Request(proposal)); + Message<Request> message = Message.out(PAXOS2_PROPOSE_REQ, new Request(proposal), participants.isUrgent()); boolean executeOnSelf = false; for (int i = 0, size = participants.sizeOfPoll(); i < size ; ++i) diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java index cc6bc72325..0e3a732185 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java @@ -288,7 +288,8 @@ public class PaxosRepair extends AbstractPaxosRepair public void run() { - Message<Request> message = Message.out(PAXOS2_REPAIR_REQ, new Request(partitionKey(), table)); + Message<Request> message = Message.out(PAXOS2_REPAIR_REQ, new Request(partitionKey(), table), participants.isUrgent()); + for (int i = 0, size = participants.sizeOfPoll(); i < size ; ++i) MessagingService.instance().sendWithCallback(message, participants.voter(i), this); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java index feaa64bd1e..331adef64f 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java @@ -33,6 +33,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -55,6 +56,7 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable private final Collection<Range<Token>> ranges; private final boolean skippedReplicas; private final Executor executor; + private final boolean isUrgent; // references kept for debugging private PaxosStartPrepareCleanup startPrepare; @@ -70,6 +72,7 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable this.ranges = ranges; this.skippedReplicas = skippedReplicas; this.executor = executor; + this.isUrgent = Keyspace.open(table.keyspace).getMetadata().params.replication.isMeta(); } private <T> void addCallback(Future<T> future, Consumer<T> onComplete) @@ -87,28 +90,28 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable public void run() { EndpointState localEpState = ctx.gossiper().getEndpointStateForEndpoint(ctx.broadcastAddressAndPort()); - startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, endpoints, localEpState, ranges); + startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, endpoints, localEpState, ranges, isUrgent); addCallback(startPrepare, this::finishPrepare); } private void finishPrepare(PaxosCleanupHistory result) { ctx.nonPeriodicTasks().schedule(() -> { - finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, result); + finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, isUrgent, result); addCallback(finishPrepare, (v) -> startSession(result.highBound)); }, Math.min(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS)), MILLISECONDS); } private void startSession(Ballot lowBound) { - session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges); + session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges, isUrgent); addCallback(session, (v) -> finish(lowBound)); executor.execute(session); } private void finish(Ballot lowBound) { - complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, lowBound, skippedReplicas); + complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, lowBound, skippedReplicas, isUrgent); addCallback(complete, this::trySuccess); executor.execute(complete); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java index 8742af84e0..c0c9a7e1d4 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java @@ -52,8 +52,9 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa final Ballot lowBound; final boolean skippedReplicas; private final SharedContext ctx; + private final boolean isUrgent; - PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, boolean skippedReplicas) + PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, boolean skippedReplicas, boolean isUrgent) { this.ctx = ctx; this.waitingResponse = new HashSet<>(endpoints); @@ -61,13 +62,16 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa this.ranges = ranges; this.lowBound = lowBound; this.skippedReplicas = skippedReplicas; + this.isUrgent = isUrgent; } public synchronized void run() { Request request = !skippedReplicas ? new Request(tableId, lowBound, ranges) : new Request(tableId, Ballot.none(), Collections.emptyList()); - Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, request); + + Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, request, isUrgent); + for (InetAddressAndPort endpoint : waitingResponse) ctx.messaging().sendWithCallback(message, endpoint, this); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java index 2eaeaf2537..6d3fb731ea 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java @@ -38,6 +38,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.tcm.ClusterMetadata; @@ -75,6 +76,7 @@ public class PaxosCleanupRequest return in -> { PaxosCleanupRequest request = in.payload; + boolean isUrgent = in.header.hasFlag(MessageFlag.URGENT); if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, request.tableId)) { // Try catching up, in case it's us @@ -82,24 +84,24 @@ public class PaxosCleanupRequest String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)", request.session, in.from(), request.ranges); - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg)); + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg), isUrgent); ctx.messaging().send(response, in.respondTo()); return; } PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(ctx, request); - coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>() + coordinator.addCallback(new FutureCallback<>() { public void onSuccess(@Nullable PaxosCleanupResponse finished) { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow()); + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow(), isUrgent); ctx.messaging().send(response, in.respondTo()); } public void onFailure(Throwable throwable) { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage())); + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage()), isUrgent); ctx.messaging().send(response, in.respondTo()); } }); @@ -112,7 +114,7 @@ public class PaxosCleanupRequest } public static final IVerbHandler<PaxosCleanupRequest> verbHandler = createVerbHandler(SharedContext.Global.instance); - public static final IVersionedSerializer<PaxosCleanupRequest> serializer = new IVersionedSerializer<PaxosCleanupRequest>() + public static final IVersionedSerializer<PaxosCleanupRequest> serializer = new IVersionedSerializer<>() { public void serialize(PaxosCleanupRequest completer, DataOutputPlus out, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java index 681f67a56c..80f571cd26 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java @@ -98,15 +98,17 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, private final TableId tableId; private final Collection<Range<Token>> ranges; private final Queue<InetAddressAndPort> pendingCleanups = new ConcurrentLinkedQueue<>(); + private final boolean isUrgent; private InetAddressAndPort inProgress = null; private volatile long lastMessageSentNanos; private ScheduledFuture<?> timeout; - PaxosCleanupSession(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges) + PaxosCleanupSession(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges, boolean isUrgent) { this.ctx = ctx; this.tableId = tableId; this.ranges = ranges; + this.isUrgent = isUrgent; pendingCleanups.addAll(endpoints); lastMessageSentNanos = ctx.clock().nanoTime(); @@ -125,7 +127,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, { lastMessageSentNanos = ctx.clock().nanoTime(); PaxosCleanupRequest completer = new PaxosCleanupRequest(session, tableId, ranges); - Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, completer); + Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, completer, isUrgent); ctx.messaging().sendWithCallback(msg, endpoint, this); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java index b3104788d2..07b1bbe334 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java @@ -38,12 +38,12 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ this.waitingResponse = new HashSet<>(endpoints); } - public static PaxosFinishPrepareCleanup finish(SharedContext ctx, Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result) + public static PaxosFinishPrepareCleanup finish(SharedContext ctx, Collection<InetAddressAndPort> endpoints, boolean isUrgent, PaxosCleanupHistory result) { PaxosFinishPrepareCleanup callback = new PaxosFinishPrepareCleanup(endpoints); synchronized (callback) { - Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result); + Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result, isUrgent); for (InetAddressAndPort endpoint : endpoints) ctx.messaging().sendWithCallback(message, endpoint, callback); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java index 8f631bcbed..41735526a4 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java @@ -77,12 +77,12 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i * prepare message to prevent racing with gossip dissemination and guarantee that every repair participant is aware * of the pending ring change during repair. */ - public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges) + public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges, boolean isUrgent) { PaxosStartPrepareCleanup callback = new PaxosStartPrepareCleanup(tableId, endpoints); synchronized (callback) { - Message<Request> message = Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, localEpState, ranges)); + Message<Request> message = Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, localEpState, ranges), isUrgent); for (InetAddressAndPort endpoint : endpoints) ctx.messaging().sendWithCallback(message, endpoint, callback); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index a2b225ca0b..e53f7c724a 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -486,6 +486,19 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } } + @VisibleForTesting + public static Message.Header deserializeHeader(IMessage message) + { + try (DataInputBuffer in = new DataInputBuffer(message.bytes())) + { + return Message.serializer.deserializeHeader(in, toCassandraInetAddressAndPort(message.from()), message.version()); + } + catch (Throwable t) + { + throw new RuntimeException("Can not deserialize heaader " + message, t); + } + } + @Override public void receiveMessage(IMessage message) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java new file mode 100644 index 0000000000..8a718cf013 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.impl.Instance; +import org.apache.cassandra.distributed.test.log.FuzzTestBase; +import org.apache.cassandra.net.MessageFlag; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.tcm.sequences.AddToCMS; + +public class CMSUrgentMessagesTest extends FuzzTestBase +{ + @Test + public void allPaxosMessagesAreUrgentTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + List<Throwable> thrown = new CopyOnWriteArrayList<>(); + cluster.filters() + .allVerbs() + .messagesMatching((from, to, msg) -> { + Verb verb = Verb.fromId(msg.verb()); + if (!verb.toString().contains("PAXOS2")) + return false; + + try + { + boolean hasFlag = cluster.get(1).callOnInstance(() -> Instance.deserializeHeader(msg).hasFlag(MessageFlag.URGENT)); + assert hasFlag : String.format("%s does not have URGENT flag set: %s", verb, msg); + } + catch (Throwable t) + { + thrown.add(t); + } + return false; + }) + .drop(); + + for (int idx : new int[]{ 2, 3 }) + cluster.get(idx).runOnInstance(() -> AddToCMS.initiate()); + + if (!thrown.isEmpty()) + { + Throwable t = new AssertionError("Caught exceptions"); + for (Throwable throwable : thrown) + t.addSuppressed(throwable); + throw t; + } + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org