belliottsmith commented on code in PR #3943:
URL: https://github.com/apache/cassandra/pull/3943#discussion_r1978891064


##########
src/java/org/apache/cassandra/service/accord/api/AccordAgent.java:
##########
@@ -244,73 +258,95 @@ public static long nonClashingStartTime(long startTime, 
SortedList<Node.Id> node
     }
 
     @Override
-    public long seekProgressDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, BlockedUntil blockedUntil, TimeUnit units)
+    public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int attempt, BlockedUntil blockedUntil, TimeUnit units)
     {
-        // TODO (required): make this configurable and dependent upon normal 
request latencies, and perhaps offset from txnId.hlc()
-        return units.convert((1L << Math.min(retryCount, 4)), SECONDS);
+        return fetch(txnId).computeWait(attempt, units);
     }
 
     @Override
-    public long retryAwaitTimeout(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, BlockedUntil retrying, TimeUnit units)
+    public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int attempt, BlockedUntil retrying, TimeUnit units)
     {
-        // TODO (expected): integrate with contention backoff
-        return units.convert((1L << Math.min(retryCount, 4)), SECONDS);
+        // TODO (desired): separate config?
+        return fetch(txnId).computeWait(attempt, units);
     }
 
     @Override
-    public long localSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
+    public long retrySyncPointDelay(Node node, int attempt, TimeUnit units)
     {
-        switch (phase)
-        {
-            default: throw new UnhandledEnum(phase);
-            case PreAccept: return 
unit.convert(slowPreaccept().computeWaitUntil(1), NANOSECONDS);
-            case Execute:   return 
unit.convert(slowRead().computeWaitUntil(1), NANOSECONDS);
-        }
+        return retrySyncPoint.computeWait(attempt, units);
     }
 
     @Override
-    public long localExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
+    public long retryDurabilityDelay(Node node, int attempt, TimeUnit units)
     {
-        // TODO (expected): make this configurable
-        return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit)
-                               : DatabaseDescriptor.getReadRpcTimeout(unit);
+        return retryDurability.computeWait(attempt, units);
     }
 
     @Override
-    public long expiresAt(ReplyContext replyContext, TimeUnit unit)
+    public long expireEpochWait(TimeUnit units)
     {
-        return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), 
NANOSECONDS);
+        return expireEpochWait.computeWait(1, units);
     }
 
     @Override
-    public void onViolation(String message, Participants<?> participants, 
@Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, 
@Nullable TxnId by, @Nullable Timestamp byEexecuteAt)
+    public long expiresAt(ReplyContext replyContext, TimeUnit unit)
     {
-        logger.error(message);
+        return unit.convert(((ResponseContext)replyContext).expiresAtNanos(), 
NANOSECONDS);
     }
 
-    public TimeoutStrategy slowRead()
+    @Override
+    public long selfSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
     {
-        if (slowRead == null)
+        switch (phase)
         {
-            synchronized (this)
-            {
-                AccordSpec config = DatabaseDescriptor.getAccord();
-                slowRead = new TimeoutStrategy(config.slowRead, 
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
-            }
+            default: throw new UnhandledEnum(phase);
+            case PreAccept: return 
unit.convert(slowTxnPreaccept.computeWaitUntil(1), NANOSECONDS);
+            case Execute:   return unit.convert(slowRead.computeWaitUntil(1), 
NANOSECONDS);
         }
-        return slowRead;
     }
 
-    public TimeoutStrategy slowPreaccept()
+    @Override
+    public long selfExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
     {
-        if (slowPreaccept == null)
+        long delayNanos;
+        switch (txnId.kind())
         {
-            synchronized (this)
-            {
-                AccordSpec config = DatabaseDescriptor.getAccord();
-                slowPreaccept = new TimeoutStrategy(config.slowPreAccept, 
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
-            }
+            default: throw new UnhandledEnum(txnId.kind());
+            case Write:
+                delayNanos = 
DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS);
+                break;
+            case EphemeralRead:
+            case Read:
+                delayNanos = DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS);
+                break;
+            case ExclusiveSyncPoint:
+                delayNanos = 
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos();
         }
-        return slowPreaccept;
+        return unit.convert(nanoTime() + delayNanos, NANOSECONDS);
+    }
+
+    @Override
+    public AsyncChain<TxnId> awaitStaleId(Node node, TxnId staleId, boolean 
isRequested)
+    {
+        long waitMicros = (staleId.hlc() + 
getAccordScheduleDurabilityTxnIdLag(MICROSECONDS)) - node.now();
+        if (waitMicros <= 0)
+            return AsyncChains.success(staleId);
+
+        logger.debug("Waiting {} micros for {} to be stale", waitMicros, 
staleId);
+        AsyncResult.Settable<TxnId> result = AsyncResults.settable();
+        node.scheduler().selfRecurring(() -> result.setSuccess(staleId), 
waitMicros, MICROSECONDS);
+        return result;
+    }
+
+    @Override
+    public long minStaleHlc(Node node, boolean requested)
+    {
+        return node.now() - (100 + 
getAccordScheduleDurabilityTxnIdLag(MICROSECONDS));

Review Comment:
   Yes, it's just some arbitrary extra time so that we don't always allocate 
   something in the future. I guess it's really a bad choice (I didn't think 
   about it much), but if we currently allocate only 1 id every 1000 
   microseconds, this won't actually stop that from happening very often.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to