ifesdjeen commented on code in PR #214:
URL: https://github.com/apache/cassandra-accord/pull/214#discussion_r2175595366


##########
accord-core/src/main/java/accord/local/durability/DurabilityService.java:
##########
@@ -73,64 +74,97 @@ public GlobalDurability global()
         return global;
     }
 
-    public synchronized void start()
+    public void start()
     {
-        Invariants.require(!started);
-        started = true;
+        synchronized (this)
+        {
+            Invariants.require(!started);
+            started = true;
+        }
         Topology current = node.topology().current();
         shards.updateTopology(current);
         global.updateTopology(current);
         shards.start();
         global.start();
     }
 
-    public synchronized void stop()
+    public void stop()
     {
         shards.stop();
         global.stop();
     }
 
-    public AsyncResult<Void> close(String requestedBy, Ranges ranges)
+    public AsyncResult<Void> close(String requestedBy, Ranges ranges, long 
timeoutDelay, TimeUnit timeoutUnits)
     {
-        return close(requestedBy, TxnId.NONE, ranges);
+        return close(requestedBy, TxnId.NONE, ranges, timeoutDelay, 
timeoutUnits);
     }
 
-    public AsyncResult<Void> close(Object requestedBy, Timestamp minBound, 
Ranges ranges)
+    public AsyncResult<Void> close(Object requestedBy, Timestamp minBound, 
Ranges ranges, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
SyncLocal.NoLocal, SyncRemote.NoRemote, null, 
node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
SyncLocal.NoLocal, SyncRemote.NoRemote, null, startedAt, timeoutAt)).result;
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal 
local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal 
local, SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return sync(requestedBy, TxnId.NONE, ranges, local, remote);
+        return sync(requestedBy, TxnId.NONE, ranges, local, remote, 
timeoutDelay, timeoutUnits);
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, SyncLocal local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, SyncLocal local, SyncRemote remote, long timeoutDelay, TimeUnit 
timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, null, node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, null, startedAt, timeoutAt)).result;
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable 
Collection<Node.Id> include, SyncLocal local, SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable 
Collection<Node.Id> include, SyncLocal local, SyncRemote remote, long 
timeoutDelay, TimeUnit timeoutUnits)
     {
-        return sync(requestedBy, TxnId.NONE, ranges, include, local, remote);
+        return sync(requestedBy, TxnId.NONE, ranges, include, local, remote, 
timeoutDelay, timeoutUnits);
     }
 
-    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local, 
SyncRemote remote)
+    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local, 
SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
     {
-        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, include, node.elapsed(MICROSECONDS))).result;
+        long startedAt = node.elapsed(MICROSECONDS);
+        long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+        return submit(new DurabilityRequest(requestedBy, minBound, ranges, 
local, remote, include, startedAt, timeoutAt)).result;
     }
 
     private DurabilityRequest submit(DurabilityRequest request)
     {
-        synchronized (this)
-        {
-            requests.add(request);
-        }
+        register(request);
         logger.info("Requesting durability {}", request);
         shards.request(request);
         return request;
     }
 
+    void register(DurabilityRequest request)
+    {
+        request.timeout = node.timeouts().registerAt(new Timeout()
+        {
+            @Override public int stripe() { return request.ranges.hashCode(); }
+            @Override public void timeout()
+            {
+                request.timeout();
+                unregister(request);
+            }
+        }, request.timeoutAt, MICROSECONDS);
+
+        synchronized (this)
+        {
+            if (!request.isDone()) // guard against unlikely scenario of 
timeout firing before we register here

Review Comment:
   If my understanding is correct, could still get triggered between isDone 
check and adding to request, should we check _after_ adding and unregistering 
then?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to