aweisberg commented on code in PR #3395:
URL: https://github.com/apache/cassandra/pull/3395#discussion_r1706057677


##########
src/java/org/apache/cassandra/batchlog/BatchlogManager.java:
##########
@@ -340,66 +391,116 @@ private static class ReplayingBatch
     {
         private final TimeUUID id;
         private final long writtenAt;
-        private final List<Mutation> mutations;
+        private final int unsplitGcGs;
+        private final List<Mutation> normalMutations;
+        private final List<Mutation> accordMutations;
         private final int replayedBytes;
 
-        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers = 
ImmutableList.of();
+        private Pair<TxnId, Future<TxnResult>> accordResult;
+        private long accordTxnStartNanos;
 
         ReplayingBatch(TimeUUID id, int version, List<ByteBuffer> 
serializedMutations) throws IOException
         {
             this.id = id;
             this.writtenAt = id.unix(MILLISECONDS);
-            this.mutations = new ArrayList<>(serializedMutations.size());
-            this.replayedBytes = addMutations(version, serializedMutations);
+            List<Mutation> unsplitMutations = new 
ArrayList<>(serializedMutations.size());
+            this.replayedBytes = addMutations(unsplitMutations, version, 
serializedMutations);
+            unsplitGcGs = gcgs(unsplitMutations);
+            Pair<List<Mutation>, List<Mutation>> accordAndNormal = 
ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal(unsplitMutations);
+            logger.trace("Replaying batch with Accord {} and normal {}", 
accordAndNormal.left, accordAndNormal.right);
+            normalMutations = accordAndNormal.right;
+            accordMutations = accordAndNormal.left;
         }
 
-        public int replay(RateLimiter rateLimiter, Set<UUID> hintedNodes) 
throws IOException
+        public boolean replay(RateLimiter rateLimiter, Set<UUID> hintedNodes) 
throws IOException
         {
             logger.trace("Replaying batch {}", id);
 
-            if (mutations.isEmpty())
-                return 0;
+            if ((normalMutations == null || normalMutations.isEmpty()) && 
(accordMutations == null || accordMutations.isEmpty()))
+                return false;
 
-            int gcgs = gcgs(mutations);
-            if (MILLISECONDS.toSeconds(writtenAt) + gcgs <= 
FBUtilities.nowInSeconds())
-                return 0;
+            if (MILLISECONDS.toSeconds(writtenAt) + unsplitGcGs <= 
FBUtilities.nowInSeconds())
+                return false;
+
+            // TODO (expected): this can refuse to initiate if Accord loses 
the ranges that it had when splitting making the batch fail to apply
+            if (accordMutations != null)
+            {
+                accordTxnStartNanos = Clock.Global.nanoTime();
+                accordResult = accordMutations != null ? 
mutateWithAccordAsync(accordMutations, null, accordTxnStartNanos) : null;
+            }
 
-            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+            if (normalMutations != null)
+                replayHandlers = sendReplays(normalMutations, writtenAt, 
hintedNodes);
 
             rateLimiter.acquire(replayedBytes); // acquire afterwards, to not 
mess up ttl calculation.
 
-            return replayHandlers.size();
+            return replayHandlers.size() > 0 || accordMutations != null;
         }
 
         public void finish(Set<UUID> hintedNodes)
         {
-            for (int i = 0; i < replayHandlers.size(); i++)
+            Throwable failure = null;
+            // Check if the Accord mutations succeeded asynchronously
+            try
             {
-                ReplayWriteResponseHandler<Mutation> handler = 
replayHandlers.get(i);
-                try
+                if (accordResult != null)
                 {
-                    handler.get();
+                    IAccordService accord = AccordService.instance();
+                    TxnResult.Kind kind = accord.getTxnResult(accordResult, 
true, ConsistencyLevel.QUORUM, accordTxnStartNanos).kind();
+                    if (kind == retry_new_protocol)
+                        throw new RetryOnDifferentSystemException();
                 }
-                catch (WriteTimeoutException|WriteFailureException e)
+            }
+            catch 
(WriteTimeoutException|WriteFailureException|RetryOnDifferentSystemException  e)
+            {
+                logger.trace("Failed replaying a batched mutation on Accord, 
will write a hint");
+                logger.trace("Failure was : {}", e.getMessage());
+                writeHintsForUndeliveredAccordTxns(hintedNodes);
+            }
+            catch (Exception e)
+            {
+                failure = Throwables.merge(failure, e);
+            }
+
+            try
+            {
+                for (int i = 0; i < replayHandlers.size(); i++)
                 {
-                    logger.trace("Failed replaying a batched mutation to a 
node, will write a hint");
-                    logger.trace("Failure was : {}", e.getMessage());
-                    // writing hints for the rest to hints, starting from i
-                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
-                    return;
+                    ReplayWriteResponseHandler<Mutation> handler = 
replayHandlers.get(i);
+                    try
+                    {
+                        handler.get();
+                    }
+                    catch 
(WriteTimeoutException|WriteFailureException|RetryOnDifferentSystemException e)
+                    {
+                        logger.trace("Failed replaying a batched mutation to a 
node, will write a hint");
+                        logger.trace("Failure was : {}", e.getMessage());
+                        // writing hints for the rest to hints, starting from i
+                        writeHintsForUndeliveredEndpoints(i, hintedNodes);
+                        break;
+                    }
                 }
             }
+            catch (Exception e)
+            {
+                logger.debug("Unexpected batchlog replay exception", e);
+                failure = Throwables.merge(failure, e);
+            }
+
+            if (failure != null)
+                throw Throwables.unchecked(failure);
         }
 
-        private int addMutations(int version, List<ByteBuffer> 
serializedMutations) throws IOException
+        private int addMutations(List<Mutation> unsplitMutations, int version, 
List<ByteBuffer> serializedMutations) throws IOException

Review Comment:
   Will make them static



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to