aweisberg commented on code in PR #3395:
URL: https://github.com/apache/cassandra/pull/3395#discussion_r1707970068


##########
src/java/org/apache/cassandra/hints/HintsDispatcher.java:
##########
@@ -194,38 +307,106 @@ private <T> Action sendHints(Iterator<T> hints, 
Collection<Callback> callbacks,
         return Action.CONTINUE;
     }
 
+    // TODO (review): Could add the loop, but how often will it be needed?
+    // While this could loop if splitting the mutation misroutes there isn't a 
strong need because Hints will retry anyways
+    // It will cause hint delivery for this endpoint to pause until it is 
rescheduled again
+    // Given the structure of page at a time delivery it's also a little 
tricky because we would really need to iterate over the entire page
+    // again only applying the hints that ended up needing to be re-routed
     private Callback sendHint(Hint hint)
     {
-        Callback callback = new Callback(hint.creationTime);
-        Message<?> message = Message.out(HINT_REQ, new HintMessage(hostId, 
hint));
-        MessagingService.instance().sendWithCallback(message, address, 
callback);
+        Pair<Mutation, Hint> accordAndNormal = 
splitHintIntoAccordAndNormal(hint);
+        Mutation accordHintMutation = accordAndNormal.left;
+        long txnStartNanoTime = 0;
+        Pair<TxnId, Future<TxnResult>> accordTxnResult = null;
+        if (accordHintMutation != null)
+        {
+            txnStartNanoTime = Clock.Global.nanoTime();
+            accordTxnResult = accordHintMutation != null ? 
mutateWithAccordAsync(accordHintMutation, null, txnStartNanoTime) : null;
+        }
+
+        Hint normalHint = accordAndNormal.right;
+        Callback callback = new Callback(address, hint.creationTime, 
txnStartNanoTime, accordTxnResult);
+        if (normalHint != null)
+        {
+            // We had a hint that was supposed to be done on Accord for the 
batch log (otherwise address would be non-null),
+            // but Accord no longer manages that table/range and now we don't 
know which nodes (if any) are missing the Mutation.
+            // Convert them to per replica hints *after* all the hints in this 
page have been applied so we can be reasonably sure
+            // this page isn't going to be played again thus avoiding any 
further amplification from the same hint being
+            // replayed and repeatedly converted to per replica hints
+            if (address == null)
+            {
+                checkState(hostId.equals(RETRY_ON_DIFFERENT_SYSTEM_UUID), "If 
there is no address to send the hint to then the host ID should be 
BATCHLOG_ACCORD_HINT_UUID");
+                callback.onResponse(null);
+                hintsNeedingRehinting.add(normalHint);
+            }
+            else
+            {
+                Message<?> message = Message.out(HINT_REQ, new 
HintMessage(hostId, normalHint));
+                MessagingService.instance().sendWithCallback(message, address, 
callback);
+            }
+        }
+        else
+        {
+            // Don't wait for a normal response that will never come since no 
hints were sent
+            callback.onResponse(null);
+        }
+
         return callback;
     }
 
+    private Pair<Mutation, Hint> splitHintIntoAccordAndNormal(Hint hint)
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        Pair<Mutation, Mutation> accordAndNormal = 
splitMutationIntoAccordAndNormal(hint.mutation, cm);
+        if (accordAndNormal.left == null)
+            return Pair.create(null, hint);
+        if (accordAndNormal.right == null)
+            return Pair.create(accordAndNormal.left, null);
+        Hint normalHint = Hint.create(accordAndNormal.right, 
hint.creationTime, accordAndNormal.right.smallestGCGS());
+        return Pair.create(accordAndNormal.left, normalHint);
+    }
+
     /*
      * Sending hints in raw mode.
      */
 
     private Callback sendEncodedHint(ByteBuffer hint)
     {
         HintMessage.Encoded message = new HintMessage.Encoded(hostId, hint, 
messagingVersion);
-        Callback callback = new Callback(message.getHintCreationTime());
+        Callback callback = new Callback(address, 
message.getHintCreationTime());
         MessagingService.instance().sendWithCallback(Message.out(HINT_REQ, 
message), address, callback);
         return callback;
     }
 
-    static final class Callback implements RequestCallback
+    static final class Callback implements RequestCallback, Runnable
     {
         enum Outcome { SUCCESS, TIMEOUT, FAILURE, INTERRUPTED }
 
         private final long start = approxTime.now();
         private final Condition condition = newOneTimeCondition();
-        private volatile Outcome outcome;
+        private Outcome normalOutcome;
+        private Outcome accordOutcome;
+        @Nullable
+        private final InetAddressAndPort to;
         private final long hintCreationNanoTime;
+        private final long accordTxnStartNanos;
+        private final Pair<TxnId, Future<TxnResult>> accordTxnResult;
+
+        private Callback(@Nonnull InetAddressAndPort to, long 
hintCreationTimeMillisSinceEpoch)
+        {
+            this(to, hintCreationTimeMillisSinceEpoch, -1, null);
+        }
 
-        private Callback(long hintCreationTimeMillisSinceEpoch)
+        private Callback(@Nullable InetAddressAndPort to, long 
hintCreationTimeMillisSinceEpoch, long accordTxnStartNanos, @Nullable 
Pair<TxnId, Future<TxnResult>> accordTxnResult)
         {
+            this.to = to != null ? to : ACCORD_HINT_ENDPOINT;
             this.hintCreationNanoTime = 
approxTime.translate().fromMillisSinceEpoch(hintCreationTimeMillisSinceEpoch);
+            this.accordTxnStartNanos = accordTxnStartNanos;
+            this.accordTxnResult = accordTxnResult;
+            if (accordTxnResult != null)
+                accordTxnResult.right.addListener(this, 
ImmediateExecutor.INSTANCE);

Review Comment:
   We get the error either way, because `run` calls `getTxnResult`, which 
does the error handling. I'm not sure it's worth refactoring this in particular, 
because it would make implementing `getTxnResult` a little more complicated: it 
would have to work both as a callback and as something invoked on a promise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to