ifesdjeen commented on code in PR #3943: URL: https://github.com/apache/cassandra/pull/3943#discussion_r1977958218
########## test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java: ########## @@ -122,6 +119,7 @@ protected void preCheck(Property.StatefulBuilder builder) // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // HISTORY_SHOWS_CQL = true; // uncomment if the CQL done should be included in the history + builder.withSeed(3448695820622701960L); Review Comment: just to make sure: hardcoded seed deliberate here? ########## src/java/org/apache/cassandra/service/RetryStrategy.java: ########## @@ -180,108 +198,111 @@ public long wait(long min, long max, int attempts) } public final WaitRandomizer waitRandomizer; - public final Wait min, max, spread; - - public RetryStrategy(String waitRandomizer, String min, String max, String spread, LatencySourceFactory latencies) - { - this.waitRandomizer = parseWaitRandomizer(waitRandomizer); - this.min = parseBound(min, true, latencies); - this.max = parseBound(max, false, latencies); - this.spread = parseBound(spread, true, latencies); - } + public final long minMinMicros, maxMaxMicros; + public final @Nullable Wait min; + public final @Nonnull Wait max; + public final int maxAttempts; - protected RetryStrategy(WaitRandomizer waitRandomizer, Wait min, Wait max, Wait spread) + protected RetryStrategy(WaitRandomizer waitRandomizer, long minMinMicros, Wait min, Wait max, long maxMaxMicros, int retries) { this.waitRandomizer = waitRandomizer; + this.minMinMicros = minMinMicros; this.min = min; this.max = max; - this.spread = spread; - } - - protected Wait parseBound(String spec, boolean isMin, LatencySourceFactory latencies) - { - long defaultMaxMicros = DatabaseDescriptor.getRpcTimeout(MICROSECONDS); - return parseWait(spec, 0, defaultMaxMicros, isMin ? 0 : defaultMaxMicros, latencies); + this.maxMaxMicros = maxMaxMicros; + this.maxAttempts = retries == Integer.MAX_VALUE ? 
Integer.MAX_VALUE : retries + 1; + Invariants.require(maxAttempts >= 1); } - protected long computeWaitUntil(int attempts) + public long computeWaitUntil(int attempts) { - long wait = computeWait(attempts); - return nanoTime() + MICROSECONDS.toNanos(wait); + long wait = computeWait(attempts, NANOSECONDS); + if (wait < 0) + return -1; + return nanoTime() + wait; } - protected long computeWait(int attempts) + public long computeWait(int attempt, TimeUnit units) { - long minWaitMicros = min.get(attempts); - long maxWaitMicros = max.get(attempts); - long spreadMicros = spread.get(attempts); + if (attempt > maxAttempts) + return -1; - if (minWaitMicros + spreadMicros > maxWaitMicros) + long result; + if (min == null) { - maxWaitMicros = minWaitMicros + spreadMicros; - if (maxWaitMicros > this.max.max) - { - maxWaitMicros = this.max.max; - minWaitMicros = max(this.min.min, min(this.min.max, maxWaitMicros - spreadMicros)); - } + result = max.getMicros(attempt); } - - return waitRandomizer.wait(minWaitMicros, maxWaitMicros, attempts); - } - - public static class ParsedStrategy - { - public final String waitRandomizer, min, max, spread; - public final RetryStrategy strategy; - - protected ParsedStrategy(String waitRandomizer, String min, String max, String spread, RetryStrategy strategy) + else { - this.waitRandomizer = waitRandomizer; - this.min = min; - this.max = max; - this.spread = spread; - this.strategy = strategy; + long min = this.min.getMicros(attempt); + long max = this.max.getMicros(attempt); + result = min >= max ? 
min : waitRandomizer.wait(min, max, attempt); } - public String toString() - { - return "min=" + min + ",max=" + max + ",spread=" + spread + ",random=" + waitRandomizer; - } + if (result > maxMaxMicros) result = maxMaxMicros; + if (result < minMinMicros) result = minMinMicros; + return units.convert(result, MICROSECONDS); } - @VisibleForTesting - public static ParsedStrategy parseStrategy(String spec, LatencySourceFactory latencies, ParsedStrategy defaultStrategy) + public static RetryStrategy parse(String spec, LatencySourceFactory latencies) { - String[] args = spec.split(","); - String waitRandomizer = find(args, "random"); - String min = find(args, "min"); - String max = find(args, "max"); - String spread = find(args, "spread"); - if (spread == null) - spread = find(args, "delta"); - - if (waitRandomizer == null) waitRandomizer = defaultStrategy.waitRandomizer; - if (min == null) min = defaultStrategy.min; - if (max == null) max = defaultStrategy.max; - if (spread == null) spread = defaultStrategy.spread; - - RetryStrategy strategy = new RetryStrategy(waitRandomizer, min, max, spread, latencies); - return new ParsedStrategy(waitRandomizer, min, max, spread, strategy); + return parse(spec, latencies, null); } - protected static String find(String[] args, String param) + public static RetryStrategy parse(String spec, LatencySourceFactory latencies, WaitRandomizer randomizer) { - return stream(args).filter(s -> s.startsWith(param + '=')) - .map(s -> s.substring(param.length() + 1)) - .findFirst().orElse(null); - } + String original = spec; + int retries = Integer.MAX_VALUE; + int end = spec.length(); + { + int next; + while ((next = spec.lastIndexOf(',', end - 1)) >= 0) + { + int mid = spec.indexOf('=', next + 1); + if (mid <= next || mid >= end) + throw new IllegalArgumentException("Invalid modifier specification: '" + spec.substring(next, end) + "'; expecting '=' for value assignment"); + String key = spec.substring(next + 1, mid).trim(); + String value = 
spec.substring(mid + 1, end).trim(); + switch (key) + { + default: throw new IllegalArgumentException("Invalid modifier specification: unrecognised property '" + key + '\''); + case "retries": + retries = Integer.parseInt(value); + break; + case "rnd": + if (randomizer != null) + throw new IllegalArgumentException("Randomizer already specified, cannot re-specify: " + value); + randomizer = parseWaitRandomizer(value); + break; + } + end = next; + } + if (end != spec.length()) + spec = spec.substring(0, end); + } - static WaitRandomizer parseWaitRandomizer(String input) - { - return parseWaitRandomizer(input, randomizers); + Matcher m = PARSE.matcher(spec); + if (!m.matches()) + throw new IllegalArgumentException("Invalid specification: '" + spec + "'; does not match " + PARSE); Review Comment: A somewhat simplistic but maybe potentially useful fuzz test: ``` package org.apache.cassandra.service; import java.util.function.IntFunction; import org.junit.jupiter.api.Test; import accord.utils.Gen; import accord.utils.RandomTestRunner; public class RetryStrategyTest { @Test public void fuzzParser() { Gen<IntFunction<String>> expressionGen = random -> i -> { switch (i) { case 0: return String.format("p%d * %d", random.nextInt(100), random.nextInt(0, Integer.MAX_VALUE)); case 1: return String.format("p%d * %d * attempts", random.nextInt(100), random.nextInt(0, Integer.MAX_VALUE)); case 2: return String.format("p%d * %d ^ attempts", random.nextInt(100), random.nextInt(0, Integer.MAX_VALUE)); case 3: return String.format("%dms", random.nextInt(0, Integer.MAX_VALUE)); default: throw new IllegalArgumentException(Integer.toString(i)); } }; RandomTestRunner.test().check(rs -> { IntFunction<String> expression = expressionGen.next(rs); for (int i = 0; i < 10_000; i++) { RetryStrategy.parse(String.format("%dms <= %s", rs.nextInt(0, Integer.MAX_VALUE), expression.apply(rs.nextInt(4))), new TestLatencySourceFactory()); RetryStrategy.parse(String.format("%dms <= %s <= %dms", 
rs.nextInt(0, Integer.MAX_VALUE), expression.apply(rs.nextInt(3)), rs.nextInt(0, Integer.MAX_VALUE)), new TestLatencySourceFactory()); } }); } private static class TestLatencySourceFactory implements TimeoutStrategy.LatencySourceFactory { @Override public TimeoutStrategy.LatencySource source(String params) { return percentile -> 10; } } } ``` ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -540,133 +508,234 @@ public IVerbHandler<? extends Reply> responseHandler() return responseHandler; } - public DurabilityScheduling.ImmutableView durabilityScheduling() + public ShardDurability.ImmutableView shardDurability() { - return node.durabilityScheduling().immutableView(); + return node.durability().shards().immutableView(); } - private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint) + @Override + public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote); + } + + @Override + public AsyncChain<Void> sync(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + if (keys.size() != 1) + return syncInternal(minBound, keys, syncLocal, syncRemote); + + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + .flatMap(found -> KeyBarriers.await(node, found, syncLocal, syncRemote)) + .flatMap(success -> { + if (success) + return null; + return syncInternal(minBound, keys, syncLocal, syncRemote); + }); + } + + private AsyncChain<Void> syncInternal(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, 
DurabilityService.SyncRemote syncRemote) { - Stopwatch sw = Stopwatch.createStarted(); - keysOrRanges = intersectionWithAccordManagedRanges(keysOrRanges); - // It's possible none of them were Accord managed and we aren't going to treat that as an error - if (keysOrRanges.isEmpty()) + TxnId txnId = node.nextTxnId(minBound, Write, Key, cardinality(keys)); + FullRoute<?> route = node.computeRoute(txnId, keys); + Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), TxnQuery.NONE, TxnUpdate.empty()); + return CoordinateTransaction.coordinate(node, route, txnId, txn) + .map(ignore -> (Void)null).beginAsResult(); + } + + @Override + public AsyncChain<Timestamp> maxConflict(Ranges ranges) + { + return node.commandStores().any().build(() -> CoordinateMaxConflict.maxConflict(node, ranges)).flatMap(i -> i); + } + + public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + } + + public static <V> V getBlocking(AsyncChain<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + async.begin(result); + return result.awaitAndGet(); + } + + public static void getBlocking(AsyncChain<Void> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline) + { + getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false); + } + + public static class AccordResult<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>, IAccordService.IAccordResult<V> + { + final @Nullable TxnId txnId; + final Seekables<?, ?> keysOrRanges; + final RequestBookkeeping bookkeeping; + final long startedAtNanos, 
deadlineAtNanos; + final boolean isTxnRequest; + + public AccordResult(@Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAtNanos, long deadlineAtNanos, boolean isTxnRequest) { - logger.info("Skipping barrier because there are no ranges managed by Accord"); - return keysOrRanges; + this.txnId = txnId; + this.keysOrRanges = keysOrRanges; + this.bookkeeping = bookkeeping; + this.startedAtNanos = startedAtNanos; + this.deadlineAtNanos = deadlineAtNanos; + this.isTxnRequest = isTxnRequest; } - FullRoute<?> route = node.computeRoute(epoch, keysOrRanges); - AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : accordReadMetrics; - try + @Override + public V awaitAndGet() throws RequestExecutionException { - logger.debug("Starting barrier key: {} epoch: {} barrierType: {} isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite); - AsyncResult<TxnId> asyncResult = syncPoint == null - ? Barrier.barrier(node, keysOrRanges, route, epoch, barrierType) - : Barrier.barrier(node, keysOrRanges, route, epoch, barrierType, syncPoint); - long deadlineNanos = requestTime.startedAtNanos() + timeoutNanos; - TxnId txnId = AsyncChains.getBlocking(asyncResult, deadlineNanos - nanoTime(), NANOSECONDS); - if (keysOrRanges.domain() == Key) + try { - PartitionKey key = (PartitionKey)keysOrRanges.get(0); - maybeSaveAccordKeyMigrationLocally(key, Epoch.create(txnId.epoch())); + if (!awaitUntil(deadlineAtNanos)) + accept(null, new Timeout(txnId, null)); } - logger.debug("Completed barrier attempt in {}ms, {}ms since attempts start, barrier key: {} epoch: {} barrierType: {} isForWrite {}", - sw.elapsed(MILLISECONDS), - NANOSECONDS.toMillis(nanoTime() - requestTime.startedAtNanos()), - keysOrRanges, epoch, barrierType, isForWrite); - return keysOrRanges; - } - catch (ExecutionException e) - { - Throwable cause = Throwables.getRootCause(e); - if (cause instanceof Timeout) + catch (InterruptedException e) { - TxnId txnId = ((Timeout) 
cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - metrics.timeouts.mark(); - throw newBarrierTimeout(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + accept(null, e); } - if (cause instanceof Preempted) + + Throwable fail = fail(); + if (fail != null) + throw (RequestExecutionException) fail; + return success(); + } + + @Override + public void accept(V success, Throwable fail) + { + if (fail != null) { - TxnId txnId = ((Preempted) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - //TODO need to improve - // Coordinator "could" query the accord state to see whats going on but that doesn't exist yet. - // Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match - throw newBarrierPreempted(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + RequestExecutionException report; + CoordinationFailed coordinationFailed = findCoordinationFailed(fail); + TxnId txnId = this.txnId; + if (coordinationFailed != null) + { + if (txnId == null && coordinationFailed.txnId() != null) + txnId = coordinationFailed.txnId(); + + if (coordinationFailed instanceof Timeout) + { + report = bookkeeping.newTimeout(txnId, keysOrRanges); + } + else if (coordinationFailed instanceof Preempted) + { + report = bookkeeping.newPreempted(txnId, keysOrRanges); + } + else if (isTxnRequest && coordinationFailed instanceof TopologyMismatch) + { + // Excluding bugs topology mismatch can occur because a table was dropped in between creating the txn + // and executing it. + // It could also race with the table stopping/starting being managed by Accord. + // The caller can retry if the table indeed exists and is managed by Accord. 
+ Set<TableId> txnDroppedTables = txnDroppedTables(keysOrRanges); + Tracing.trace("Accord returned topology mismatch: " + coordinationFailed.getMessage()); + logger.debug("Accord returned topology mismatch", coordinationFailed); + bookkeeping.markTopologyMismatch(); + // Throw IRE in case the caller fails to check if the table still exists + if (!txnDroppedTables.isEmpty()) + { + Tracing.trace("Accord txn uses dropped tables {}", txnDroppedTables); + logger.debug("Accord txn uses dropped tables {}", txnDroppedTables); + throw new InvalidRequestException("Accord transaction uses dropped tables"); + } + trySuccess((V)RetryWithNewProtocolResult.instance); + return; + } + else + { + // this case happens when a non-timeout exception is seen, and we are unable to move forward + if (coordinationFailed instanceof Exhausted) + report = bookkeeping.newExhausted(txnId, keysOrRanges); + else + report = bookkeeping.newFailed(txnId, keysOrRanges); + } + if (txnId != null && txnId.isSyncPoint()) + AccordAgent.onFailedBarrier(txnId, fail); + } + else + { + report = bookkeeping.newFailed(txnId, keysOrRanges); + } + report.addSuppressed(fail); + tryFailure(report); } - if (cause instanceof Exhausted) + else { - TxnId txnId = ((Exhausted) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - // this case happens when a non-timeout exception is seen, and we are unable to move forward - metrics.failures.mark(); - throw newBarrierExhausted(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + if (success == RetryWithNewProtocolResult.instance) + { + bookkeeping.markRetryDifferentSystem(); + Tracing.trace("Got retry different system error from Accord, will retry"); + } + trySuccess(success); } - // unknown error - metrics.failures.mark(); - throw new RuntimeException(cause); + bookkeeping.markElapsedNanos(nanoTime() - startedAtNanos); Review Comment: Do we also want to report latency in the failed case of topology mismatch 
above? ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -540,133 +508,234 @@ public IVerbHandler<? extends Reply> responseHandler() return responseHandler; } - public DurabilityScheduling.ImmutableView durabilityScheduling() + public ShardDurability.ImmutableView shardDurability() { - return node.durabilityScheduling().immutableView(); + return node.durability().shards().immutableView(); } - private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint) + @Override + public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote); + } + + @Override + public AsyncChain<Void> sync(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + if (keys.size() != 1) + return syncInternal(minBound, keys, syncLocal, syncRemote); + + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + .flatMap(found -> KeyBarriers.await(node, found, syncLocal, syncRemote)) + .flatMap(success -> { + if (success) + return null; + return syncInternal(minBound, keys, syncLocal, syncRemote); + }); + } + + private AsyncChain<Void> syncInternal(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) { - Stopwatch sw = Stopwatch.createStarted(); - keysOrRanges = intersectionWithAccordManagedRanges(keysOrRanges); - // It's possible none of them were Accord managed and we aren't going to treat that as an error - if (keysOrRanges.isEmpty()) + TxnId txnId = node.nextTxnId(minBound, Write, Key, 
cardinality(keys)); + FullRoute<?> route = node.computeRoute(txnId, keys); + Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), TxnQuery.NONE, TxnUpdate.empty()); + return CoordinateTransaction.coordinate(node, route, txnId, txn) + .map(ignore -> (Void)null).beginAsResult(); + } + + @Override + public AsyncChain<Timestamp> maxConflict(Ranges ranges) + { + return node.commandStores().any().build(() -> CoordinateMaxConflict.maxConflict(node, ranges)).flatMap(i -> i); + } + + public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + } + + public static <V> V getBlocking(AsyncChain<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + async.begin(result); + return result.awaitAndGet(); + } + + public static void getBlocking(AsyncChain<Void> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline) + { + getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false); + } + + public static class AccordResult<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>, IAccordService.IAccordResult<V> + { + final @Nullable TxnId txnId; + final Seekables<?, ?> keysOrRanges; + final RequestBookkeeping bookkeeping; + final long startedAtNanos, deadlineAtNanos; + final boolean isTxnRequest; + + public AccordResult(@Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAtNanos, long deadlineAtNanos, boolean isTxnRequest) { - logger.info("Skipping barrier because there are no ranges managed by Accord"); - return keysOrRanges; + this.txnId = 
txnId; + this.keysOrRanges = keysOrRanges; + this.bookkeeping = bookkeeping; + this.startedAtNanos = startedAtNanos; + this.deadlineAtNanos = deadlineAtNanos; + this.isTxnRequest = isTxnRequest; } - FullRoute<?> route = node.computeRoute(epoch, keysOrRanges); - AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : accordReadMetrics; - try + @Override + public V awaitAndGet() throws RequestExecutionException { - logger.debug("Starting barrier key: {} epoch: {} barrierType: {} isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite); - AsyncResult<TxnId> asyncResult = syncPoint == null - ? Barrier.barrier(node, keysOrRanges, route, epoch, barrierType) - : Barrier.barrier(node, keysOrRanges, route, epoch, barrierType, syncPoint); - long deadlineNanos = requestTime.startedAtNanos() + timeoutNanos; - TxnId txnId = AsyncChains.getBlocking(asyncResult, deadlineNanos - nanoTime(), NANOSECONDS); - if (keysOrRanges.domain() == Key) + try { - PartitionKey key = (PartitionKey)keysOrRanges.get(0); - maybeSaveAccordKeyMigrationLocally(key, Epoch.create(txnId.epoch())); + if (!awaitUntil(deadlineAtNanos)) + accept(null, new Timeout(txnId, null)); } - logger.debug("Completed barrier attempt in {}ms, {}ms since attempts start, barrier key: {} epoch: {} barrierType: {} isForWrite {}", - sw.elapsed(MILLISECONDS), - NANOSECONDS.toMillis(nanoTime() - requestTime.startedAtNanos()), - keysOrRanges, epoch, barrierType, isForWrite); - return keysOrRanges; - } - catch (ExecutionException e) - { - Throwable cause = Throwables.getRootCause(e); - if (cause instanceof Timeout) + catch (InterruptedException e) { - TxnId txnId = ((Timeout) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - metrics.timeouts.mark(); - throw newBarrierTimeout(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + accept(null, e); } - if (cause instanceof Preempted) + + Throwable fail = fail(); + if (fail != null) + throw 
(RequestExecutionException) fail; + return success(); + } + + @Override + public void accept(V success, Throwable fail) + { + if (fail != null) { - TxnId txnId = ((Preempted) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - //TODO need to improve - // Coordinator "could" query the accord state to see whats going on but that doesn't exist yet. - // Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match - throw newBarrierPreempted(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + RequestExecutionException report; + CoordinationFailed coordinationFailed = findCoordinationFailed(fail); + TxnId txnId = this.txnId; + if (coordinationFailed != null) + { + if (txnId == null && coordinationFailed.txnId() != null) + txnId = coordinationFailed.txnId(); + + if (coordinationFailed instanceof Timeout) + { + report = bookkeeping.newTimeout(txnId, keysOrRanges); + } + else if (coordinationFailed instanceof Preempted) + { + report = bookkeeping.newPreempted(txnId, keysOrRanges); + } + else if (isTxnRequest && coordinationFailed instanceof TopologyMismatch) + { + // Excluding bugs topology mismatch can occur because a table was dropped in between creating the txn + // and executing it. + // It could also race with the table stopping/starting being managed by Accord. + // The caller can retry if the table indeed exists and is managed by Accord. 
+ Set<TableId> txnDroppedTables = txnDroppedTables(keysOrRanges); + Tracing.trace("Accord returned topology mismatch: " + coordinationFailed.getMessage()); + logger.debug("Accord returned topology mismatch", coordinationFailed); + bookkeeping.markTopologyMismatch(); + // Throw IRE in case the caller fails to check if the table still exists + if (!txnDroppedTables.isEmpty()) + { + Tracing.trace("Accord txn uses dropped tables {}", txnDroppedTables); + logger.debug("Accord txn uses dropped tables {}", txnDroppedTables); + throw new InvalidRequestException("Accord transaction uses dropped tables"); + } + trySuccess((V)RetryWithNewProtocolResult.instance); + return; + } + else + { + // this case happens when a non-timeout exception is seen, and we are unable to move forward + if (coordinationFailed instanceof Exhausted) Review Comment: nit: why not else-if here? ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -540,133 +508,234 @@ public IVerbHandler<? 
extends Reply> responseHandler() return responseHandler; } - public DurabilityScheduling.ImmutableView durabilityScheduling() + public ShardDurability.ImmutableView shardDurability() { - return node.durabilityScheduling().immutableView(); + return node.durability().shards().immutableView(); } - private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint) + @Override + public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote); + } + + @Override + public AsyncChain<Void> sync(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + if (keys.size() != 1) + return syncInternal(minBound, keys, syncLocal, syncRemote); + + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + .flatMap(found -> KeyBarriers.await(node, found, syncLocal, syncRemote)) + .flatMap(success -> { + if (success) + return null; + return syncInternal(minBound, keys, syncLocal, syncRemote); + }); + } + + private AsyncChain<Void> syncInternal(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) { - Stopwatch sw = Stopwatch.createStarted(); - keysOrRanges = intersectionWithAccordManagedRanges(keysOrRanges); - // It's possible none of them were Accord managed and we aren't going to treat that as an error - if (keysOrRanges.isEmpty()) + TxnId txnId = node.nextTxnId(minBound, Write, Key, cardinality(keys)); + FullRoute<?> route = node.computeRoute(txnId, keys); + Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), 
TxnQuery.NONE, TxnUpdate.empty()); + return CoordinateTransaction.coordinate(node, route, txnId, txn) + .map(ignore -> (Void)null).beginAsResult(); + } + + @Override + public AsyncChain<Timestamp> maxConflict(Ranges ranges) + { + return node.commandStores().any().build(() -> CoordinateMaxConflict.maxConflict(node, ranges)).flatMap(i -> i); + } + + public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + } + + public static <V> V getBlocking(AsyncChain<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + async.begin(result); + return result.awaitAndGet(); + } + + public static void getBlocking(AsyncChain<Void> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline) + { + getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false); + } + + public static class AccordResult<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>, IAccordService.IAccordResult<V> Review Comment: Maybe a good opportunity to move this out of this class? 
########## src/java/org/apache/cassandra/net/Verb.java: ########## @@ -316,51 +317,53 @@ public enum Verb ACCORD_NOT_ACCEPT_REQ (124, P2, writeTimeout, IMMEDIATE, () -> AcceptSerializers.notAccept, AccordService::requestHandlerOrNoop, ACCORD_ACCEPT_RSP ), ACCORD_READ_RSP (125, P2, readTimeout, IMMEDIATE, () -> ReadDataSerializers.reply, AccordService::responseHandlerOrNoop ), ACCORD_READ_REQ (126, P2, readTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), - ACCORD_COMMIT_REQ (127, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), - ACCORD_COMMIT_INVALIDATE_REQ (128, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.invalidate, AccordService::requestHandlerOrNoop ), - ACCORD_APPLY_RSP (129, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_APPLY_REQ (130, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.request, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP ), - ACCORD_BEGIN_RECOVER_RSP (131, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_BEGIN_RECOVER_REQ (132, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.request, AccordService::requestHandlerOrNoop, ACCORD_BEGIN_RECOVER_RSP ), - ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_BEGIN_INVALIDATE_REQ (134, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_BEGIN_INVALIDATE_RSP ), - ACCORD_AWAIT_RSP (136, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.syncReply, AccordService::responseHandlerOrNoop ), - ACCORD_AWAIT_REQ (135, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP ), - ACCORD_AWAIT_ASYNC_RSP_REQ (137, 
P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.asyncReply, AccordService::requestHandlerOrNoop ), - ACCORD_WAIT_UNTIL_APPLIED_REQ (138, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.waitUntilApplied, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), - ACCORD_STABLE_THEN_READ_REQ (139, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.stableThenRead, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), - ACCORD_INFORM_DURABLE_REQ (140, P2, writeTimeout, IMMEDIATE, () -> InformDurableSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_CHECK_STATUS_REQ (142, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP ), - ACCORD_RECOVER_AWAIT_RSP (143, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.recoverReply, AccordService::responseHandlerOrNoop ), - ACCORD_RECOVER_AWAIT_REQ (144, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.recoverRequest, AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP), - ACCORD_GET_LATEST_DEPS_RSP (167, P2, writeTimeout, IMMEDIATE, () -> LatestDepsSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_GET_LATEST_DEPS_REQ (168, P2, writeTimeout, IMMEDIATE, () -> LatestDepsSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_LATEST_DEPS_RSP), - ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_EPHMRL_READ_DEPS_RSP), - ACCORD_FETCH_DATA_RSP (145, P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_FETCH_DATA_REQ (146, 
P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), - ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, MISC, () -> SetDurableSerializers.shardDurable, AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, MISC, () -> SetDurableSerializers.globallyDurable,AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.reply, AccordService::responseHandlerOrNoop ), - ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.request,AccordService::requestHandlerOrNoop, ACCORD_QUERY_DURABLE_BEFORE_RSP ), - - ACCORD_SYNC_NOTIFY_RSP (151, P2, writeTimeout, MISC, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER), - ACCORD_SYNC_NOTIFY_REQ (152, P2, writeTimeout, MISC, () -> Notification.listSerializer, () -> AccordSyncPropagator.verbHandler, ACCORD_SYNC_NOTIFY_RSP ), - - ACCORD_APPLY_AND_WAIT_REQ (153, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP), - - CONSENSUS_KEY_MIGRATION (154, P1, writeTimeout, MISC, () -> ConsensusKeyMigrationFinished.serializer,() -> ConsensusKeyMigrationState.consensusKeyMigrationFinishedHandler), - - ACCORD_INTEROP_READ_RSP (155, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.replySerializer, AccordService::responseHandlerOrNoop), - ACCORD_INTEROP_READ_REQ (156, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_RSP), - ACCORD_INTEROP_STABLE_THEN_READ_REQ(157, P2, writeTimeout, IMMEDIATE, () -> AccordInteropStableThenRead.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_RSP), - ACCORD_INTEROP_READ_REPAIR_RSP (158, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.replySerializer, 
AccordService::responseHandlerOrNoop), - ACCORD_INTEROP_READ_REPAIR_REQ (159, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP), - ACCORD_INTEROP_APPLY_REQ (160, P2, writeTimeout, IMMEDIATE, () -> AccordInteropApply.serializer, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP), + ACCORD_STABLE_THEN_READ_REQ (127, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.stableThenRead, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), + ACCORD_COMMIT_REQ (128, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), + ACCORD_COMMIT_INVALIDATE_REQ (129, P2, writeTimeout, IMMEDIATE, () -> CommitSerializers.invalidate, AccordService::requestHandlerOrNoop ), + ACCORD_APPLY_RSP (130, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_APPLY_REQ (131, P2, writeTimeout, IMMEDIATE, () -> ApplySerializers.request, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP ), + ACCORD_APPLY_AND_WAIT_REQ (132, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP), + ACCORD_BEGIN_RECOVER_RSP (133, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_BEGIN_RECOVER_REQ (134, P2, writeTimeout, IMMEDIATE, () -> RecoverySerializers.request, AccordService::requestHandlerOrNoop, ACCORD_BEGIN_RECOVER_RSP ), + ACCORD_BEGIN_INVALIDATE_RSP (135, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_BEGIN_INVALIDATE_REQ (136, P2, writeTimeout, IMMEDIATE, () -> BeginInvalidationSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_BEGIN_INVALIDATE_RSP ), + ACCORD_AWAIT_RSP (137, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.syncReply, AccordService::responseHandlerOrNoop ), 
+ ACCORD_AWAIT_REQ (138, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP ), + ACCORD_AWAIT_ASYNC_RSP_REQ (139, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.asyncReply, AccordService::requestHandlerOrNoop ), + ACCORD_WAIT_UNTIL_APPLIED_REQ (140, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.waitUntilApplied, AccordService::requestHandlerOrNoop, ACCORD_READ_RSP ), + ACCORD_RECOVER_AWAIT_RSP (141, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.recoverReply, AccordService::responseHandlerOrNoop ), + ACCORD_RECOVER_AWAIT_REQ (142, P2, writeTimeout, IMMEDIATE, () -> AwaitSerializers.recoverRequest, AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP), + ACCORD_INFORM_DURABLE_REQ (143, P2, writeTimeout, IMMEDIATE, () -> InformDurableSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), + ACCORD_CHECK_STATUS_RSP (144, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_CHECK_STATUS_REQ (145, P2, writeTimeout, IMMEDIATE, () -> CheckStatusSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP ), + ACCORD_FETCH_DATA_RSP (146, P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_FETCH_DATA_REQ (147, P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), + ACCORD_GET_EPHMRL_READ_DEPS_RSP (148, P2, readTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_GET_EPHMRL_READ_DEPS_REQ (149, P2, readTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_EPHMRL_READ_DEPS_RSP), + ACCORD_GET_LATEST_DEPS_RSP (150, P2, readTimeout, IMMEDIATE, () -> LatestDepsSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_GET_LATEST_DEPS_REQ 
(151, P2, readTimeout, IMMEDIATE, () -> LatestDepsSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_LATEST_DEPS_RSP), + ACCORD_GET_MAX_CONFLICT_RSP (152, P2, readTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_GET_MAX_CONFLICT_REQ (153, P2, readTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_MAX_CONFLICT_RSP), + ACCORD_GET_DURABLE_BEFORE_RSP (154, P2, readTimeout, IMMEDIATE, () -> GetDurableBeforeSerializers.reply, AccordService::responseHandlerOrNoop ), + ACCORD_GET_DURABLE_BEFORE_REQ (155, P2, readTimeout, IMMEDIATE, () -> GetDurableBeforeSerializers.request, AccordService::requestHandlerOrNoop, ACCORD_GET_DURABLE_BEFORE_RSP ), + ACCORD_SET_SHARD_DURABLE_REQ (156, P2, rpcTimeout, MISC, () -> SetDurableSerializers.shardDurable, AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), + ACCORD_SET_GLOBALLY_DURABLE_REQ (157, P2, rpcTimeout, MISC, () -> SetDurableSerializers.globallyDurable,AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP ), + + ACCORD_SYNC_NOTIFY_RSP (158, P2, writeTimeout, MISC, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER), + ACCORD_SYNC_NOTIFY_REQ (159, P2, writeTimeout, MISC, () -> Notification.listSerializer, () -> AccordSyncPropagator.verbHandler, ACCORD_SYNC_NOTIFY_RSP ), + + + CONSENSUS_KEY_MIGRATION (160, P1, writeTimeout, MISC, () -> ConsensusKeyMigrationFinished.serializer,() -> ConsensusKeyMigrationState.consensusKeyMigrationFinishedHandler), + + ACCORD_INTEROP_READ_RSP (161, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.replySerializer, AccordService::responseHandlerOrNoop), + ACCORD_INTEROP_READ_REQ (162, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_RSP), + ACCORD_INTEROP_STABLE_THEN_READ_REQ(163, P2, writeTimeout, IMMEDIATE, () -> AccordInteropStableThenRead.requestSerializer, 
AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_RSP), + ACCORD_INTEROP_READ_REPAIR_RSP (164, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.replySerializer, AccordService::responseHandlerOrNoop), + ACCORD_INTEROP_READ_REPAIR_REQ (165, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP), + ACCORD_INTEROP_APPLY_REQ (166, P2, writeTimeout, IMMEDIATE, () -> AccordInteropApply.serializer, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP), // TODO (desired): swap verb order to make IDS sequential? - ACCORD_FETCH_MIN_EPOCH_RSP (166, P0, shortTimeout, FETCH_METADATA, () -> FetchMinEpoch.Response.serializer, RESPONSE_HANDLER), Review Comment: TODO above can now go away since IDs are now in the right order ########## src/java/org/apache/cassandra/service/accord/api/AccordAgent.java: ########## @@ -244,73 +258,95 @@ public static long nonClashingStartTime(long startTime, SortedList<Node.Id> node } @Override - public long seekProgressDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil blockedUntil, TimeUnit units) + public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int attempt, BlockedUntil blockedUntil, TimeUnit units) { - // TODO (required): make this configurable and dependent upon normal request latencies, and perhaps offset from txnId.hlc() - return units.convert((1L << Math.min(retryCount, 4)), SECONDS); + return fetch(txnId).computeWait(attempt, units); } @Override - public long retryAwaitTimeout(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil retrying, TimeUnit units) + public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int attempt, BlockedUntil retrying, TimeUnit units) { - // TODO (expected): integrate with contention backoff - return units.convert((1L << Math.min(retryCount, 4)), SECONDS); + // TODO (desired): separate 
config? + return fetch(txnId).computeWait(attempt, units); } @Override - public long localSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit) + public long retrySyncPointDelay(Node node, int attempt, TimeUnit units) { - switch (phase) - { - default: throw new UnhandledEnum(phase); - case PreAccept: return unit.convert(slowPreaccept().computeWaitUntil(1), NANOSECONDS); - case Execute: return unit.convert(slowRead().computeWaitUntil(1), NANOSECONDS); - } + return retrySyncPoint.computeWait(attempt, units); } @Override - public long localExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit) + public long retryDurabilityDelay(Node node, int attempt, TimeUnit units) { - // TODO (expected): make this configurable - return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit) - : DatabaseDescriptor.getReadRpcTimeout(unit); + return retryDurability.computeWait(attempt, units); } @Override - public long expiresAt(ReplyContext replyContext, TimeUnit unit) + public long expireEpochWait(TimeUnit units) { - return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), NANOSECONDS); + return expireEpochWait.computeWait(1, units); } @Override - public void onViolation(String message, Participants<?> participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt) + public long expiresAt(ReplyContext replyContext, TimeUnit unit) { - logger.error(message); + return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), NANOSECONDS); } - public TimeoutStrategy slowRead() + @Override + public long selfSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit) { - if (slowRead == null) + switch (phase) { - synchronized (this) - { - AccordSpec config = DatabaseDescriptor.getAccord(); - slowRead = new TimeoutStrategy(config.slowRead, TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics)); - } + default: throw new UnhandledEnum(phase); + case PreAccept: 
return unit.convert(slowTxnPreaccept.computeWaitUntil(1), NANOSECONDS); + case Execute: return unit.convert(slowRead.computeWaitUntil(1), NANOSECONDS); } - return slowRead; } - public TimeoutStrategy slowPreaccept() + @Override + public long selfExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit) { - if (slowPreaccept == null) + long delayNanos; + switch (txnId.kind()) { - synchronized (this) - { - AccordSpec config = DatabaseDescriptor.getAccord(); - slowPreaccept = new TimeoutStrategy(config.slowPreAccept, TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics)); - } + default: throw new UnhandledEnum(txnId.kind()); + case Write: + delayNanos = DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS); + break; + case EphemeralRead: + case Read: + delayNanos = DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS); + break; + case ExclusiveSyncPoint: + delayNanos = DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(); } - return slowPreaccept; + return unit.convert(nanoTime() + delayNanos, NANOSECONDS); + } + + @Override + public AsyncChain<TxnId> awaitStaleId(Node node, TxnId staleId, boolean isRequested) + { + long waitMicros = (staleId.hlc() + getAccordScheduleDurabilityTxnIdLag(MICROSECONDS)) - node.now(); + if (waitMicros <= 0) + return AsyncChains.success(staleId); + + logger.debug("Waiting {} micros for {} to be stale", waitMicros, staleId); + AsyncResult.Settable<TxnId> result = AsyncResults.settable(); + node.scheduler().selfRecurring(() -> result.setSuccess(staleId), waitMicros, MICROSECONDS); + return result; + } + + @Override + public long minStaleHlc(Node node, boolean requested) + { + return node.now() - (100 + getAccordScheduleDurabilityTxnIdLag(MICROSECONDS)); Review Comment: Is 100 chosen here arbitrarily? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org