iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2673719160
##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -95,38 +97,73 @@ public static AbstractWriteResponseHandler<?> perform(
if (logger.isTraceEnabled())
logger.trace("Remote tracked request {} {}", mutation, plan);
writeMetrics.remoteRequests.mark();
- return ForwardedWrite.forwardMutation(mutation, plan, rs,
requestTime);
+
+ if (mutation instanceof CounterMutation)
+ return ForwardedWrite.forwardCounterMutation((CounterMutation)
mutation, plan, rs, requestTime);
+ else
+ return ForwardedWrite.forwardMutation((Mutation) mutation,
plan, rs, requestTime);
}
if (logger.isTraceEnabled())
logger.trace("Local tracked request {} {}", mutation, plan);
writeMetrics.localRequests.mark();
+
MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
- mutation = mutation.withMutationId(id);
+ final TrackedWriteResponseHandler handler;
if (logger.isTraceEnabled())
logger.trace("Write replication plan for mutation {}: live={},
pending={}, all={}",
- id, plan.live(), plan.pending(), plan.contacts());
+ id, plan.live(), plan.pending(), plan.contacts());
- TrackedWriteResponseHandler handler =
- TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan,
null, WriteType.SIMPLE, null, requestTime), id);
- applyLocallyAndSendToReplicas(mutation, plan, handler);
+ if (mutation instanceof CounterMutation)
+ {
+ handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.COUNTER, null, requestTime), id);
+ applyCounterMutationLocally((CounterMutation) mutation, id, plan,
handler);
+ }
+ else
+ {
+ mutation = mutation.withMutationId(id);
+ handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.SIMPLE, null, requestTime), id);
+ applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler);
+ }
return handler;
}
public static void applyLocallyAndSendToReplicas(Mutation mutation,
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
{
- String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
+ applyMutationLocally(mutation, handler);
+ sendToReplicasInternal(mutation, plan, handler, null, false);
+ }
- boolean applyLocally = false;
+ public static void sendToReplicasOnly(Mutation mutation,
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler,
ForwardedWrite.CoordinatorAckInfo coordinatorAckInfo)
+ {
+ sendToReplicasInternal(mutation, plan, handler, coordinatorAckInfo,
true);
+ }
+
+ /**
+ * Internal method to send a mutation to all replicas.
+ * Handles grouping replicas by DC, sending messages, and tracking remote
replicas.
+ *
+ * @param mutation the mutation with assigned ID to send to replicas
+ * @param plan the replica plan
+ * @param handler the response handler
+ * @param coordinatorAckInfo optional coordinator info for forwarded
writes (null for local coordinator)
+ * @param notifyHandlerForLocal if true, notify handler for local replica
(used when mutation already applied)
+ */
+ private static void sendToReplicasInternal(Mutation mutation,
ReplicaPlan.ForWrite plan,
Review Comment:
I believe it should be possible to reuse this method for
`ForwardedWrite#applyLocallyAndForwardToReplicas()` now as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]