belliottsmith commented on a change in pull request #1:
URL: https://github.com/apache/cassandra-accord/pull/1#discussion_r799260224
##########
File path: accord-core/src/main/java/accord/topology/TopologyManager.java
##########
@@ -0,0 +1,325 @@
+package accord.topology;
+
+import accord.api.ConfigurationService;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.Node;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.messages.TxnRequestScope;
+import accord.txn.Keys;
+import accord.txn.Txn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.function.LongConsumer;
+
+/**
+ * Manages topology state changes and update bookkeeping
+ *
+ * Each time the topology changes we need to:
+ * * confirm previous owners of ranges we replicate are aware of the new config
+ * * learn of any outstanding operations for ranges we replicate
+ * * clean up obsolete data
+ *
+ * Assumes a topology service that won't report epoch n without epoch n-1 (and
+ * all earlier epochs) also being available
+ */
+public class TopologyManager implements ConfigurationService.Listener
+{
+ /**
+ * Sync bookkeeping for a single epoch: the complete topology for the epoch,
+ * this node's view of it, and a quorum tracker recording which nodes have
+ * reported sync completion for the epoch.
+ */
+ class EpochState
+ {
+ // complete topology for this epoch (the constructor rejects subset topologies)
+ private final Topology global;
+ // result of global.forNode(node) for the enclosing manager's node
+ private final Topology local;
+ // records sync-complete reports; queried per shard index via unsafeGet(idx)
+ private final QuorumTracker syncTracker;
+ // latches: only ever set while false (see updateState), so once true it stays true
+ private boolean syncComplete = false;
+ // whether the previous epoch is synced, as most recently supplied by the caller
+ private boolean prevSynced;
+
+ /**
+ * Stores prevSynced and, if this epoch is not yet marked complete,
+ * re-checks whether the tracker has reached quorum.
+ *
+ * @return true only on the call that first observes quorum; false while
+ * quorum is pending and on every call after it was first observed
+ */
+ private boolean updateState(boolean prevSynced)
+ {
+ this.prevSynced = prevSynced;
+ if (!syncComplete)
+ {
+ syncComplete = syncTracker.hasReachedQuorum();
+ return syncComplete;
+ }
+ return false;
+ }
+
+ /**
+ * @param global the complete (non-subset) topology for this epoch
+ * @param prevSynced whether the previous epoch is synced
+ */
+ EpochState(Topology global, boolean prevSynced)
+ {
+ Preconditions.checkArgument(!global.isSubset());
+ this.global = global;
+ this.local = global.forNode(node);
+ // NOTE(review): meaning of the `false` flag on Topologies.Singleton is not visible here - confirm
+ this.syncTracker = new QuorumTracker(new
Topologies.Singleton(global, false));
+ // redundant: updateState(prevSynced) below assigns this field as well
+ this.prevSynced = prevSynced;
+ // evaluate the freshly built tracker once up front; the return value is ignored
+ updateState(prevSynced);
+ }
+
+ /**
+ * Records that {@code node} has reported sync completion for this epoch.
+ * Note: this parameter shadows the enclosing manager's {@code node} field.
+ *
+ * @param prevSynced whether the previous epoch is synced
+ * @return true only on the call that first observes quorum for this epoch
+ */
+ public boolean recordSyncComplete(Node.Id node, boolean prevSynced)
+ {
+ syncTracker.recordSuccess(node);
+ return updateState(prevSynced);
+ }
+
+ // the epoch number of this epoch's topology
+ long epoch()
+ {
+ return global.epoch;
+ }
+
+ /**
+ * @return true only if the previous epoch was reported synced AND this
+ * epoch's tracker has reached quorum
+ */
+ boolean syncComplete()
+ {
+ return prevSynced && syncComplete;
+ }
+
+ /**
+ * Determines whether sync has completed for all shards intersecting with
+ * the given keys. Always false while the previous epoch is not synced.
+ */
+ boolean syncCompleteFor(Keys keys)
+ {
+ if (!prevSynced)
+ return false;
+ // whole-epoch quorum already observed: every shard counts as synced
+ if (syncComplete)
+ return true;
+ Boolean result = global.accumulateForKeys(keys, (i, shard, acc) ->
{
+ // once any intersecting shard is found unsynced, keep propagating FALSE.
+ // NOTE(review): `==` compares Boolean references; this relies on acc only ever
+ // being Boolean.TRUE/FALSE or a Boolean.valueOf result (assumes accumulateForKeys
+ // feeds each returned value back in as acc - confirm)
+ if (acc == Boolean.FALSE)
+ return acc;
+ return
Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
+ }, Boolean.TRUE);
+ return result == Boolean.TRUE;
+ }
+
+ /**
+ * @return true if the tracker entry at index idx has not reached quorum;
+ * the shard argument is not used
+ */
+ boolean shardIsUnsynced(int idx, Shard shard)
+ {
+ return !syncTracker.unsafeGet(idx).hasReachedQuorum();
+ }
+ }
+
+ private class Epochs
+ {
+ private final long maxEpoch;
+ private final long minEpoch;
+ private final EpochState[] epochs;
+ private final List<Set<Node.Id>> pendingSyncComplete;
+
+ /**
+ * @param epochs epoch states ordered newest first; each entry's epoch must be
+ * exactly one less than the preceding entry's (checked below)
+ * @param pendingSyncComplete stored by reference, not copied
+ */
+ private Epochs(EpochState[] epochs, List<Set<Node.Id>>
pendingSyncComplete)
+ {
+ // newest epoch is at index 0; 0 when there are no epochs
+ this.maxEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
+ this.pendingSyncComplete = pendingSyncComplete;
+ // enforce a contiguous, strictly descending run of epochs
+ for (int i=1; i<epochs.length; i++)
+ Preconditions.checkArgument(epochs[i].epoch() ==
epochs[i-1].epoch() - 1);
+ // oldest epoch is at the last index; 0 when there are no epochs
+ this.minEpoch = epochs.length > 0 ? epochs[epochs.length -
1].epoch() : 0;
+ this.epochs = epochs;
+ }
+
+ // convenience constructor: starts with a new, empty pending-sync-complete list
+ private Epochs(EpochState[] epochs)
+ {
+ this(epochs, new ArrayList<>());
+ }
+
+ // the epoch number a newly added topology must carry: one past current()'s epoch
+ // (when empty, one past Topology.EMPTY's epoch - value not visible here)
+ public long nextEpoch()
+ {
+ return current().epoch + 1;
+ }
+
+ // the newest epoch's complete topology, or Topology.EMPTY when no epochs are held
+ public Topology current()
+ {
+ return epochs.length > 0 ? epochs[0].global : Topology.EMPTY;
+ }
+
+ public Epochs add(Topology topology)
+ {
+ Preconditions.checkArgument(topology.epoch == nextEpoch());
+ EpochState[] nextEpochs = new EpochState[epochs.length + 1];
+ List<Set<Node.Id>> pendingSync = pendingSyncComplete;
+ if (!pendingSync.isEmpty())
+ {
+ boolean prevSynced = epochs.length <= 1 ||
epochs[1].syncComplete();
+ EpochState currentEpoch = epochs[0];
+ pendingSync.remove(0).forEach(id ->
currentEpoch.recordSyncComplete(id, prevSynced));
Review comment:
Thanks. I'm just confused: why does that translate into removing index 0
rather than pushing a new collection at index 0 and shifting the remainder
down? If we're adding an epoch, aren't we invalidating the whole sequence? I
will be reading this all again more carefully today, hopefully with my brain
working better, and then I should grok it properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]