ifesdjeen commented on code in PR #4264:
URL: https://github.com/apache/cassandra/pull/4264#discussion_r2222935359


##########
src/java/org/apache/cassandra/service/accord/AccordDataStore.java:
##########
@@ -18,44 +18,27 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.DataStore;
+import accord.local.CommandStore;
 import accord.local.Node;
+import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
-import accord.primitives.TxnId;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
-import org.agrona.collections.Object2ObjectHashMap;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.memtable.Memtable;
-import org.apache.cassandra.db.memtable.TrieMemtable;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
-
-import static accord.utils.Invariants.require;
-import static 
org.apache.cassandra.db.ColumnFamilyStore.FlushReason.ACCORD_TXN_GC;
 
 public class AccordDataStore implements DataStore
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordDataStore.class);

Review Comment:
   Could you briefly document durability semantics here? In other words, the 
fact that sync point is executed _after_ we have flushed meltable?



##########
src/java/org/apache/cassandra/utils/btree/IntervalBTree.java:
##########
@@ -492,6 +492,8 @@ void initParent()
         @Override
         public void close()
         {
+            // TODO (required): validate this in IntervalBTreeTest

Review Comment:
   🫡 



##########
src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java:
##########
@@ -147,6 +152,29 @@ public LifecycleTransaction 
setFlushTransaction(LifecycleTransaction flushTransa
         return this.flushTransaction.getAndSet(flushTransaction);
     }
 
+    @Override
+    public synchronized <T extends Consumer<TableMetadata>> T 
ensureFlushListener(Object key, Supplier<T> factory)
+    {
+        if (onFlush == null)
+            return null;
+
+        T listener;
+        if (null == (listener = (T)onFlush.get(key)))
+            onFlush = ImmutableMap.<Object, 
Consumer<TableMetadata>>builder().putAll(onFlush).put(key, listener = 
factory.get()).build();

Review Comment:
   Would you mind if we not used assignments inside statements (it is just so 
easy to miss them), and did something like this:
   
   ```
         T listener = (T) onFlush.get(key);
         if (listener == null)
         {
             listener = factory.get();
             onFlush = ImmutableMap.<Object, Consumer<TableMetadata>>builder()
                                  .putAll(onFlush)
                                  .put(key, listener)
                                  .build();
         }
         return listener;
   ```



##########
src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java:
##########
@@ -78,35 +68,42 @@ public boolean hasTasks()
         return tasks + executor.getActiveTaskCount() + 
executor.getPendingTaskCount() > 0;
     }
 
+    @Override
+    void preUnlockCaches()

Review Comment:
   nit: maybe "before" instead of "pre"?



##########
src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java:
##########
@@ -288,71 +319,34 @@ void notifyListeners(BiConsumer<AccordCache.Listener<K, 
V>, AccordCacheEntry<K,
         owner.notifyListeners(notify, this);
     }
 
-    public interface OnLoaded
+    public interface LoadExecutor<P1, P2>
     {
-        <K, V> void onLoaded(AccordCacheEntry<K, V> state, V value, Throwable 
fail);
+        <K, V> Cancellable load(P1 p1, P2 p2, AccordCacheEntry<K, V> entry);
+    }
 
-        static OnLoaded immediate()
+    // functions as both an identity object, and a register of listeners
+    public static class UniqueSave
+    {
+        @Nullable List<Runnable> onSuccess;
+        void onSuccess(Runnable onSuccess)

Review Comment:
   It looks like onSuccess can be called from multiple threads (via shared 
executor submission), should we use volatile and some concurrent structure here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to