beobal commented on code in PR #3288:
URL: https://github.com/apache/cassandra/pull/3288#discussion_r1598073075


##########
src/java/org/apache/cassandra/tcm/PeerLogFetcher.java:
##########
@@ -74,40 +74,54 @@ public ClusterMetadata 
fetchLogEntriesAndWait(InetAddressAndPort remote, Epoch a
 
     public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, 
Epoch awaitAtleast)
     {
-        Function<Promise<LogState>, ClusterMetadata> fn = promise -> 
fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
-        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(() -> 
fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
     }
 
-    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> 
remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+    private Future<ClusterMetadata> 
fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
     {
         Epoch before = ClusterMetadata.current().epoch;
         if (before.isEqualOrAfter(awaitAtleast))
-            return ClusterMetadata.current();
+        {
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setSuccess(ClusterMetadata.current());
+            return res;
+        }
 
+        Promise<LogState> fetchRes = new AsyncPromise<>();
         logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
         try (Timer.Context ctx = 
TCMMetrics.instance.fetchPeerLogLatency.time())
         {
-            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+            RemoteProcessor.sendWithCallbackAsync(fetchRes,
                                                   Verb.TCM_FETCH_PEER_LOG_REQ,
                                                   new FetchPeerLog(before),
                                                   new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
                                                   
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                        new 
Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
-            LogState logState = remoteRequest.awaitUninterruptibly().get();
-            log.append(logState);
-            ClusterMetadata fetched = log.waitForHighestConsecutive();
-            if (fetched.epoch.isEqualOrAfter(awaitAtleast))
-            {
-                TCMMetrics.instance.peerLogEntriesFetched(before, 
logState.latestEpoch());
-                return fetched;
-            }
+
+            return fetchRes.map((logState) -> {
+                log.append(logState);
+                ClusterMetadata fetched = log.waitForHighestConsecutive();
+                if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+                {
+                    TCMMetrics.instance.peerLogEntriesFetched(before, 
logState.latestEpoch());
+                    return fetched;
+                }
+                else
+                {
+                    throw new IllegalStateException(String.format("Queried for 
epoch %s, but could not catch up", awaitAtleast));

Review Comment:
   ok, seems reasonable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to