iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2673713418
##########
src/java/org/apache/cassandra/replication/ForwardedWrite.java:
##########
@@ -241,6 +242,119 @@ public static AbstractWriteResponseHandler<Object>
forwardMutation(Mutation muta
return handler;
}
+ /**
+ * Forward a tracked counter mutation to a replica leader for processing.
+ * The leader will apply the counter mutation, assign a mutation ID, and
replicate to other replicas.
+ */
+ public static AbstractWriteResponseHandler<Object>
forwardCounterMutation(CounterMutation counterMutation,
+
ReplicaPlan.ForWrite plan,
+
AbstractReplicationStrategy strategy,
+
Dispatcher.RequestTime requestTime)
+ {
+ Preconditions.checkArgument(counterMutation.id().isNone(),
"CounterMutation should not have an ID when forwarding");
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
+
+ // Find the leader replica - prefer local DC replicas for counters
+ Replica leader;
+ try
+ {
+ leader = ReplicaPlans.findCounterLeaderReplica(cm,
counterMutation.getKeyspaceName(),
+
counterMutation.key(), localDataCenter,
+
counterMutation.consistency());
+ }
+ catch (Exception e)
+ {
+ logger.error("Failed to find counter leader replica for tracked
write", e);
+ throw e;
+ }
+
+ Preconditions.checkState(!leader.isSelf(), "Leader should not be self
when forwarding counter mutation");
+ logger.trace("Forwarding tracked counter mutation to leader replica
{}", leader);
+
+ // Create response handler for all replicas
+ AbstractWriteResponseHandler<Object> handler =
strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null,
requestTime);
+
+ // Add callbacks for all live replicas to respond directly to
coordinator
+ Message<CounterMutation> forwardMessage =
Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation,
requestTime);
+
+ for (Replica replica : plan.contacts())
+ {
+ if (plan.isAlive(replica))
+ {
+ logger.trace("Adding forwarding callback for tracked counter
response from {} id {}", replica, forwardMessage.id());
+
MessagingService.instance().callbacks.addWithExpiration(handler,
forwardMessage, replica);
+ }
+ else
+ {
+ handler.expired();
+ }
+ }
+
+ // Send the counter mutation to the leader
+ MessagingService.instance().send(forwardMessage, leader.endpoint());
+
+ return handler;
+ }
+
+ /**
+ * Apply a forwarded tracked counter mutation on the leader replica.
+ * Called by CounterMutationVerbHandler when receiving a forwarded counter
write.
+ *
+ * This method:
+ * 1. Creates CoordinatorAckInfo from the incoming message
+ * 2. Applies counter mutation locally with generated mutation ID
+ * 3. Forwards result (Mutation not CounterMutation) to other replicas
with CoordinatorAckInfo
+ * 4. Sends leader's response back to coordinator
+ *
+ * @param counterMutation the counter mutation to apply
+ * @param message the original message (contains coordinator address and
message ID)
+ * @param respondToAddress the address to send the response to
(coordinator)
+ */
+ public static void applyForwardedCounterMutation(CounterMutation
counterMutation,
+ Message<CounterMutation>
message,
+ InetAddressAndPort
respondToAddress)
+ {
+ try
+ {
+ CoordinatorAckInfo coordinatorAckInfo =
CoordinatorAckInfo.toCoordinator(message.from(), message.id());
+
+ String keyspaceName = counterMutation.getKeyspaceName();
+ Token token = counterMutation.key().getToken();
+ Keyspace ks = Keyspace.open(keyspaceName);
+ ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks,
counterMutation.consistency(), token, ReplicaPlans.writeAll);
+ AbstractReplicationStrategy rs = plan.replicationStrategy();
+
+ MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+
+ if (logger.isTraceEnabled())
+ logger.trace("Forwarded counter mutation {}: applying locally
with ID and forwarding to other replicas", id);
+
+ TrackedWriteResponseHandler handler =
+ TrackedWriteResponseHandler.wrap(
+ rs.getWriteResponseHandler(plan, null, WriteType.COUNTER,
null, Dispatcher.RequestTime.forImmediateExecution()),
+ id
+ );
+
+ // Apply counter mutation with ID to get result
+ Mutation result = counterMutation.applyCounterMutation(id);
+
+ // Send result to other replicas with CoordinatorAckInfo
+ // They will respond to the coordinator, not to this leader
+ TrackedWriteRequest.sendToReplicasOnly(result, plan, handler,
coordinatorAckInfo);
+
+ // Send this leader's response back to coordinator
+ MessagingService.instance().send(message.emptyResponse(),
respondToAddress);
Review Comment:
I think this code is missing setting up a callback on the leader. If you
look at `MutationVerbHandler#respond()` you'll notice that the replicas respond
to the coordinator (so the client can be notified) *and* to the leader (so that
the leader can mark the id as witnessed on that replica pro-actively). For
regular mutations we seem to be using `LeaderCallback` class for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]