belliottsmith commented on code in PR #214:
URL: https://github.com/apache/cassandra-accord/pull/214#discussion_r2175635330


##########
accord-core/src/main/java/accord/local/durability/DurabilityService.java:
##########
@@ -73,64 +74,97 @@ public GlobalDurability global()
         return global;
     }
 
-    public synchronized void start()
+    public void start()
     {
-        Invariants.require(!started);
-        started = true;
+        synchronized (this)
+        {
+            Invariants.require(!started);
+            started = true;
+        }
         Topology current = node.topology().current();
         shards.updateTopology(current);
         global.updateTopology(current);
         shards.start();
         global.start();
     }
 
-    public synchronized void stop()
+    public void stop()
     {
         shards.stop();
         global.stop();
     }
 
-    public AsyncResult<Void> close(String requestedBy, Ranges ranges)
+    public AsyncResult<Void> close(String requestedBy, Ranges ranges, long 
timeoutDelay, TimeUnit timeoutUnits)
     {
-        return close(requestedBy, TxnId.NONE, ranges);
+        return close(requestedBy, TxnId.NONE, ranges, timeoutDelay, 
timeoutUnits);
     }
 
-    public AsyncResult<Void> close(Object requestedBy, Timestamp minBound, 
Ranges ranges)
+    public AsyncResult<Void> close(Object requestedBy, Timestamp minBound, 
Ranges ranges, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
SyncLocal.NoLocal, SyncRemote.NoRemote, null, 
node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
SyncLocal.NoLocal, SyncRemote.NoRemote, null, startedAt, timeoutAt)).result;
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal 
local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal 
local, SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return sync(requestedBy, TxnId.NONE, ranges, local, remote);
+        return sync(requestedBy, TxnId.NONE, ranges, local, remote, 
timeoutDelay, timeoutUnits);
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, SyncLocal local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, SyncLocal local, SyncRemote remote, long timeoutDelay, TimeUnit 
timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, null, node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, null, startedAt, timeoutAt)).result;
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable 
Collection<Node.Id> include, SyncLocal local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable 
Collection<Node.Id> include, SyncLocal local, SyncRemote remote, long 
timeoutDelay, TimeUnit timeoutUnits)
     {
-        return sync(requestedBy, TxnId.NONE, ranges, include, local, remote);
+        return sync(requestedBy, TxnId.NONE, ranges, include, local, remote, 
timeoutDelay, timeoutUnits);
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local, 
SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local, 
SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, include, node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, include, startedAt, timeoutAt)).result;
     }
 
     private DurabilityRequest submit(DurabilityRequest request)
     {
-        synchronized (this)
-        {
-            requests.add(request);
-        }
+        register(request);
         logger.info("Requesting durability {}", request);
         shards.request(request);
         return request;
     }
 
+    void register(DurabilityRequest request)
+    {
+        request.timeout = node.timeouts().registerAt(new Timeout()
+        {
+            @Override public int stripe() { return request.ranges.hashCode(); }
+            @Override public void timeout()
+            {
+                request.timeout();
+                unregister(request);
+            }
+        }, request.timeoutAt, MICROSECONDS);
+
+        synchronized (this)
+        {
+            if (!request.isDone()) // guard against unlikely scenario of 
timeout firing before we register here

Review Comment:
   Since the timeout is added first, it might be that the request gets timedout 
before we register it. If the timeout is invoked _after_ we register then it 
will successfully remove us, so nothing to worry about. But if it has already 
fired it won't have removed us (because we hadn't been added yet)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to