beobal commented on code in PR #3288:
URL: https://github.com/apache/cassandra/pull/3288#discussion_r1594282413
##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
* calls to peers, retrieving a LogState which can be applied locally to
produce the necessary {@code
* ClusterMetadata}. The function takes a {@code Promise<LogState>} as
input, with the expectation that this
* specific instance will be used to provide blocking behaviour when
making the rpc calls that fetch the {@code
Review Comment:
We should remove this too: `The function takes a {@code Promise<LogState>} as
input, with the expectation that this specific instance will be used to provide
blocking behaviour when making the rpc calls that fetch the {@code
LogState}.`
##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
* calls to peers, retrieving a LogState which can be applied locally to
produce the necessary {@code
* ClusterMetadata}. The function takes a {@code Promise<LogState>} as
input, with the expectation that this
* specific instance will be used to provide blocking behaviour when
making the rpc calls that fetch the {@code
- * LogState}. These promises are memoized in order to cancel them when
{@link #shutdownAndWait(long, TimeUnit)} is
- * called. This causes the fetch function to stop waiting on any in flight
{@code LogState} requests and prevents
- * shutdown from being blocked.
+ * LogState}.
*
* @param fetchFunction executes the request for LogState. It's expected
that this popluates fetchResult with the
Review Comment:
And probably reword this too, since `fetchResult` no longer exists after this
change (also, `popluates` should be `populates`).
##########
src/java/org/apache/cassandra/tcm/PeerLogFetcher.java:
##########
@@ -74,40 +74,54 @@ public ClusterMetadata
fetchLogEntriesAndWait(InetAddressAndPort remote, Epoch a
public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote,
Epoch awaitAtleast)
{
- Function<Promise<LogState>, ClusterMetadata> fn = promise ->
fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
- return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+ return EpochAwareDebounce.instance.getAsync(() ->
fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
}
- private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState>
remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+ private Future<ClusterMetadata>
fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
{
Epoch before = ClusterMetadata.current().epoch;
if (before.isEqualOrAfter(awaitAtleast))
- return ClusterMetadata.current();
+ {
+ Promise<ClusterMetadata> res = new AsyncPromise<>();
+ res.setSuccess(ClusterMetadata.current());
+ return res;
+ }
+ Promise<LogState> fetchRes = new AsyncPromise<>();
logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
try (Timer.Context ctx =
TCMMetrics.instance.fetchPeerLogLatency.time())
{
- RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+ RemoteProcessor.sendWithCallbackAsync(fetchRes,
Verb.TCM_FETCH_PEER_LOG_REQ,
new FetchPeerLog(before),
new
RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
new
Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
- LogState logState = remoteRequest.awaitUninterruptibly().get();
- log.append(logState);
- ClusterMetadata fetched = log.waitForHighestConsecutive();
- if (fetched.epoch.isEqualOrAfter(awaitAtleast))
- {
- TCMMetrics.instance.peerLogEntriesFetched(before,
logState.latestEpoch());
- return fetched;
- }
+
+ return fetchRes.map((logState) -> {
+ log.append(logState);
+ ClusterMetadata fetched = log.waitForHighestConsecutive();
+ if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+ {
+ TCMMetrics.instance.peerLogEntriesFetched(before,
logState.latestEpoch());
+ return fetched;
+ }
+ else
+ {
+ throw new IllegalStateException(String.format("Queried for
epoch %s, but could not catch up", awaitAtleast));
Review Comment:
Do we want to throw here, or let the caller decide how to handle a failure
to catch up? In some cases this is purely a background task, so perhaps we
should log but allow the caller to proceed? (I know we are certain that
`remote` should be able to satisfy the request, as we call this in response to
seeing `awaitAtleast` in some message from it.)
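
To make the two options concrete, here is a minimal sketch. It uses
`java.util.concurrent.CompletableFuture` and a plain `long` epoch as stand-ins
for Cassandra's `Future` and `ClusterMetadata`; the class and method names
(`CatchUpSketch`, `strict`, `lenient`) are hypothetical and are not part of
this PR.

```java
import java.util.concurrent.CompletableFuture;

public class CatchUpSketch
{
    // Option A (what the diff does): fail the returned future when the
    // fetched epoch is still behind the one we were asked to reach.
    static CompletableFuture<Long> strict(CompletableFuture<Long> fetched, long awaitAtLeast)
    {
        return fetched.thenApply(epoch -> {
            if (epoch >= awaitAtLeast)
                return epoch;
            throw new IllegalStateException(String.format("Queried for epoch %d, but could not catch up", awaitAtLeast));
        });
    }

    // Option B: log the shortfall but complete normally, so the caller
    // can compare epochs and decide whether falling short is a failure.
    static CompletableFuture<Long> lenient(CompletableFuture<Long> fetched, long awaitAtLeast)
    {
        return fetched.thenApply(epoch -> {
            if (epoch < awaitAtLeast)
                System.err.printf("Queried for epoch %d, but only reached %d%n", awaitAtLeast, epoch);
            return epoch;
        });
    }

    public static void main(String[] args)
    {
        // Stand-in for a fetch that only got us to epoch 5.
        CompletableFuture<Long> behind = CompletableFuture.completedFuture(5L);

        System.out.println("strict failed: " + strict(behind, 10L).isCompletedExceptionally());
        System.out.println("lenient epoch: " + lenient(behind, 10L).join());
    }
}
```

With option B a background caller can ignore the shortfall, while a caller
that really needs the epoch can still check it and throw.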
##########
src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java:
##########
@@ -62,66 +45,46 @@ private EpochAwareDebounce()
* calls to peers, retrieving a LogState which can be applied locally to
produce the necessary {@code
* ClusterMetadata}. The function takes a {@code Promise<LogState>} as
input, with the expectation that this
* specific instance will be used to provide blocking behaviour when
making the rpc calls that fetch the {@code
- * LogState}. These promises are memoized in order to cancel them when
{@link #shutdownAndWait(long, TimeUnit)} is
- * called. This causes the fetch function to stop waiting on any in flight
{@code LogState} requests and prevents
- * shutdown from being blocked.
+ * LogState}.
*
* @param fetchFunction executes the request for LogState. It's expected
that this popluates fetchResult with the
* successful result.
* @param epoch the desired epoch
* @return
*/
- public Future<ClusterMetadata> getAsync(Function<Promise<LogState>,
ClusterMetadata> fetchFunction,
+ public Future<ClusterMetadata> getAsync(Supplier<Future<ClusterMetadata>>
fetchFunction,
Epoch epoch)
{
while (true)
{
- EpochAwareAsyncPromise running = currentFuture.get();
- if (running != null && !running.isDone() &&
running.epoch.isEqualOrAfter(epoch))
- return running;
+ EpochAwareFuture running = currentFuture.get();
+ // Someone else is about to install a new future
+ if (running == SENTINEL)
+ continue;
- Promise<LogState> fetchResult = new AsyncPromise<>();
+ if (running != null && !running.future.isDone() &&
running.epoch.isEqualOrAfter(epoch))
+ return running.future;
- EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
- if (currentFuture.compareAndSet(running, promise))
+ if (currentFuture.compareAndSet(running, SENTINEL))
{
- fetchResult.addCallback((logState, error) -> {
- logger.debug("Removing future remotely requesting epoch {}
from in flight list", epoch);
- inflightRequests.remove(fetchResult);
- });
- inflightRequests.add(fetchResult);
-
- executor.submit(() -> {
- try
- {
- promise.setSuccess(fetchFunction.apply(fetchResult));
- }
- catch (Throwable t)
- {
- fetchResult.cancel(true);
- inflightRequests.remove(fetchResult);
- promise.setFailure(t);
- }
- });
- return promise;
+ EpochAwareFuture promise = new EpochAwareFuture(epoch,
fetchFunction.get());;
Review Comment:
nit: `;;`
##########
src/java/org/apache/cassandra/tcm/RemoteProcessor.java:
##########
@@ -152,34 +150,37 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor,
Retry.Deadline retryPolicy
public static ClusterMetadata fetchLogAndWait(CandidateIterator
candidateIterator, LocalLog log)
{
- Promise<LogState> remoteRequest = new AsyncPromise<>();
- return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
+ try
+ {
+ return fetchLogAndWaitInternal(candidateIterator,
log).awaitUninterruptibly().get();
Review Comment:
Should we `awaitThrowUncheckedOnInterrupt`, or even just `await` here? I'm
thinking of tests really, where the original implementation (before
CASSANDRA-19514) could cause occasional timeouts.
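
For reference, a rough sketch of the behavioural difference I have in mind.
It uses `java.util.concurrent.CompletableFuture` as a stand-in for our
`Future`; the helper names are hypothetical, and `IllegalStateException`
stands in for whatever unchecked exception the real method throws. It assumes
Java 9+ for `completeOnTimeout`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class AwaitSketch
{
    // Interrupt-aware wait: an interrupt ends the wait immediately and
    // surfaces as an unchecked exception, with the interrupt flag restored.
    static <T> T getOrThrowUncheckedOnInterrupt(CompletableFuture<T> future) throws ExecutionException
    {
        try
        {
            return future.get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    // Uninterruptible wait: interrupts are remembered but do not end the
    // wait, so the caller stays blocked until the future completes.
    static <T> T getUninterruptibly(CompletableFuture<T> future) throws ExecutionException
    {
        boolean interrupted = false;
        try
        {
            while (true)
            {
                try
                {
                    return future.get();
                }
                catch (InterruptedException e)
                {
                    interrupted = true;
                }
            }
        }
        finally
        {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws ExecutionException
    {
        // Completes by itself after 100ms, standing in for a slow remote reply.
        CompletableFuture<String> slow = new CompletableFuture<String>().completeOnTimeout("done", 100, TimeUnit.MILLISECONDS);

        Thread.currentThread().interrupt();
        // Keeps waiting despite the pending interrupt.
        System.out.println(getUninterruptibly(slow));
        // Clear the flag that getUninterruptibly restored.
        Thread.interrupted();
    }
}
```

The uninterruptible variant is the one that can leave a test thread stuck
until the deadline if nothing ever completes the future.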