iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2673117135
##########
src/java/org/apache/cassandra/replication/ForwardedWrite.java:
##########
@@ -241,6 +242,119 @@ public static AbstractWriteResponseHandler<Object>
forwardMutation(Mutation muta
return handler;
}
+ /**
+ * Forward a tracked counter mutation to a replica leader for processing.
+ * The leader will apply the counter mutation, assign a mutation ID, and
replicate to other replicas.
+ */
+ public static AbstractWriteResponseHandler<Object>
forwardCounterMutation(CounterMutation counterMutation,
+
ReplicaPlan.ForWrite plan,
+
AbstractReplicationStrategy strategy,
+
Dispatcher.RequestTime requestTime)
+ {
+ Preconditions.checkArgument(counterMutation.id().isNone(),
"CounterMutation should not have an ID when forwarding");
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
+
+ // Find the leader replica - prefer local DC replicas for counters
+ Replica leader;
+ try
+ {
+ leader = ReplicaPlans.findCounterLeaderReplica(cm,
counterMutation.getKeyspaceName(),
+
counterMutation.key(), localDataCenter,
+
counterMutation.consistency());
+ }
+ catch (Exception e)
+ {
+ logger.error("Failed to find counter leader replica for tracked
write", e);
+ throw e;
+ }
+
+ Preconditions.checkState(!leader.isSelf(), "Leader should not be self
when forwarding counter mutation");
+ logger.trace("Forwarding tracked counter mutation to leader replica
{}", leader);
+
+ // Create response handler for all replicas
+ AbstractWriteResponseHandler<Object> handler =
strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null,
requestTime);
+
+ // Add callbacks for all live replicas to respond directly to
coordinator
+ Message<CounterMutation> forwardMessage =
Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation,
requestTime);
+
+ for (Replica replica : plan.contacts())
+ {
+ if (plan.isAlive(replica))
+ {
+ logger.trace("Adding forwarding callback for tracked counter
response from {} id {}", replica, forwardMessage.id());
+
MessagingService.instance().callbacks.addWithExpiration(handler,
forwardMessage, replica);
+ }
+ else
+ {
+ handler.expired();
+ }
+ }
+
+ // Send the counter mutation to the leader
+ MessagingService.instance().send(forwardMessage, leader.endpoint());
+
+ return handler;
+ }
+
+ /**
+ * Apply a forwarded tracked counter mutation on the leader replica.
+ * Called by CounterMutationVerbHandler when receiving a forwarded counter
write.
+ *
+ * This method:
+ * 1. Creates CoordinatorAckInfo from the incoming message
+ * 2. Applies counter mutation locally with generated mutation ID
+ * 3. Forwards result (Mutation not CounterMutation) to other replicas
with CoordinatorAckInfo
+ * 4. Sends leader's response back to coordinator
+ *
+ * @param counterMutation the counter mutation to apply
+ * @param message the original message (contains coordinator address and
message ID)
+ * @param respondToAddress the address to send the response to
(coordinator)
+ */
+ public static void applyForwardedCounterMutation(CounterMutation
counterMutation,
+ Message<CounterMutation>
message,
+ InetAddressAndPort
respondToAddress)
+ {
+ try
+ {
+ CoordinatorAckInfo coordinatorAckInfo =
CoordinatorAckInfo.toCoordinator(message.from(), message.id());
+
+ String keyspaceName = counterMutation.getKeyspaceName();
+ Token token = counterMutation.key().getToken();
+ Keyspace ks = Keyspace.open(keyspaceName);
+ ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks,
counterMutation.consistency(), token, ReplicaPlans.writeAll);
+ AbstractReplicationStrategy rs = plan.replicationStrategy();
+
+ MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+
+ if (logger.isTraceEnabled())
Review Comment:
You don't need the `logger.isTraceEnabled()` check here. We use `isXEnabled()`
log checks in two scenarios:
1. The arguments to the method are expensive computations (or allocate), or
2. There are more than two parameters (excluding the format string) to pass.
This is because there exist overloads for format string + 0, 1, or 2 arguments,
but anything beyond that goes through a varargs method (e.g. `void trace(String var1,
Object... var2)`), which has to allocate an array / potentially box some
primitives.
This should be somewhere in the style guide, but I can't actually find it
myself :\
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]