iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2673713418


##########
src/java/org/apache/cassandra/replication/ForwardedWrite.java:
##########
@@ -241,6 +242,119 @@ public static AbstractWriteResponseHandler<Object> 
forwardMutation(Mutation muta
         return handler;
     }
 
+    /**
+     * Forward a tracked counter mutation to a replica leader for processing.
+     * The leader will apply the counter mutation, assign a mutation ID, and 
replicate to other replicas.
+     */
+    public static AbstractWriteResponseHandler<Object> 
forwardCounterMutation(CounterMutation counterMutation,
+                                                                               
ReplicaPlan.ForWrite plan,
+                                                                               
AbstractReplicationStrategy strategy,
+                                                                               
Dispatcher.RequestTime requestTime)
+    {
+        Preconditions.checkArgument(counterMutation.id().isNone(), 
"CounterMutation should not have an ID when forwarding");
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
+
+        // Find the leader replica - prefer local DC replicas for counters
+        Replica leader;
+        try
+        {
+            leader = ReplicaPlans.findCounterLeaderReplica(cm, 
counterMutation.getKeyspaceName(),
+                                                          
counterMutation.key(), localDataCenter,
+                                                          
counterMutation.consistency());
+        }
+        catch (Exception e)
+        {
+            logger.error("Failed to find counter leader replica for tracked 
write", e);
+            throw e;
+        }
+
+        Preconditions.checkState(!leader.isSelf(), "Leader should not be self 
when forwarding counter mutation");
+        logger.trace("Forwarding tracked counter mutation to leader replica 
{}", leader);
+
+        // Create response handler for all replicas
+        AbstractWriteResponseHandler<Object> handler = 
strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, 
requestTime);
+
+        // Add callbacks for all live replicas to respond directly to 
coordinator
+        Message<CounterMutation> forwardMessage = 
Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation, 
requestTime);
+
+        for (Replica replica : plan.contacts())
+        {
+            if (plan.isAlive(replica))
+            {
+                logger.trace("Adding forwarding callback for tracked counter 
response from {} id {}", replica, forwardMessage.id());
+                
MessagingService.instance().callbacks.addWithExpiration(handler, 
forwardMessage, replica);
+            }
+            else
+            {
+                handler.expired();
+            }
+        }
+
+        // Send the counter mutation to the leader
+        MessagingService.instance().send(forwardMessage, leader.endpoint());
+
+        return handler;
+    }
+
+    /**
+     * Apply a forwarded tracked counter mutation on the leader replica.
+     * Called by CounterMutationVerbHandler when receiving a forwarded counter 
write.
+     *
+     * This method:
+     * 1. Creates CoordinatorAckInfo from the incoming message
+     * 2. Applies counter mutation locally with generated mutation ID
+     * 3. Forwards result (Mutation not CounterMutation) to other replicas 
with CoordinatorAckInfo
+     * 4. Sends leader's response back to coordinator
+     *
+     * @param counterMutation the counter mutation to apply
+     * @param message the original message (contains coordinator address and 
message ID)
+     * @param respondToAddress the address to send the response to 
(coordinator)
+     */
+    public static void applyForwardedCounterMutation(CounterMutation 
counterMutation,
+                                                      Message<CounterMutation> 
message,
+                                                      InetAddressAndPort 
respondToAddress)
+    {
+        try
+        {
+            CoordinatorAckInfo coordinatorAckInfo = 
CoordinatorAckInfo.toCoordinator(message.from(), message.id());
+
+            String keyspaceName = counterMutation.getKeyspaceName();
+            Token token = counterMutation.key().getToken();
+            Keyspace ks = Keyspace.open(keyspaceName);
+            ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, 
counterMutation.consistency(), token, ReplicaPlans.writeAll);
+            AbstractReplicationStrategy rs = plan.replicationStrategy();
+
+            MutationId id = 
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+
+            if (logger.isTraceEnabled())
+                logger.trace("Forwarded counter mutation {}: applying locally 
with ID and forwarding to other replicas", id);
+
+            TrackedWriteResponseHandler handler =
+                TrackedWriteResponseHandler.wrap(
+                    rs.getWriteResponseHandler(plan, null, WriteType.COUNTER, 
null, Dispatcher.RequestTime.forImmediateExecution()),
+                    id
+                );
+
+            // Apply counter mutation with ID to get result
+            Mutation result = counterMutation.applyCounterMutation(id);
+
+            // Send result to other replicas with CoordinatorAckInfo
+            // They will respond to the coordinator, not to this leader
+            TrackedWriteRequest.sendToReplicasOnly(result, plan, handler, 
coordinatorAckInfo);
+
+            // Send this leader's response back to coordinator
+            MessagingService.instance().send(message.emptyResponse(), 
respondToAddress);

Review Comment:
   I think this code is missing setting up a callback on the leader. If you 
look at `MutationVerbHandler#respond()` you'll notice that the replicas respond 
to the coordinator (so the client can be notified) *and* to the leader (so that 
the leader can mark the id as witnessed on that replica pro-actively). For 
regular mutations we seem to be using `LeaderCallback` class for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to