dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1104994131


##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the 
cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> 
referenceAndDispatchReads(Iterable<K> keys,
                                                                                
            AccordStateCache.Instance<K, V> cache,
-                                                                               
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
+                                                                               
            LoadFunction<K, V> loadFunction,
+                                                                               
            List<AsyncChain<Void>> results,
                                                                                
            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, 
loadFunction);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    LoadFunction<TxnId, Command> loadCommandFunction(Object callback)
     {
-        return command -> Stage.READ.submit(() -> {
+        return (txnId, consumer) -> ofRunnable(Stage.READ.executor(), () -> {
             try
             {
-                logger.trace("Starting load of {} for {}", command.txnId(), 
callback);
-                AccordKeyspace.loadCommand(commandStore, command);
-                logger.trace("Completed load of {} for {}", command.txnId(), 
callback);
+                logger.trace("Starting load of {} for {}", txnId, callback);
+                Command command = AccordKeyspace.loadCommand(commandStore, 
txnId);
+                logger.trace("Completed load of {} for {}", txnId, callback);
+                consumer.accept(command);
             }
             catch (Throwable t)
             {
-                logger.error("Exception loading {} for {}", command.txnId(), 
callback, t);
+                logger.error("Exception loading {} for {}", txnId, callback, 
t);
                 throw t;
             }
         });
     }
 
     @VisibleForTesting
-    Function<AccordCommandsForKey, Future<?>> 
loadCommandsPerKeyFunction(Object callback)
+    LoadFunction<RoutableKey, CommandsForKey> 
loadCommandsPerKeyFunction(Object callback)
     {
-        return cfk -> Stage.READ.submit(() -> {
+        return (key, consumer) -> ofRunnable(Stage.READ.executor(), () -> {
             try
             {
-                logger.trace("Starting load of {} for {}", cfk.key(), 
callback);
-                AccordKeyspace.loadCommandsForKey(cfk);
-                logger.trace("Completed load of {} for {}", cfk.key(), 
callback);
+                logger.trace("Starting load of {} for {}", key, callback);
+                CommandsForKey cfk = 
AccordKeyspace.loadCommandsForKey(commandStore, (PartitionKey) key);
+                logger.trace("Completed load of {} for {}", key, callback);
+                consumer.accept(cfk);
             }
             catch (Throwable t)
             {
-                logger.error("Exception loading {} for {}", cfk.key(), 
callback, t);
+                logger.error("Exception loading {} for {}", key, callback, t);
                 throw t;
             }
         });
     }
 
-    private Future<?> referenceAndDispatchReads(AsyncContext context, Object 
callback)
+    private AsyncResult<Void> referenceAndDispatchReads(Object callback)

Review Comment:
   Rather than `Void`, this should use `?`. You can't do that yet due to a bug 
in `AsyncResults.reduce` (`List<AsyncChain<A>>` doesn't work; it needs 
`List<? extends AsyncChain<? extends A>>`), but once that is fixed `?` will 
work just fine.



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
             state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);

Review Comment:
   I think you need to call `releaseResources` (or something like it) here. If 
you attempt to load 5 commands and 1 fails, then the whole operation fails and 
we don't release the 4 commands that loaded successfully.
   
   Another solution would be to change 
`org.apache.cassandra.service.accord.async.AsyncLoader#referenceAndDispatchReads`
 to add a cleanup on-error handler. You would need to do this at the top level 
so that a failure to load `CommandsForKey` releases `Command` as well. Since 
failed loads still bump the reference count (rc), I think you can just release 
the keys without issue, though you may have to worry about which thread is 
doing the work in order to avoid race conditions.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to