aweisberg commented on code in PR #3395:
URL: https://github.com/apache/cassandra/pull/3395#discussion_r1709795423


##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1344,66 +1266,151 @@ public static void 
mutateAtomically(Collection<Mutation> mutations,
             // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM 
nodes see the update.
             ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove
                                                      ? ConsistencyLevel.QUORUM
-                                                     : consistency_level;
+                                                     : consistencyLevel;
 
-            switch (consistency_level)
+            switch (consistencyLevel)
             {
                 case ALL:
                 case EACH_QUORUM:
-                    batchConsistencyLevel = consistency_level;
+                    batchConsistencyLevel = consistencyLevel;
             }
 
             ReplicaPlan.ForWrite replicaPlan = 
ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
 
             final TimeUUID batchUUID = nextTimeUUID();
-            BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
-                                                          () -> 
asyncRemoveFromBatchlog(replicaPlan, batchUUID));
-
-            // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
-            for (Mutation mutation : mutations)
+            boolean wroteToBatchLog = false;
+            while (true)
             {
-                WriteResponseHandlerWrapper wrapper = 
wrapBatchResponseHandler(mutation,
-                                                                               
consistency_level,
-                                                                               
batchConsistencyLevel,
-                                                                               
WriteType.BATCH,
-                                                                               
cleanup,
-                                                                               
queryStartNanoTime);
-                // exit early if we can't fulfill the CL at this time.
-                wrappers.add(wrapper);
-            }
+                try
+                {
+                    BatchlogCleanup cleanup = new 
BatchlogCleanup(mutations.size(),
+                                                                  () -> 
asyncRemoveFromBatchlog(replicaPlan, batchUUID));
+                    List<WriteResponseHandlerWrapper> wrappers = new 
ArrayList<>(mutations.size());
 
-            // write to the batchlog
-            syncWriteToBatchlog(mutations, replicaPlan, batchUUID, 
queryStartNanoTime);
+                    // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
+                    for (Mutation mutation : mutations)
+                    {
+                        WriteResponseHandlerWrapper wrapper = 
wrapBatchResponseHandler(mutation,
+                                                                               
        consistencyLevel,
+                                                                               
        batchConsistencyLevel,
+                                                                               
        WriteType.BATCH,
+                                                                               
        cleanup,
+                                                                               
        queryStartNanoTime);
+                        // exit early if we can't fulfill the CL at this time.
+                        wrappers.add(wrapper);
+                    }
+
+                    // TODO (review): A lot of the code duplication in 
splitting is from WriteResponseHandlerWrapper, but we could split before making 
the wrappers

Review Comment:
   I was able to refactor this pretty cleanly to use a consumer for the split 
mutations to either aggregate into pair of mutation lists or in the batch case 
into a list and some the wrappers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to