belliottsmith commented on a change in pull request #1:
URL: https://github.com/apache/cassandra-accord/pull/1#discussion_r799290475



##########
File path: accord-core/src/main/java/accord/topology/TopologyManager.java
##########
@@ -0,0 +1,326 @@
+package accord.topology;
+
+import accord.api.ConfigurationService;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.Node;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.txn.Keys;
+import accord.txn.Txn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.function.LongConsumer;
+
+/**
+ * Manages topology state changes and update bookkeeping
+ *
+ * Each time the topology changes we need to:
+ * * confirm previous owners of ranges we replicate are aware of the new config
+ * * learn of any outstanding operations for ranges we replicate
+ * * clean up obsolete data
+ *
+ * Assumes a topology service that won't report epoch n without having n-1 etc also available
+ */
+public class TopologyManager implements ConfigurationService.Listener
+{
+    class EpochState
+    {
+        private final Topology global;
+        private final Topology local;
+        private final QuorumTracker syncTracker;
+        private boolean syncComplete = false;
+        private boolean prevSynced = false;
+
+        EpochState(Topology global, boolean prevSynced)
+        {
+            Preconditions.checkArgument(!global.isSubset());
+            this.global = global;
+            this.local = global.forNode(node);
+            this.syncTracker = new QuorumTracker(new Topologies.Singleton(global, false));
+            if (prevSynced)
+                markPrevSynced();
+        }
+
+        void markPrevSynced()
+        {
+            prevSynced = true;
+        }
+
+        public void recordSyncComplete(Node.Id node)
+        {
+            syncTracker.recordSuccess(node);
+            syncComplete = syncTracker.hasReachedQuorum();
+        }
+
+        long epoch()
+        {
+            return global.epoch;
+        }
+
+        boolean syncComplete()
+        {
+            return prevSynced && syncComplete;
+        }
+
+        /**
+         * determine if sync has completed for all shards intersecting with the given keys
+         */
+        boolean syncCompleteFor(Keys keys)
+        {
+            if (!prevSynced)
+                return false;
+            if (syncComplete)
+                return true;
+            Boolean result = global.accumulateForKeys(keys, (i, shard, acc) -> {
+                if (acc == Boolean.FALSE)
+                    return acc;
+                return Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
+            }, Boolean.TRUE);
+            return result == Boolean.TRUE;
+        }
+
+        boolean shardIsUnsynced(int idx, Shard shard)
+        {
+            return !prevSynced || !syncTracker.unsafeGet(idx).hasReachedQuorum();
+        }
+    }
+
+    private class Epochs
+    {
+        private final long maxEpoch;
+        private final long minEpoch;
+        private final EpochState[] epochs;
+        // nodes we've received sync complete notifications from, for epochs we do not yet have topologies for.
+        // Pending sync notifications are indexed by epoch, with the current epoch as index[0], and future epochs
+        // as index[epoch - currentEpoch]. Sync complete notifications for the current epoch are marked pending
+        // until the superseding epoch has been applied
+        private final List<Set<Node.Id>> pendingSyncComplete;
+
+        private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete)
+        {
+            this.maxEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
+            this.pendingSyncComplete = pendingSyncComplete;
+            for (int i=1; i<epochs.length; i++)
+                Preconditions.checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1);
+            this.minEpoch = epochs.length > 0 ? epochs[epochs.length - 1].epoch() : 0;
+            this.epochs = epochs;
+        }
+
+        private Epochs(EpochState[] epochs)
+        {
+            this(epochs, new ArrayList<>());
+        }
+
+        public long nextEpoch()
+        {
+            return current().epoch + 1;
+        }
+
+        public Topology current()
+        {
+            return epochs.length > 0 ? epochs[0].global : Topology.EMPTY;
+        }
+
+        public Epochs add(Topology topology)
+        {
+            Preconditions.checkArgument(topology.epoch == nextEpoch());
+            EpochState[] nextEpochs = new EpochState[epochs.length + 1];
+            List<Set<Node.Id>> pendingSync = new ArrayList<>(pendingSyncComplete);
+            if (!pendingSync.isEmpty())
+            {
+                EpochState current = epochs[0];
+                if (epochs.length <= 1 || epochs[1].syncComplete())
+                    current.markPrevSynced();
+                pendingSync.remove(0).forEach(current::recordSyncComplete);
+            }
+            System.arraycopy(epochs, 0, nextEpochs, 1, epochs.length);
+
+            boolean prevSynced = epochs.length == 0 || epochs[0].syncComplete();
+            nextEpochs[0] = new EpochState(topology, prevSynced);
+            return new Epochs(nextEpochs, pendingSync);
+        }
+
+        /**
+         * Mark sync complete for the given node/epoch, and if this epoch
+         * is now synced, update the prevSynced flag on superseding epochs
+         */
+        public void syncComplete(Node.Id node, long epoch)
+        {
+            Preconditions.checkArgument(epoch > 0);
+            if (epoch > maxEpoch - 1)
+            {
+                int idx = (int) (epoch - maxEpoch);
+                for (int i=pendingSyncComplete.size(); i<=idx; i++)
+                    pendingSyncComplete.add(new HashSet<>());
+
+                pendingSyncComplete.get(idx).add(node);
+            }
+            else
+            {
+                EpochState state = get(epoch);
+                state.recordSyncComplete(node);
+                for (epoch++; state.syncComplete() && epoch <= maxEpoch; epoch++)
+                {
+                    state = get(epoch);
+                    state.markPrevSynced();
+                }
+            }
+        }
+
+        private EpochState get(long epoch)
+        {
+            if (epoch > maxEpoch || epoch < maxEpoch - epochs.length)
+                return null;
+
+            return epochs[(int) (maxEpoch - epoch)];
+        }
+
+        long maxUnknownEpoch(TxnRequest.Scope scope)
+        {
+            EpochState lastState = null;
+            for (int i=0, mi=scope.size(); i<mi; i++)
+            {
+                TxnRequest.Scope.KeysForEpoch requestRanges = scope.get(i);
+                EpochState epochState = get(requestRanges.epoch);
+
+                if (epochState != null)
+                {
+                    lastState = epochState;
+                }
+                else if (lastState != null && lastState.local.ranges().intersects(requestRanges.keys))
+                {
+                    // we don't have the most recent epoch, but still replicate the requested ranges
+                    continue;
+                }
+                else
+                {
+                    // we don't have the most recent epoch, and we don't replicate the requested ranges
+                    return scope.maxEpoch();
+                }
+
+                // validate requested ranges
+                KeyRanges localRanges = epochState.local.ranges();
+                if (!localRanges.intersects(requestRanges.keys))
+                    throw new RuntimeException("Received request for ranges not replicated by this node");
+            }
+            if (scope.maxEpoch() > 0)
+                epochReporter.accept(scope.maxEpoch());
+
+            return 0;
+        }
+
+        boolean requiresHistoricalTopologiesFor(Keys keys)
+        {
+            return epochs.length > 1 && !epochs[1].syncCompleteFor(keys);
+        }
+    }
+
+    private final Node.Id node;
+    private final LongConsumer epochReporter;
+    private volatile Epochs epochs;
+
+    public TopologyManager(Node.Id node, LongConsumer epochReporter)
+    {
+        this.node = node;
+        this.epochReporter = epochReporter;
+        this.epochs = new Epochs(new EpochState[0]);
+    }
+
+    @Override
+    public synchronized void onTopologyUpdate(Topology topology)
+    {
+        epochs = epochs.add(topology);
+    }
+
+    @Override
+    public void onEpochSyncComplete(Node.Id node, long epoch)
+    {
+        epochs.syncComplete(node, epoch);
+    }
+
+    public Topology current()
+    {
+        return epochs.current();
+    }
+
+    public long epoch()
+    {
+        return current().epoch;
+    }
+
+    @VisibleForTesting
+    EpochState getEpochStateUnsafe(long epoch)
+    {
+        return epochs.get(epoch);
+    }
+
+    public Topologies forKeys(Keys keys, long minEpoch)
+    {
+        Epochs current = epochs;

Review comment:
       `snapshot`?
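
       For readers following along: `epochs` is a volatile field, and `forKeys` captures it into a local so the
       rest of the method works against one consistent view; the suggestion is to name that local `snapshot`
       rather than `current`. A minimal, self-contained sketch of the idiom (hypothetical class and field names,
       not code from this PR):

           import java.util.Collections;
           import java.util.List;

           class VolatileSnapshotExample
           {
               private volatile List<String> entries = Collections.emptyList();

               void update(List<String> next)
               {
                   entries = next; // writers publish a new immutable list
               }

               String lookup(int i)
               {
                   // one volatile read captured into a local: the rest of the method
                   // operates on a stable snapshot, even if a writer races with this call
                   List<String> snapshot = entries;
                   return i < snapshot.size() ? snapshot.get(i) : null;
               }
           }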




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


