belliottsmith commented on code in PR #3943: URL: https://github.com/apache/cassandra/pull/3943#discussion_r1978891064
########## src/java/org/apache/cassandra/service/accord/api/AccordAgent.java: ########## @@ -244,73 +258,95 @@ public static long nonClashingStartTime(long startTime, SortedList<Node.Id> node } @Override - public long seekProgressDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil blockedUntil, TimeUnit units) + public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int attempt, BlockedUntil blockedUntil, TimeUnit units) { - // TODO (required): make this configurable and dependent upon normal request latencies, and perhaps offset from txnId.hlc() - return units.convert((1L << Math.min(retryCount, 4)), SECONDS); + return fetch(txnId).computeWait(attempt, units); } @Override - public long retryAwaitTimeout(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil retrying, TimeUnit units) + public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int attempt, BlockedUntil retrying, TimeUnit units) { - // TODO (expected): integrate with contention backoff - return units.convert((1L << Math.min(retryCount, 4)), SECONDS); + // TODO (desired): separate config? + return fetch(txnId).computeWait(attempt, units); } @Override - public long localSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit) + public long retrySyncPointDelay(Node node, int attempt, TimeUnit units) { - switch (phase) - { - default: throw new UnhandledEnum(phase); - case PreAccept: return unit.convert(slowPreaccept().computeWaitUntil(1), NANOSECONDS); - case Execute: return unit.convert(slowRead().computeWaitUntil(1), NANOSECONDS); - } + return retrySyncPoint.computeWait(attempt, units); } @Override - public long localExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit) + public long retryDurabilityDelay(Node node, int attempt, TimeUnit units) { - // TODO (expected): make this configurable - return txnId.is(Write) ? 
DatabaseDescriptor.getWriteRpcTimeout(unit) - : DatabaseDescriptor.getReadRpcTimeout(unit); + return retryDurability.computeWait(attempt, units); } @Override - public long expiresAt(ReplyContext replyContext, TimeUnit unit) + public long expireEpochWait(TimeUnit units) { - return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), NANOSECONDS); + return expireEpochWait.computeWait(1, units); } @Override - public void onViolation(String message, Participants<?> participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt) + public long expiresAt(ReplyContext replyContext, TimeUnit unit) { - logger.error(message); + return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), NANOSECONDS); } - public TimeoutStrategy slowRead() + @Override + public long selfSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit) { - if (slowRead == null) + switch (phase) { - synchronized (this) - { - AccordSpec config = DatabaseDescriptor.getAccord(); - slowRead = new TimeoutStrategy(config.slowRead, TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics)); - } + default: throw new UnhandledEnum(phase); + case PreAccept: return unit.convert(slowTxnPreaccept.computeWaitUntil(1), NANOSECONDS); + case Execute: return unit.convert(slowRead.computeWaitUntil(1), NANOSECONDS); } - return slowRead; } - public TimeoutStrategy slowPreaccept() + @Override + public long selfExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit) { - if (slowPreaccept == null) + long delayNanos; + switch (txnId.kind()) { - synchronized (this) - { - AccordSpec config = DatabaseDescriptor.getAccord(); - slowPreaccept = new TimeoutStrategy(config.slowPreAccept, TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics)); - } + default: throw new UnhandledEnum(txnId.kind()); + case Write: + delayNanos = DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS); + 
break; + case EphemeralRead: + case Read: + delayNanos = DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS); + break; + case ExclusiveSyncPoint: + delayNanos = DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(); } - return slowPreaccept; + return unit.convert(nanoTime() + delayNanos, NANOSECONDS); + } + + @Override + public AsyncChain<TxnId> awaitStaleId(Node node, TxnId staleId, boolean isRequested) + { + long waitMicros = (staleId.hlc() + getAccordScheduleDurabilityTxnIdLag(MICROSECONDS)) - node.now(); + if (waitMicros <= 0) + return AsyncChains.success(staleId); + + logger.debug("Waiting {} micros for {} to be stale", waitMicros, staleId); + AsyncResult.Settable<TxnId> result = AsyncResults.settable(); + node.scheduler().selfRecurring(() -> result.setSuccess(staleId), waitMicros, MICROSECONDS); + return result; + } + + @Override + public long minStaleHlc(Node node, boolean requested) + { + return node.now() - (100 + getAccordScheduleDurabilityTxnIdLag(MICROSECONDS)); Review Comment: Yes, it is just some arbitrary extra time so that we don't always allocate something in the future. I guess it's really a bad choice, I didn't think about it much, but if we currently allocate only 1 id every 1000 microseconds, this won't actually stop that from happening very often. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org