aweisberg commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1014302471


##########
accord-core/src/main/java/accord/messages/ReadData.java:
##########
@@ -18,236 +18,242 @@
 
 package accord.messages;
 
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import accord.primitives.Deps;
-import com.google.common.base.Preconditions;
-
 import accord.api.Key;
+import accord.local.Status.ReplicationPhase;
+import accord.primitives.*;
+
 import accord.local.*;
-import accord.local.Node.Id;
 import accord.api.Data;
 import accord.topology.Topologies;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.txn.Txn;
-import accord.primitives.TxnId;
-import accord.utils.DeterministicIdentitySet;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReadData extends TxnRequest
+import javax.annotation.Nullable;
+
+import static accord.local.Status.Committed;
+import static accord.messages.MessageType.READ_RSP;
+import static accord.messages.ReadData.ReadNack.NotCommitted;
+import static accord.messages.ReadData.ReadNack.Redundant;
+import static accord.messages.TxnRequest.*;
+
+// TODO (soon): dedup - can currently have infinite pending reads that will be 
executed independently
+public class ReadData extends AbstractEpochRequest<ReadData.ReadNack> 
implements Listener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ReadData.class);
 
-    static class LocalRead implements Listener, PreLoadContext
+    public static class SerializerSupport
     {
-        final TxnId txnId;
-        final Deps deps;
-        final Node node;
-        final Node.Id replyToNode;
-        final Keys scope;
-        final Keys txnKeys;
-        final ReplyContext replyContext;
-
-        Data data;
-        boolean isObsolete; // TODO: respond with the Executed result we have 
stored?
-        Set<CommandStore> waitingOn;
-
-        LocalRead(TxnId txnId, Deps deps, Node node, Id replyToNode, Keys 
scope, Keys txnKeys, ReplyContext replyContext)
+        public static ReadData create(TxnId txnId, Keys scope, long 
executeAtEpoch, long waitForEpoch)
         {
-            Preconditions.checkArgument(!scope.isEmpty());
-            this.txnId = txnId;
-            this.deps = deps;
-            this.node = node;
-            this.replyToNode = replyToNode;
-            this.scope = scope;
-            this.txnKeys = txnKeys;  // TODO (now): is this needed? Does the 
read update commands per key?
-            this.replyContext = replyContext;
+            return new ReadData(txnId, scope, executeAtEpoch, waitForEpoch);
         }
+    }
 
-        @Override
-        public Iterable<TxnId> txnIds()
-        {
-            return Iterables.concat(Collections.singleton(txnId), 
deps.txnIds());
-        }
+    public final long executeAtEpoch;
+    public final Keys readScope; // TODO: this should be RoutingKeys as we 
have the Keys locally - but for simplicity for now we use Keys to implement 
keys()
+    private final long waitForEpoch;
+    private transient Data data;
+    private transient boolean isObsolete; // TODO: respond with the Executed 
result we have stored?
+    private transient BitSet waitingOn;
+    private transient int waitingOnCount;
 
-        @Override
-        public Iterable<Key> keys()
-        {
-            return txnKeys;
-        }
+    public ReadData(Node.Id to, Topologies topologies, TxnId txnId, Keys 
readScope, Timestamp executeAt)
+    {
+        super(txnId);
+        this.executeAtEpoch = executeAt.epoch;
+        int startIndex = latestRelevantEpochIndex(to, topologies, readScope);
+        this.readScope = computeScope(to, topologies, readScope, startIndex, 
Keys::slice, Keys::union);
+        this.waitForEpoch = computeWaitForEpoch(to, topologies, startIndex);
+    }
 
-        @Override
-        public PreLoadContext listenerPreLoadContext(TxnId caller)
-        {
-            Set<TxnId> ids = new HashSet<>(deps.txnIds());
-            ids.add(txnId);
-            ids.add(caller);
-            return PreLoadContext.contextFor(ids, keys());
-        }
+    ReadData(TxnId txnId, Keys readScope, long executeAtEpoch, long 
waitForEpoch)
+    {
+        super(txnId);
+        this.executeAtEpoch = executeAtEpoch;
+        this.readScope = readScope;
+        this.waitForEpoch = waitForEpoch;
+    }
 
-        @Override
-        public String toString()
-        {
-            return "ReadData$LocalRead{" + txnId + '}';
-        }
+    @Override
+    public long waitForEpoch()
+    {
+        return waitForEpoch;
+    }
 
-        @Override
-        public synchronized void onChange(Command command)
-        {
-            logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
-                         this, command.txnId(), command.status(), command);
-            switch (command.status())
-            {
-                default: throw new IllegalStateException();
-                case NotWitnessed:
-                case PreAccepted:
-                case Accepted:
-                case AcceptedInvalidate:
-                case Committed:
-                    return;
-
-                case Executed:
-                case Applied:
-                case Invalidated:
-                    obsolete();
-                case ReadyToExecute:
-            }
+    @Override
+    protected void process()
+    {
+        waitingOn = new BitSet();
+        node.mapReduceConsumeLocal(this, readScope, executeAtEpoch, 
executeAtEpoch, this);
+    }
 
-            command.removeListener(this);
-            if (!isObsolete)
-                read(command);
-        }
+    @Override
+    public PreLoadContext listenerPreLoadContext(TxnId caller)
+    {
+        Set<TxnId> ids = new HashSet<>();
+        ids.add(txnId);
+        ids.add(caller);
+        return PreLoadContext.contextFor(ids, keys());
+    }
 
-        @Override
-        public boolean isTransient()
+    @Override
+    public boolean isTransient()
+    {
+        return true;
+    }
+
+    @Override
+    public synchronized void onChange(SafeCommandStore safeStore, Command 
command)
+    {
+        logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
+                this, command.txnId(), command.status(), command);
+        switch (command.status())
         {
-            return true;
+            default: throw new AssertionError();
+            case NotWitnessed:
+            case PreAccepted:
+            case Accepted:
+            case AcceptedInvalidate:
+            case Committed:
+                return;
+
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                obsolete();
+            case ReadyToExecute:
         }
 
-        private synchronized void readComplete(CommandStore commandStore, Data 
result)
-        {
-            logger.trace("{}: read completed on {}", txnId, commandStore);
-            data = data == null ? result : data.merge(result);
+        command.removeListener(this);
+        if (!isObsolete)
+            read(safeStore, command);
+    }
 
-            waitingOn.remove(commandStore);
-            if (waitingOn.isEmpty())
-                node.reply(replyToNode, replyContext, new ReadOk(data));
+    @Override
+    public synchronized ReadNack apply(SafeCommandStore safeStore)
+    {
+        Command command = safeStore.command(txnId);
+        Status status = command.status();
+        logger.trace("{}: setting up read with status {} on {}", txnId, 
status, safeStore);
+        switch (command.status()) {
+            default:
+                throw new AssertionError();
+            case Committed:
+                if (!command.partialTxn().keys().any((i, k) -> 
safeStore.commandStore().hashIntersects(k)))
+                    throw new IllegalStateException();
+            case NotWitnessed:
+            case PreAccepted:
+            case Accepted:
+            case AcceptedInvalidate:
+                waitingOn.set(safeStore.commandStore().id());
+                ++waitingOnCount;
+                command.addListener(this);
+
+                if (status == Committed)
+                    return null;
+
+                safeStore.progressLog().waiting(txnId, 
ReplicationPhase.Commit, readScope.toRoutingKeys());

Review Comment:
   I just missed the obvious reason it moved, which is that if it's already 
committed we aren't waiting for it to commit, and the progress log doesn't 
need to ensure progress (some other progress log on the home shard will do 
that, and it is guaranteed to know about the transaction because it is 
committed).
   
   That explanation could also serve as the code comment here. If you have 
fairly deep knowledge of the whole thing then yes, it is "obvious".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to