iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2668841441


##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -295,6 +438,68 @@ public String description()
         }
     }
 
+    private static class LocalCounterMutationRunnable implements 
DebuggableTask.RunnableDebuggableTask
+    {
+        private final CounterMutation counterMutation;
+        private final MutationId mutationId;
+        private final ReplicaPlan.ForWrite plan;
+        private final TrackedWriteResponseHandler handler;
+
+        LocalCounterMutationRunnable(CounterMutation counterMutation, 
MutationId mutationId, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler 
handler)
+        {
+            this.counterMutation = counterMutation;
+            this.mutationId = mutationId;
+            this.plan = plan;
+            this.handler = handler;
+        }
+
+        private Dispatcher.RequestTime getReqestTime()
+        {
+            return handler.getRequestTime();
+        }
+
+        @Override
+        public void run()
+        {
+            long now = MonotonicClock.Global.approxTime.now();
+            long deadline = 
getReqestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());

Review Comment:
   `MUTATION_REQ` should be `COUNTER_MUTATION_REQ`.



##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -95,23 +96,44 @@ public static AbstractWriteResponseHandler<?> perform(
             if (logger.isTraceEnabled())
                 logger.trace("Remote tracked request {} {}", mutation, plan);
             writeMetrics.remoteRequests.mark();
-            return ForwardedWrite.forwardMutation(mutation, plan, rs, 
requestTime);
+
+            if (mutation instanceof CounterMutation)
+                return ForwardedWrite.forwardCounterMutation((CounterMutation) 
mutation, plan, rs, requestTime);
+            else
+                return ForwardedWrite.forwardMutation((Mutation) mutation, 
plan, rs, requestTime);
         }
 
         if (logger.isTraceEnabled())
             logger.trace("Local tracked request {} {}", mutation, plan);
         writeMetrics.localRequests.mark();
+
         MutationId id = 
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
-        mutation = mutation.withMutationId(id);
 
-        if (logger.isTraceEnabled())
-            logger.trace("Write replication plan for mutation {}: live={}, 
pending={}, all={}",
-                         id, plan.live(), plan.pending(), plan.contacts());
+        if (mutation instanceof CounterMutation)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Write replication plan for counter mutation {}: 
live={}, pending={}, all={}",
+                             id, plan.live(), plan.pending(), plan.contacts());
+
+            TrackedWriteResponseHandler handler =
+                
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, 
WriteType.COUNTER, null, requestTime), id);
 
-        TrackedWriteResponseHandler handler =
+            applyCounterMutationLocally((CounterMutation) mutation, id, plan, 
handler);
+            return handler;
+        }
+        else
+        {
+            mutation = mutation.withMutationId(id);
+
+            if (logger.isTraceEnabled())
+                logger.trace("Write replication plan for mutation {}: live={}, 
pending={}, all={}",
+                             id, plan.live(), plan.pending(), plan.contacts());
+
+            TrackedWriteResponseHandler handler =
             TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, 
null, WriteType.SIMPLE, null, requestTime), id);
-        applyLocallyAndSendToReplicas(mutation, plan, handler);
-        return handler;
+            applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler);
+            return handler;

Review Comment:
   Likewise, maybe pull out the `return handler` line out of the if-else 
statement? 



##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -295,6 +438,68 @@ public String description()
         }
     }
 
+    private static class LocalCounterMutationRunnable implements 
DebuggableTask.RunnableDebuggableTask
+    {
+        private final CounterMutation counterMutation;
+        private final MutationId mutationId;
+        private final ReplicaPlan.ForWrite plan;
+        private final TrackedWriteResponseHandler handler;
+
+        LocalCounterMutationRunnable(CounterMutation counterMutation, 
MutationId mutationId, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler 
handler)
+        {
+            this.counterMutation = counterMutation;
+            this.mutationId = mutationId;
+            this.plan = plan;
+            this.handler = handler;
+        }
+
+        private Dispatcher.RequestTime getReqestTime()
+        {
+            return handler.getRequestTime();
+        }
+
+        @Override
+        public void run()
+        {
+            long now = MonotonicClock.Global.approxTime.now();
+            long deadline = 
getReqestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
+
+            if (now > deadline)
+            {
+                long timeTakenNanos = now - startTimeNanos();
+                
MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.COUNTER_MUTATION_REQ,
 timeTakenNanos, NANOSECONDS);
+                return;
+            }
+
+            try
+            {
+                Mutation result = 
counterMutation.applyCounterMutation((mutationId));
+                sendToReplicasOnly(result, plan, handler, null);
+            }
+            catch (Exception ex)
+            {
+                if(!(ex instanceof WriteTimeoutException))
+                    logger.error("Failed to apply counter mutation locally:  
", ex);
+                handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailure.forException(ex));
+            }
+        }
+
+        @Override
+        public long creationTimeNanos() {

Review Comment:
   Bracket placement not on new line here and in a few other spots in the PR. 
If using IDEA make sure to run `ant generate-idea-files` to set up correct 
formatting rules for you easily.



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1366,7 +1364,10 @@ public static void 
dispatchMutationsWithRetryOnDifferentSystem(List<? extends IM
                 {
                     for (IMutation trackedMutation : trackedMutations)
                     {
-                        
trackedHandlers.add(TrackedWriteRequest.perform((Mutation) trackedMutation, 
consistencyLevel, requestTime));
+                        if (trackedMutation instanceof CounterMutation)

Review Comment:
   This check, branch, an the casts in the calls to `perform()` aren't 
necessary - `perform()` accepts an `IMutation` already.



##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -221,12 +243,133 @@ public static void 
applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan.
         }
     }
 
+    /**
+     * Send a mutation to remote replicas only, without applying it locally.
+     * This is used for counter mutations where the mutation has already been 
applied locally
+     * by applyCounterMutation() before assigning the mutation ID.
+     *
+     * @param mutation the mutation with assigned ID to send to replicas
+     * @param plan the replica plan
+     * @param handler the response handler
+     * @param coordinatorAckInfo optional coordinator info for forwarded 
writes (null for local coordinator)
+     */
+    public static void sendToReplicasOnly(Mutation mutation, 
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler, 
ForwardedWrite.CoordinatorAckInfo coordinatorAckInfo)

Review Comment:
   You should be able to refactor `applyLocallyAndSendToReplicas()` to use 
`sendToReplicasOnly()`, with a few modifications - instead of duplicating the 
very much same logic in both these methods.



##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -95,23 +96,44 @@ public static AbstractWriteResponseHandler<?> perform(
             if (logger.isTraceEnabled())
                 logger.trace("Remote tracked request {} {}", mutation, plan);
             writeMetrics.remoteRequests.mark();
-            return ForwardedWrite.forwardMutation(mutation, plan, rs, 
requestTime);
+
+            if (mutation instanceof CounterMutation)
+                return ForwardedWrite.forwardCounterMutation((CounterMutation) 
mutation, plan, rs, requestTime);
+            else
+                return ForwardedWrite.forwardMutation((Mutation) mutation, 
plan, rs, requestTime);
         }
 
         if (logger.isTraceEnabled())
             logger.trace("Local tracked request {} {}", mutation, plan);
         writeMetrics.localRequests.mark();
+
         MutationId id = 
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
-        mutation = mutation.withMutationId(id);
 
-        if (logger.isTraceEnabled())
-            logger.trace("Write replication plan for mutation {}: live={}, 
pending={}, all={}",
-                         id, plan.live(), plan.pending(), plan.contacts());
+        if (mutation instanceof CounterMutation)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Write replication plan for counter mutation {}: 
live={}, pending={}, all={}",
+                             id, plan.live(), plan.pending(), plan.contacts());

Review Comment:
   It's okay to share the same generic tracing message here between the two 
branches and not duplicate it with the slightly different wording.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to