ifesdjeen commented on code in PR #3288:
URL: https://github.com/apache/cassandra/pull/3288#discussion_r1598049800
##########
src/java/org/apache/cassandra/tcm/PeerLogFetcher.java:
##########
@@ -74,40 +74,54 @@ public ClusterMetadata fetchLogEntriesAndWait(InetAddressAndPort remote, Epoch a
public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
{
- Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
- return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+ return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
}
- private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+ private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
{
Epoch before = ClusterMetadata.current().epoch;
if (before.isEqualOrAfter(awaitAtleast))
- return ClusterMetadata.current();
+ {
+ Promise<ClusterMetadata> res = new AsyncPromise<>();
+ res.setSuccess(ClusterMetadata.current());
+ return res;
+ }
+ Promise<LogState> fetchRes = new AsyncPromise<>();
logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
{
- RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+ RemoteProcessor.sendWithCallbackAsync(fetchRes,
Verb.TCM_FETCH_PEER_LOG_REQ,
new FetchPeerLog(before),
new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
- LogState logState = remoteRequest.awaitUninterruptibly().get();
- log.append(logState);
- ClusterMetadata fetched = log.waitForHighestConsecutive();
- if (fetched.epoch.isEqualOrAfter(awaitAtleast))
- {
- TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
- return fetched;
- }
+
+ return fetchRes.map((logState) -> {
+ log.append(logState);
+ ClusterMetadata fetched = log.waitForHighestConsecutive();
+ if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+ {
+ TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
+ return fetched;
+ }
+ else
+ {
+ throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast));
Review Comment:
Right now, we will log this (rare) exception, which I think is reasonable
behaviour.
On the other hand, if a listener is attached, it will have to deal with the
exception one way or another; this seems OK, too.
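
To illustrate the point about listeners, here is a minimal sketch of how an exception thrown inside a mapping callback surfaces to whoever is attached to the resulting future. It uses the JDK's `CompletableFuture` as a stand-in, not Cassandra's own `Future`/`AsyncPromise` types, and the class name `CatchUpSketch`, the methods `mapFetched` and `describe`, and the use of plain `long` epochs are all hypothetical names introduced for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CatchUpSketch
{
    // Hypothetical stand-in for the mapped fetch: yields the fetched epoch,
    // or throws if it is still behind the epoch that was requested.
    static CompletableFuture<Long> mapFetched(CompletableFuture<Long> fetchRes, long awaitAtLeast)
    {
        return fetchRes.thenApply(fetched -> {
            if (fetched >= awaitAtLeast)
                return fetched;
            throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtLeast));
        });
    }

    // A listener that has to handle both outcomes: the value or the failure.
    static String describe(CompletableFuture<Long> result)
    {
        return result.handle((epoch, failure) -> {
            if (failure == null)
                return "caught up to " + epoch;
            // Dependent stages report the original exception wrapped in CompletionException.
            Throwable cause = failure instanceof CompletionException ? failure.getCause() : failure;
            return "failed: " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args)
    {
        System.out.println(describe(mapFetched(CompletableFuture.completedFuture(12L), 10L)));
        System.out.println(describe(mapFetched(CompletableFuture.completedFuture(7L), 10L)));
    }
}
```

In this sketch the throw never propagates up the calling thread's stack; it only fails the returned future, so a caller that attaches nothing sees no error unless something else logs it. Whether Cassandra's `Future.map` wraps the exception the same way is not something this example establishes.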
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]