dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103375260


##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
             state = State.FAILED;

Review Comment:
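   This is a bug: setting `state = State.FAILED` here means the subsequent `fail(throwable)` call trips its own precondition (`state != State.FAILED`), so the failure is never propagated to the callback.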



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
             state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);
         }
         else
             run();
     }
 
+    private void finish(R result)
+    {
+        Preconditions.checkArgument(state == State.COMPLETING);

Review Comment:
   Please append `, "Unexpected state %s", state` to the arguments of this `checkArgument` call so that a violation reports the actual state.



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -51,71 +55,107 @@
 {

Review Comment:
   I created a test showing that a failed load corrupts the cache and leaves us in a bad state (reference leak, `state == null`). Here is the patch that adds the test:
   
   ```
   diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
   index b0d30b04f3..2b8c74b378 100644
   --- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
   +++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
   @@ -83,11 +83,11 @@ public class AccordStateCache
            }
        }
    
   -    static class Node<K, V extends ImmutableState>
   +    public static class Node<K, V extends ImmutableState>
        {
            static final long EMPTY_SIZE = ObjectSizes.measure(new 
AccordStateCache.Node(null, null, null));
    
   -        final K key;
   +        public final K key;
            @Nonnull Object state;
            final ItemAccessor<K, V> accessor;
            private Node<?, ?> prev;
   @@ -102,7 +102,12 @@ public class AccordStateCache
                this.accessor = accessor;
            }
    
   -        boolean maybeFinishLoad()
   +        public int referenceCount()
   +        {
   +            return references;
   +        }
   +
   +        public boolean maybeFinishLoad()
            {
                if (!(state instanceof PendingLoad))
                    return false;
   @@ -116,7 +121,7 @@ public class AccordStateCache
                return true;
            }
    
   -        boolean isLoaded()
   +        public boolean isLoaded()
            {
                return !(state instanceof PendingLoad);
            }
   @@ -126,9 +131,12 @@ public class AccordStateCache
                return state instanceof PendingLoad ? ((PendingLoad<V>) 
state).result : null;
            }
    
   -        V value()
   +        public V value()
            {
   -            Invariants.checkState(isLoaded() && state != null);
   +            Preconditions.checkState(isLoaded(), "value() accessed when not 
loaded for key %s; state was %s", key, state);
   +            Preconditions.checkState(state != null, "State was null for key 
%s", key);
   +            //TODO improve Invariants to support lazy messages
   +//            Invariants.checkState(isLoaded() && state != null);
                return (V) state;
            }
    
   @@ -190,7 +198,7 @@ public class AccordStateCache
        }
    
        public final Map<Object, Node<?, ?>> active = new HashMap<>();
   -    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
   +    public final Map<Object, Node<?, ?>> cache = new HashMap<>();
        private final Set<Instance<?, ?>> instances = new HashSet<>();
    
        private final NamedMap<Object, Node<?, ?>> pendingLoads = new 
NamedMap<>("pendingLoads");
   @@ -374,6 +382,12 @@ public class AccordStateCache
                this.accessor = accessor;
            }
    
   +        @VisibleForTesting
   +        public AccordStateCache parent()
   +        {
   +            return AccordStateCache.this;
   +        }
   +
            @Override
            public boolean equals(Object o)
            {
   diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
   index 1211d8113b..32e7b107c2 100644
   --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
   +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
   @@ -41,6 +41,7 @@ import accord.local.SafeCommandStore;
    import accord.primitives.RoutableKey;
    import accord.primitives.Seekables;
    import accord.primitives.TxnId;
   +import accord.utils.Invariants;
    import accord.utils.async.AsyncChains;
    import org.apache.cassandra.service.accord.AccordCommandStore;
    import org.apache.cassandra.service.accord.AccordStateCache;
   @@ -136,7 +137,6 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
            if (throwable != null)
            {
                logger.error(String.format("Operation %s failed", this), 
throwable);
   -            state = State.FAILED;
                fail(throwable);
            }
            else
   @@ -152,7 +152,7 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
    
        private void fail(Throwable throwable)
        {
   -        Preconditions.checkArgument(state != State.FINISHED && state != 
State.FAILED);
   +        Preconditions.checkArgument(state != State.FINISHED && state != 
State.FAILED, "Unexpected state %s", state);
            callback.accept(null, throwable);
            state = State.FAILED;
        }
   diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
   index d13cd00aad..42da5ede73 100644
   --- 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
   +++ 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
   @@ -21,8 +21,11 @@ package org.apache.cassandra.service.accord.async;
    import java.util.Collections;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;
   +import java.util.stream.Collectors;
   +import java.util.stream.Stream;
    
    import com.google.common.collect.Iterables;
   +import com.google.common.collect.MoreCollectors;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.BeforeClass;
   @@ -37,6 +40,7 @@ import accord.primitives.RoutableKey;
    import accord.primitives.Timestamp;
    import accord.primitives.Txn;
    import accord.primitives.TxnId;
   +import accord.utils.async.AsyncResults;
    import org.apache.cassandra.SchemaLoader;
    import org.apache.cassandra.cql3.QueryProcessor;
    import org.apache.cassandra.cql3.UntypedResultSet;
   @@ -51,6 +55,7 @@ import 
org.apache.cassandra.service.accord.AccordStateCache;
    import org.apache.cassandra.service.accord.AccordTestUtils;
    import org.apache.cassandra.service.accord.api.PartitionKey;
    import org.apache.cassandra.utils.FBUtilities;
   +import org.assertj.core.api.Assertions;
    
    import static accord.local.PreLoadContext.contextFor;
    import static accord.utils.async.AsyncChains.awaitUninterruptibly;
   @@ -151,6 +156,79 @@ public class AsyncOperationTest
    
        }
    
   +    @Test
   +    public void testFailingLoad()
   +    {
   +        AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
   +
   +        TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
   +
   +        createCommittedAndPersist(commandStore, txnId);
   +
   +        Consumer<SafeCommandStore> consumer = safeStore -> 
safeStore.beginUpdate(txnId).readyToExecute();
   +        PreLoadContext ctx = PreLoadContext.contextFor(singleton(txnId), 
Keys.EMPTY);
   +
   +        AsyncOperation<Void> operation = new 
AsyncOperation.ForConsumer(commandStore, ctx, consumer)
   +        {
   +            @Override
   +            AsyncLoader createAsyncLoader(AccordCommandStore commandStore, 
PreLoadContext preLoadContext)
   +            {
   +                return new AsyncLoader(commandStore, 
preLoadContext.txnIds(), (Iterable<RoutableKey>) preLoadContext.keys())
   +                {
   +                    @Override
   +                    AccordStateCache.LoadFunction<TxnId, Command> 
loadCommandFunction(Object callback)
   +                    {
   +                        return (i1, i2) -> AsyncResults.failure(new 
NullPointerException());
   +                    }
   +
   +                    @Override
   +                    AccordStateCache.LoadFunction<RoutableKey, 
CommandsForKey> loadCommandsPerKeyFunction(Object callback)
   +                    {
   +                        return (i1, i2) -> AsyncResults.failure(new 
NullPointerException());
   +                    }
   +                };
   +            }
   +        };
   +
   +        commandStore.executor().submit(operation);
   +
   +        Assertions.assertThatThrownBy(() -> awaitUninterruptibly(operation))
   +                  .hasRootCauseInstanceOf(NullPointerException.class);
   +
   +        // did we corrupt the cache?
   +        AccordStateCache.Instance<TxnId, Command> commands = 
commandStore.commandCache();
   +        AccordStateCache.Instance<RoutableKey, CommandsForKey> forKeys = 
commandStore.commandsForKeyCache();
   +
   +//        assertNoLeaks(commands);
   +//        assertNoLeaks(forKeys);
   +
   +        assertNoFailedLoads(commands);
   +        assertNoFailedLoads(forKeys);
   +    }
   +
   +    private static void assertNoFailedLoads(AccordStateCache.Instance<?, ?> 
inst)
   +    {
   +        AccordStateCache cache = inst.parent();
   +        for (AccordStateCache.Node<?, ?> node : 
Stream.concat(cache.active.values().stream(), 
cache.cache.values().stream()).collect(Collectors.toList()))
   +        {
   +            node.maybeFinishLoad();
   +            // logically this isn't correct... we are NOT loaded
   +            Assertions.assertThat(node.isLoaded()).isTrue();
   +            Assertions.assertThat(node.value()).isNotNull();
   +        }
   +    }
   +
   +    private static void assertNoLeaks(AccordStateCache.Instance<?, ?> inst)
   +    {
   +        AccordStateCache cache = inst.parent();
   +        for (AccordStateCache.Node<?, ?> node : 
Stream.concat(cache.active.values().stream(), 
cache.cache.values().stream()).collect(Collectors.toList()))
   +        {
   +            Assertions.assertThat(node.referenceCount())
   +                      .describedAs("Leak detected for key %s %s", 
node.key.getClass().getSimpleName(), node.key)
   +                      .isEqualTo(0);
   +        }
   +    }
   +
        /**
         * save and load futures should be cleaned up as part of the operation
         */
   
   ```



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
             state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);
         }
         else
             run();
     }
 
+    private void finish(R result)
+    {
+        Preconditions.checkArgument(state == State.COMPLETING);
+        callback.accept(result, null);
+        state = State.FINISHED;
+    }
+
+    private void fail(Throwable throwable)
+    {
+        Preconditions.checkArgument(state != State.FINISHED && state != 
State.FAILED);

Review Comment:
   Please append `, "Unexpected state %s", state` to the arguments of this `checkArgument` call so that a violation reports the actual state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to