aweisberg commented on code in PR #3395:
URL: https://github.com/apache/cassandra/pull/3395#discussion_r1709795423
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1344,66 +1266,151 @@ public static void
mutateAtomically(Collection<Mutation> mutations,
// require ALL, or EACH_QUORUM. This is so that *at least* QUORUM
nodes see the update.
ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove
? ConsistencyLevel.QUORUM
- : consistency_level;
+ : consistencyLevel;
- switch (consistency_level)
+ switch (consistencyLevel)
{
case ALL:
case EACH_QUORUM:
- batchConsistencyLevel = consistency_level;
+ batchConsistencyLevel = consistencyLevel;
}
ReplicaPlan.ForWrite replicaPlan =
ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
final TimeUUID batchUUID = nextTimeUUID();
- BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
- () ->
asyncRemoveFromBatchlog(replicaPlan, batchUUID));
-
- // add a handler for each mutation - includes checking
availability, but doesn't initiate any writes, yet
- for (Mutation mutation : mutations)
+ boolean wroteToBatchLog = false;
+ while (true)
{
- WriteResponseHandlerWrapper wrapper =
wrapBatchResponseHandler(mutation,
-
consistency_level,
-
batchConsistencyLevel,
-
WriteType.BATCH,
-
cleanup,
-
queryStartNanoTime);
- // exit early if we can't fulfill the CL at this time.
- wrappers.add(wrapper);
- }
+ try
+ {
+ BatchlogCleanup cleanup = new
BatchlogCleanup(mutations.size(),
+ () ->
asyncRemoveFromBatchlog(replicaPlan, batchUUID));
+ List<WriteResponseHandlerWrapper> wrappers = new
ArrayList<>(mutations.size());
- // write to the batchlog
- syncWriteToBatchlog(mutations, replicaPlan, batchUUID,
queryStartNanoTime);
+ // add a handler for each mutation - includes checking
availability, but doesn't initiate any writes, yet
+ for (Mutation mutation : mutations)
+ {
+ WriteResponseHandlerWrapper wrapper =
wrapBatchResponseHandler(mutation,
+
consistencyLevel,
+
batchConsistencyLevel,
+
WriteType.BATCH,
+
cleanup,
+
queryStartNanoTime);
+ // exit early if we can't fulfill the CL at this time.
+ wrappers.add(wrapper);
+ }
+
+ // TODO (review): A lot of the code duplication in
splitting is from WriteResponseHandlerWrapper, but we could split before making
the wrappers
Review Comment:
I was able to refactor this pretty cleanly to use a consumer for the split
mutations, which either aggregates them into a pair of mutation lists or, in
the batch case, into a list and some of the wrappers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]