ifesdjeen commented on code in PR #4264: URL: https://github.com/apache/cassandra/pull/4264#discussion_r2222935359
########## src/java/org/apache/cassandra/service/accord/AccordDataStore.java: ########## @@ -18,44 +18,27 @@ package org.apache.cassandra.service.accord; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.api.DataStore; +import accord.local.CommandStore; import accord.local.Node; +import accord.local.RedundantBefore; import accord.local.SafeCommandStore; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.SyncPoint; -import accord.primitives.TxnId; -import accord.utils.async.AsyncResult; -import accord.utils.async.AsyncResults; -import org.agrona.collections.Object2ObjectHashMap; -import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.memtable.Memtable; -import org.apache.cassandra.db.memtable.TrieMemtable; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; - -import static accord.utils.Invariants.require; -import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.ACCORD_TXN_GC; public class AccordDataStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class); Review Comment: Could you briefly document durability semantics here? In other words, the fact that sync point is executed _after_ we have flushed meltable? ########## src/java/org/apache/cassandra/utils/btree/IntervalBTree.java: ########## @@ -492,6 +492,8 @@ void initParent() @Override public void close() { + // TODO (required): validate this in IntervalBTreeTest Review Comment: 🫡 ########## src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java: ########## @@ -147,6 +152,29 @@ public LifecycleTransaction setFlushTransaction(LifecycleTransaction flushTransa return this.flushTransaction.getAndSet(flushTransaction); } + @Override + public synchronized <T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> factory) + { + if (onFlush == null) + return null; + + T listener; + if (null == (listener = (T)onFlush.get(key))) + onFlush = ImmutableMap.<Object, Consumer<TableMetadata>>builder().putAll(onFlush).put(key, listener = factory.get()).build(); Review Comment: Would you mind if we not used assignments inside statements (it is just so easy to miss them), and did something like this: ``` T listener = (T) onFlush.get(key); if (listener == null) { listener = factory.get(); onFlush = ImmutableMap.<Object, Consumer<TableMetadata>>builder() .putAll(onFlush) .put(key, listener) .build(); } return listener; ``` ########## src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java: ########## @@ -78,35 +68,42 @@ public boolean hasTasks() return tasks + executor.getActiveTaskCount() + executor.getPendingTaskCount() > 0; } + @Override + void preUnlockCaches() Review Comment: nit: maybe "before" instead of "pre"? ########## src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java: ########## @@ -288,71 +319,34 @@ void notifyListeners(BiConsumer<AccordCache.Listener<K, V>, AccordCacheEntry<K, owner.notifyListeners(notify, this); } - public interface OnLoaded + public interface LoadExecutor<P1, P2> { - <K, V> void onLoaded(AccordCacheEntry<K, V> state, V value, Throwable fail); + <K, V> Cancellable load(P1 p1, P2 p2, AccordCacheEntry<K, V> entry); + } - static OnLoaded immediate() + // functions as both an identity object, and a register of listeners + public static class UniqueSave + { + @Nullable List<Runnable> onSuccess; + void onSuccess(Runnable onSuccess) Review Comment: It looks like onSuccess can be called from multiple threads (via shared executor submission), should we use volatile and some concurrent structure here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org