belliottsmith commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1122914230


##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +61,91 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the 
cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
-                                                                               
            AccordStateCache.Instance<K, V> cache,
-                                                                               
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
-                                                                               
            Object callback)
+    private <K, V, S extends AccordSafeState<K, V>> void 
referenceAndAssembleReads(Iterable<K> keys,
+                                                                               
    Map<K, S> context,
+                                                                               
    AccordStateCache.Instance<K, V, S> cache,
+                                                                               
    Function<K, V> loadFunction,
+                                                                               
    List<Runnable> loadRunnables,
+                                                                               
    List<AsyncChain<?>> listenChains)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
-                continue;
-
-            if (futures == null)
-                futures = new ArrayList<>();
-
-            futures.add(future);
+            S safeRef = cache.reference(key);
+            context.put(key, safeRef);
+            AccordLoadingState.LoadingState state = safeRef.loadingState();
+            switch (state)
+            {
+                case NOT_FOUND:
+                    AsyncResults.Unscheduled<V> load = 
safeRef.load(loadFunction);
+                    listenChains.add(load);
+                    loadRunnables.add(load);
+                    break;
+                case PENDING:
+                    listenChains.add(safeRef.listen());
+                    break;
+                case LOADED:
+                    break;
+                case FAILED:
+                    throw new RuntimeException(safeRef.failure());
+                default:
+                    throw new IllegalStateException("Unhandled loading state: 
" + state);
+            }
         }
-
-        return futures;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    Function<TxnId, Command> loadCommandFunction()
     {
-        return command -> Stage.READ.submit(() -> {
-            try
-            {
-                logger.trace("Starting load of {} for {}", command.txnId(), 
callback);
-                AccordKeyspace.loadCommand(commandStore, command);
-                logger.trace("Completed load of {} for {}", command.txnId(), 
callback);
-            }
-            catch (Throwable t)
-            {
-                logger.error("Exception loading {} for {}", command.txnId(), 
callback, t);
-                throw t;
-            }
-        });
+        return txnId -> AccordKeyspace.loadCommand(commandStore, txnId);
     }
 
     @VisibleForTesting
-    Function<AccordCommandsForKey, Future<?>> 
loadCommandsPerKeyFunction(Object callback)
+    Function<RoutableKey, CommandsForKey> loadCommandsPerKeyFunction()
     {
-        return cfk -> Stage.READ.submit(() -> {
-            try
-            {
-                logger.trace("Starting load of {} for {}", cfk.key(), 
callback);
-                AccordKeyspace.loadCommandsForKey(cfk);
-                logger.trace("Completed load of {} for {}", cfk.key(), 
callback);
-            }
-            catch (Throwable t)
-            {
-                logger.error("Exception loading {} for {}", cfk.key(), 
callback, t);
-                throw t;
-            }
-        });
+        return key -> AccordKeyspace.loadCommandsForKey(commandStore, 
(PartitionKey) key);
     }
 
-    private Future<?> referenceAndDispatchReads(AsyncContext context, Object 
callback)
+    private AsyncResult<?> referenceAndDispatchReads(AsyncOperation.Context 
context)
     {
-        List<Future<?>> futures = null;
-
-        futures = referenceAndDispatchReads(txnIds,
-                                            commandStore.commandCache(),
-                                            context.commands.items,
-                                            loadCommandFunction(callback),
-                                            futures,
-                                            callback);
-
-        futures = referenceAndDispatchReads(keys,
-                                            commandStore.commandsForKeyCache(),
-                                            context.commandsForKey.items,
-                                            
loadCommandsPerKeyFunction(callback),
-                                            futures,
-                                            callback);
-
-        return futures != null ? FutureCombiner.allOf(futures) : null;
+        List<Runnable> runnables = new ArrayList<>();
+        List<AsyncChain<?>> chains = new ArrayList<>();
+
+        referenceAndAssembleReads(txnIds,
+                                  context.commands,
+                                  commandStore.commandCache(),
+                                  loadCommandFunction(),
+                                  runnables,
+                                  chains);
+
+        referenceAndAssembleReads(keys,
+                                  context.commandsForKeys,
+                                  commandStore.commandsForKeyCache(),
+                                  loadCommandsPerKeyFunction(),
+                                  runnables,
+                                  chains);
+
+        if (chains.isEmpty())
+        {
+            Invariants.checkState(runnables.isEmpty());
+            return null;
+        }
+
+        if (!runnables.isEmpty())
+            AsyncChains.ofRunnables(Stage.READ.executor(), 
runnables).begin(commandStore.agent());

Review Comment:
   Could we comment why these don't need to gate the result, and perhaps name 
the collection accordingly?



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,42 +147,94 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
-            state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);
         }
         else
             run();
     }
 
+    private void finish(R result)
+    {
+        Invariants.checkArgument(state == State.COMPLETING, "Unexpected state 
%s", state);
+        try

Review Comment:
   We duplicate this in `fail` - perhaps we should modify this to `finish(R 
success, Throwable fail)` and invoke it from `fail`?



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +61,91 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);

Review Comment:
   Why do we copy these to a new `List`? If we need a `List` (or a `Set` 
somewhere else) we should try to specify this in `PreLoadContext` imo.



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -247,96 +259,115 @@ private void maybeEvict()
             Node<?, ?> evict = current;
             current = current.prev;
 
-            // if there are any dangling write only groups, apply them and
-            // move their futures into write futures so we don't evict
-            applyAndRemoveWriteOnlyGroup(evict.value);
-            if (!canEvict(evict.key()))
+            if (!canEvict(evict))
                 continue;
 
-            logger.trace("Evicting {} {}", 
evict.value.getClass().getSimpleName(), evict.key());
-            unlink(evict);
-            cache.remove(evict.key());
-            bytesCached -= evict.estimatedSizeOnHeap();
+            evict(evict, true);
         }
     }
 
-    private static <K, F extends Future<?>> F getFuture(NamedMap<Object, F> 
futuresMap, K key)
+    private void evict(Node<?, ?> evict, boolean unlink)
     {
-        F r = futuresMap.get(key);
+        logger.info("Evicting {} {}", evict.state(), evict.key());
+        if (unlink) unlink(evict);
+        Node<?, ?> self = cache.get(evict.key());
+        Invariants.checkState(self == evict, "Leaked node detected; was 
attempting to remove %s but cache had %s", evict, self);
+        cache.remove(evict.key());
+        bytesCached -= evict.lastQueriedEstimatedSizeOnHeap;
+    }
+
+    private static <K, V, F extends AsyncResult<V>> F 
getAsyncResult(NamedMap<Object, F> resultMap, K key)
+    {
+        F r = resultMap.get(key);
         if (r == null)
             return null;
 
-        if (!r.isDone())
+        // if the result was a failure, can not remove from the map as this 
would allow eviction
+        if (!r.isSuccess())
             return r;
 
         if (logger.isTraceEnabled())
-            logger.trace("Clearing future for {} from {}: {}", key, 
futuresMap.name, r);
-        futuresMap.remove(key);
+            logger.trace("Clearing result for {} from {}: {}", key, 
resultMap.name, r);
+        resultMap.remove(key);
         return null;
     }
 
-    private static <K, F extends Future<?>> void setFuture(Map<Object, F> 
futuresMap, K key, F future)
+    private static <K, F extends AsyncResult<?>> void 
setAsyncResult(Map<Object, F> resultsMap, K key, F result)
+    {
+        Preconditions.checkState(!resultsMap.containsKey(key));
+        resultsMap.put(key, result);
+    }
+
+    private static <K, V> boolean hasActiveAsyncResult(NamedMap<Object, 
AsyncResult<V>> resultMap, K key)
     {
-        Preconditions.checkState(!futuresMap.containsKey(key));
-        futuresMap.put(key, future);
+        // getResult only returns a result if it is not complete, so don't 
need to check if its been completed
+        return getAsyncResult(resultMap, key) != null;
     }
 
-    private static <K> void mergeFuture(Map<Object, Future<?>> futuresMap, K 
key, Future<?> future)
+    private static <K> void mergeAsyncResult(Map<Object, AsyncResult<Void>> 
resultMap, K key, AsyncResult<Void> result)
     {
-        Future<?> existing = futuresMap.get(key);
+        AsyncResult<Void> existing = resultMap.get(key);
         if (existing != null && !existing.isDone())
         {
-            logger.trace("Merging future {} with existing {}", future, 
existing);
-            future = FutureCombiner.allOf(ImmutableList.of(existing, future));
+            logger.trace("Merging result {} with existing {}", result, 
existing);
+            result = AsyncChains.reduce(ImmutableList.of(existing, result), 
(a, b) -> null).beginAsResult();

Review Comment:
   Introduce a new method for `AsyncChains.reduce` to avoid wrapping in a list?



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +61,91 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);

Review Comment:
   Should we just accept an `AbstractKeys<RoutingKey, ?>` instead?



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommand.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.local.Command;
+import accord.local.SafeCommand;
+import accord.primitives.TxnId;
+
+public class AccordSafeCommand extends SafeCommand implements 
AccordSafeState<TxnId, Command>
+{
+    private boolean invalidated;
+    private final AccordLoadingState<TxnId, Command> global;
+    private Command original;
+    private Command current;
+
+    public AccordSafeCommand(AccordLoadingState<TxnId, Command> global)
+    {
+        super(global.key());
+        this.global = global;
+        this.original = null;
+        this.current = null;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AccordSafeCommand that = (AccordSafeCommand) o;
+        return Objects.equals(original, that.original) && 
Objects.equals(current, that.current);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();

Review Comment:
   This is a bug tho, we shouldn’t be using a custom hashCode there? It’s 
misleading to implement hashCode only to use in toString - we should implement 
toString and not hashCode. It’s misleading to have hashCode implemented when we 
don’t use it as a hashCode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to