dcapwell commented on code in PR #2844:
URL: https://github.com/apache/cassandra/pull/2844#discussion_r1372135468


##########
src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java:
##########
@@ -154,34 +160,37 @@ synchronized boolean hasFailed()
         return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED);
     }
 
-    protected void sendMessage(InetAddressAndPort destination, Message<RepairMessage> message)
-    {
-        logger.trace("Sending {} to {}", message.payload, destination);
-
-        ctx.messaging().send(message, destination);
-    }
-
     public Future<Void> prepare()
     {
         Preconditions.checkArgument(allStates(State.PREPARING));
 
         logger.info("Beginning prepare phase of incremental repair session {}", sessionID);
-        Message<RepairMessage> message =
-            Message.out(Verb.PREPARE_CONSISTENT_REQ, new PrepareConsistentRequest(sessionID, coordinator, participants));
+
+        PrepareConsistentRequest request = new PrepareConsistentRequest(sessionID, coordinator, participants);
         for (final InetAddressAndPort participant : participants)
         {
-            sendMessage(participant, message);
+            sendMessageWithRetries(ctx,
+                                                 notDone(prepareFuture),
+                                                 request,
+                                                 Verb.PREPARE_CONSISTENT_REQ,
+                                                 participant);
         }
         return prepareFuture;
     }
 
-    public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
+    public synchronized void handlePrepareResponse(Message<PrepareConsistentResponse> msg)
     {
+        InetAddressAndPort participant = msg.payload.participant;
+        boolean success = msg.payload.success;
         if (getState() == State.FAILED)
         {
             logger.trace("Incremental repair {} has failed, ignoring prepare response from {}", sessionID, participant);
+            sendFailureResponse(ctx, msg);
             return;
         }
+        sendAck(ctx, msg);
+        if (getParticipantState(participant) != State.PREPARING)

Review Comment:
   This also fixes an IR (incremental repair) bug where we see the prepare 
response *after* we get a fail session request. In that case we fail and log a 
stack trace saying we can't go from `FAILURE -> PREPARED`.
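
   To illustrate the ordering problem, here is a minimal, self-contained sketch of the guard. It is not the Cassandra code: `PrepareGuardSketch`, its string participant names, and the boolean return value are hypothetical, and it assumes the new `!= State.PREPARING` branch (cut off in the hunk above) simply ignores the response.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a coordinator session; not the real class.
class PrepareGuardSketch
{
    enum State { PREPARING, PREPARED, FAILED }

    private final Map<String, State> participantStates = new HashMap<>();
    private State sessionState = State.PREPARING;

    PrepareGuardSketch(List<String> participants)
    {
        for (String participant : participants)
            participantStates.put(participant, State.PREPARING);
    }

    // Models a fail session request: the session and every participant become FAILED.
    synchronized void fail()
    {
        sessionState = State.FAILED;
        participantStates.replaceAll((participant, state) -> State.FAILED);
    }

    synchronized State stateOf(String participant)
    {
        return participantStates.get(participant);
    }

    // Returns true if the response was applied, false if it was ignored.
    synchronized boolean handlePrepareResponse(String participant, boolean success)
    {
        // Late response after the session failed: ignore it.
        if (sessionState == State.FAILED)
            return false;

        // Assumed behaviour: a participant that already left PREPARING
        // (for example a duplicate caused by retries) is ignored as well,
        // so no illegal state transition is ever attempted.
        if (participantStates.get(participant) != State.PREPARING)
            return false;

        if (success)
            participantStates.put(participant, State.PREPARED);
        else
            fail();
        return true;
    }
}
```

   With this shape, a response that arrives after `fail()` returns `false` and leaves the participant in `FAILED`, rather than attempting the transition that produced the stack trace.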



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

