beobal commented on code in PR #3288:
URL: https://github.com/apache/cassandra/pull/3288#discussion_r1594282413


##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
      * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
      * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
      * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code

Review Comment:
   should remove this too: `The function takes a {@code Promise<LogState>} as input, with the expectation that this specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code LogState}.`



##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
      * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
      * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
      * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
-     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
-     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
-     * shutdown from being blocked.
+     * LogState}.
      *
      * @param  fetchFunction executes the request for LogState. It's expected that this popluates fetchResult with the

Review Comment:
   And probably reword this too



##########
src/java/org/apache/cassandra/tcm/PeerLogFetcher.java:
##########
@@ -74,40 +74,54 @@ public ClusterMetadata fetchLogEntriesAndWait(InetAddressAndPort remote, Epoch a
 
     public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
     {
-        Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
-        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
     }
 
-    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+    private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
     {
         Epoch before = ClusterMetadata.current().epoch;
         if (before.isEqualOrAfter(awaitAtleast))
-            return ClusterMetadata.current();
+        {
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setSuccess(ClusterMetadata.current());
+            return res;
+        }
 
+        Promise<LogState> fetchRes = new AsyncPromise<>();
         logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
         try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
         {
-            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+            RemoteProcessor.sendWithCallbackAsync(fetchRes,
                                                   Verb.TCM_FETCH_PEER_LOG_REQ,
                                                   new FetchPeerLog(before),
                                                   new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
                                                   Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                        new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
-            LogState logState = remoteRequest.awaitUninterruptibly().get();
-            log.append(logState);
-            ClusterMetadata fetched = log.waitForHighestConsecutive();
-            if (fetched.epoch.isEqualOrAfter(awaitAtleast))
-            {
-                TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
-                return fetched;
-            }
+
+            return fetchRes.map((logState) -> {
+                log.append(logState);
+                ClusterMetadata fetched = log.waitForHighestConsecutive();
+                if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+                {
+                    TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
+                    return fetched;
+                }
+                else
+                {
+                    throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast));

Review Comment:
   Do we want to throw here, or let the caller decide how to handle a failure to catch up? In some cases this is purely a background task, so perhaps we should just log and allow the caller to proceed? (I know we're confident that `remote` can satisfy the request, since we call this in response to seeing `awaitAtleast` in some message from it.)
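   A minimal sketch of the alternative raised above, assuming the mapping step simply completes with whatever metadata it reached (logging a warning when it falls short) and each caller decides whether that is an error. `CatchUpSketch`, `Metadata`, `fetchAndApply` and `requireEpoch` are hypothetical names; epochs are plain `long`s, and `java.util.concurrent.CompletableFuture` stands in for Cassandra's `Future`/`Promise`, so this illustrates the shape rather than offering a drop-in patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Logger;

public class CatchUpSketch
{
    private static final Logger logger = Logger.getLogger(CatchUpSketch.class.getName());

    // Hypothetical stand-in for ClusterMetadata; epochs are plain longs.
    public static final class Metadata
    {
        public final long epoch;

        public Metadata(long epoch)
        {
            this.epoch = epoch;
        }
    }

    // Completes with whatever metadata was reached, even if it is still behind
    // awaitAtLeast; falling short is logged rather than thrown.
    public static CompletableFuture<Metadata> fetchAndApply(CompletableFuture<Long> fetchedEpoch, long awaitAtLeast)
    {
        return fetchedEpoch.thenApply(epoch -> {
            Metadata fetched = new Metadata(epoch);
            if (fetched.epoch < awaitAtLeast)
                logger.warning(String.format("Queried for epoch %d, but only caught up to %d", awaitAtLeast, fetched.epoch));
            return fetched;
        });
    }

    // A caller that really needs the epoch can turn a shortfall into a failure itself.
    public static Metadata requireEpoch(Metadata metadata, long awaitAtLeast)
    {
        if (metadata.epoch < awaitAtLeast)
            throw new IllegalStateException("Could not catch up to epoch " + awaitAtLeast);
        return metadata;
    }

    public static void main(String[] args)
    {
        // Background caller: accepts the partial result and carries on.
        Metadata background = fetchAndApply(CompletableFuture.completedFuture(5L), 7L).join();
        System.out.println("background caller proceeds at epoch " + background.epoch);

        // Strict caller: fails its own future when the epoch was not reached.
        try
        {
            fetchAndApply(CompletableFuture.completedFuture(5L), 7L)
                .thenApply(m -> requireEpoch(m, 7L))
                .join();
        }
        catch (CompletionException e)
        {
            System.out.println("strict caller fails: " + e.getCause().getMessage());
        }
    }
}
```

   With this shape, a background caller can ignore the shortfall, while a caller that genuinely needs the epoch adds its own check (as `requireEpoch` does) and fails its future.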



##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
      * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
      * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
      * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
-     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
-     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
-     * shutdown from being blocked.
+     * LogState}.
      *
      * @param  fetchFunction executes the request for LogState. It's expected that this popluates fetchResult with the
      *                       successful result.
      * @param epoch the desired epoch
      * @return
      */
-    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
+    public Future<ClusterMetadata> getAsync(Supplier<Future<ClusterMetadata>> fetchFunction,
                                             Epoch epoch)
     {
         while (true)
         {
-            EpochAwareAsyncPromise running = currentFuture.get();
-            if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
-                return running;
+            EpochAwareFuture running = currentFuture.get();
+            // Someone else is about to install a new future
+            if (running == SENTINEL)
+                continue;
 
-            Promise<LogState> fetchResult = new AsyncPromise<>();
+            if (running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch))
+                return running.future;
 
-            EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
-            if (currentFuture.compareAndSet(running, promise))
+            if (currentFuture.compareAndSet(running, SENTINEL))
             {
-                fetchResult.addCallback((logState, error) -> {
-                    logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
-                    inflightRequests.remove(fetchResult);
-                });
-                inflightRequests.add(fetchResult);
-
-                executor.submit(() -> {
-                    try
-                    {
-                        promise.setSuccess(fetchFunction.apply(fetchResult));
-                    }
-                    catch (Throwable t)
-                    {
-                        fetchResult.cancel(true);
-                        inflightRequests.remove(fetchResult);
-                        promise.setFailure(t);
-                    }
-                });
-                return promise;
+                EpochAwareFuture promise = new EpochAwareFuture(epoch, fetchFunction.get());;

Review Comment:
   nit: `;;`
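   For reference, the CAS-to-sentinel debounce in the new `getAsync` loop can be sketched in isolation. This is a hedged, self-contained approximation, not the PR's code: epochs are `long`s, `CompletableFuture<Long>` stands in for `Future<ClusterMetadata>`, and `DebounceSketch` is a hypothetical name. The Javadoc on `getAsync` is one possible wording for the `Supplier`-based signature. The sketch also clears `SENTINEL` if `fetchFunction.get()` throws, so that other callers spinning on `SENTINEL` are not left stuck; whether the real method needs this depends on code outside the hunk shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class DebounceSketch
{
    // Hypothetical stand-in for EpochAwareFuture: an in-flight fetch plus the epoch it targets.
    public static final class EpochAwareFuture
    {
        final long epoch;
        final CompletableFuture<Long> future;

        EpochAwareFuture(long epoch, CompletableFuture<Long> future)
        {
            this.epoch = epoch;
            this.future = future;
        }
    }

    // Installed while one thread is creating a new fetch; only ever compared by identity.
    private static final EpochAwareFuture SENTINEL = new EpochAwareFuture(Long.MIN_VALUE, null);

    private final AtomicReference<EpochAwareFuture> currentFuture = new AtomicReference<>();

    /**
     * Returns a future that completes with metadata at or after {@code epoch}, sharing an
     * in-flight fetch when one already targets an equal or later epoch.
     *
     * @param fetchFunction starts a fetch and returns a future completed with its result;
     *                      invoked only when no suitable fetch is already in flight
     * @param epoch the desired epoch
     * @return the shared or newly started future
     */
    public CompletableFuture<Long> getAsync(Supplier<CompletableFuture<Long>> fetchFunction, long epoch)
    {
        while (true)
        {
            EpochAwareFuture running = currentFuture.get();
            if (running == SENTINEL)
            {
                Thread.onSpinWait(); // another thread is about to install a new future
                continue;
            }

            if (running != null && !running.future.isDone() && running.epoch >= epoch)
                return running.future;

            if (currentFuture.compareAndSet(running, SENTINEL))
            {
                boolean installed = false;
                try
                {
                    CompletableFuture<Long> fetch = fetchFunction.get();
                    currentFuture.set(new EpochAwareFuture(epoch, fetch));
                    installed = true;
                    return fetch;
                }
                finally
                {
                    if (!installed)
                        currentFuture.set(null); // never leave SENTINEL behind if the supplier throws
                }
            }
        }
    }

    public static void main(String[] args)
    {
        DebounceSketch debounce = new DebounceSketch();
        int[] calls = {0};
        CompletableFuture<Long> pending = new CompletableFuture<>();
        Supplier<CompletableFuture<Long>> fetch = () -> { calls[0]++; return pending; };

        CompletableFuture<Long> first = debounce.getAsync(fetch, 10);
        CompletableFuture<Long> second = debounce.getAsync(fetch, 8); // covered by the epoch-10 fetch
        System.out.println("shared: " + (first == second) + ", fetches started: " + calls[0]);
    }
}
```

   Here the request for epoch 8 is covered by the in-flight fetch for epoch 10, so the supplier runs only once; a request for a later epoch would install a new future.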



##########
src/java/org/apache/cassandra/tcm/RemoteProcessor.java:
##########
@@ -152,34 +150,37 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy
 
     public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
     {
-        Promise<LogState> remoteRequest = new AsyncPromise<>();
-        return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
+        try
+        {
+            return fetchLogAndWaitInternal(candidateIterator, log).awaitUninterruptibly().get();

Review Comment:
   Should we `awaitThrowUncheckedOnInterrupt`, or even just `await`, here? I'm mainly thinking of tests, where the original implementation (before CASSANDRA-19514) could cause occasional timeouts.
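   The distinction behind this question can be sketched with plain JDK types. The helpers below are hypothetical analogues named after the methods mentioned in the comment, not their Cassandra implementations: `CompletableFuture.join()` plays the part of an uninterruptible wait, `CompletableFuture.get()` the part of a wait that gives up on interrupt, and `IllegalStateException` is a placeholder for whatever unchecked exception the real method throws. A test harness or shutdown path that interrupts a thread stuck in the first kind of wait has no effect until the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class InterruptSketch
{
    // Hypothetical analogue of awaitUninterruptibly(): CompletableFuture.join()
    // keeps waiting through interrupts and re-asserts the interrupt flag on return.
    public static <T> T awaitUninterruptibly(CompletableFuture<T> future)
    {
        return future.join();
    }

    // Hypothetical analogue of awaitThrowUncheckedOnInterrupt(): stop waiting as soon as
    // the thread is interrupted, restore the flag, and surface an unchecked exception.
    public static <T> T awaitThrowUncheckedOnInterrupt(CompletableFuture<T> future)
    {
        try
        {
            return future.get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        catch (ExecutionException e)
        {
            throw new RuntimeException(e.getCause());
        }
    }

    // Starts `waiter` on a daemon thread, interrupts it, and reports whether it is
    // still blocked shortly afterwards.
    public static boolean stillBlockedAfterInterrupt(Runnable waiter) throws InterruptedException
    {
        Thread t = new Thread(waiter);
        t.setDaemon(true);
        t.start();
        Thread.sleep(50); // let the thread start waiting
        t.interrupt();
        t.join(500);
        return t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException
    {
        CompletableFuture<String> never = new CompletableFuture<>();

        boolean uninterruptible = stillBlockedAfterInterrupt(() -> awaitUninterruptibly(never));
        boolean throwing = stillBlockedAfterInterrupt(() -> {
            try
            {
                awaitThrowUncheckedOnInterrupt(never);
            }
            catch (IllegalStateException expected)
            {
                // the waiter gives up promptly instead of hanging
            }
        });

        System.out.println("uninterruptible wait still blocked: " + uninterruptible);
        System.out.println("throwing wait still blocked: " + throwing);
        never.complete("done"); // release the uninterruptible waiter
    }
}
```

   Running `main` should show the uninterruptible waiter still blocked after the interrupt, while the throwing waiter has already exited.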



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

