dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103101457


##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the 
cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> 
referenceAndDispatchReads(Iterable<K> keys,
                                                                                
            AccordStateCache.Instance<K, V> cache,
-                                                                               
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
+                                                                               
            LoadFunction<K, V> loadFunction,
+                                                                               
            List<AsyncChain<Void>> results,
                                                                                
            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, 
loadFunction);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    LoadFunction<TxnId, Command> loadCommandFunction(Object callback)

Review Comment:
   I don't see it; could you explain the case?  If we don't have the tx in 
memory, we need to fetch it from somewhere before we can process it, which 
currently requires that we read from the system table.  If we limit how many 
loads we can do, do we not stall the transactions that depend on this one?  The 
load function only comes into play when the data isn't in memory, so any 
operation that has all tx state in memory can make progress, while the ones 
that need to fetch are required to reschedule after that load.
   
   If the concern is about reusing the read stage because others may be using 
it, then I am cool with having a new `METADATA_READ` stage, which separates 
internal reads from user reads.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to