bdeggleston commented on code in PR #45:
URL: https://github.com/apache/cassandra-accord/pull/45#discussion_r1185374944
##########
accord-core/src/main/java/accord/coordinate/CheckOn.java:
##########
@@ -206,8 +208,9 @@ public Void apply(SafeCommandStore safeStore)
break;
case PreAccepted:
- if (!safeStore.ranges().at(txnId.epoch()).isEmpty())
- Commands.preaccept(safeStore, txnId, partialTxn,
maxRoute, progressKey);
+ // only preaccept if we coordinate the transaction
+ if (untilLocalEpoch <= txnId.epoch() ||
safeStore.ranges().coordinates(txnId).intersects(maxRoute))
Review Comment:
I don't understand the purpose of this change. I think the only case where this
would be true (and the other side of the || not) is if we learned of a txnId < our
current epoch.
##########
accord-core/src/main/java/accord/coordinate/CoordinateNoOp.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.List;
+import java.util.Set;
+
+import accord.api.Result;
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.Commit;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.messages.Commit.Kind.Maximal;
+import static accord.primitives.Txn.Kind.NoOp;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached
agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them
complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateNoOp extends CoordinatePreAccept<Timestamp>
+{
+ private CoordinateNoOp(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+ {
+ super(node, txnId, txn, route);
+ }
+
+ public static AsyncResult<Timestamp> coordinate(Node node, Seekables<?, ?>
keysOrRanges)
+ {
+ TxnId txnId = node.nextTxnId(NoOp, keysOrRanges.domain());
+ return coordinate(node, txnId, keysOrRanges);
+ }
+
+ public static AsyncResult<Timestamp> coordinate(Node node, TxnId txnId,
Seekables<?, ?> keysOrRanges)
+ {
+ Invariants.checkArgument(txnId.rw() == NoOp);
+ FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+ CoordinateNoOp coordinate = new CoordinateNoOp(node, txnId,
node.agent().emptyTxn(NoOp, keysOrRanges), route);
+ coordinate.start();
+ return coordinate;
+ }
+
+ void onPreAccepted(Topologies topologies, Timestamp executeAt,
List<PreAcceptOk> successes)
+ {
+ if (executeAt.isRejected())
+ {
+ proposeAndCommitInvalidate(node, Ballot.ZERO, txnId,
route.homeKey(), route, executeAt, this);
+ }
+ else
+ {
+ new Propose<Timestamp>(node, topologies, Ballot.ZERO, txnId, txn,
route, executeAt, Deps.NONE, this)
+ {
+ @Override
+ void onAccepted()
+ {
+ Topologies applyTo = node.topology().forEpoch(route,
executeAt.epoch());
+ Topologies persistTo = txnId.epoch() == executeAt.epoch()
? applyTo : node.topology().preciseEpochs(route, txnId.epoch(),
executeAt.epoch());
+ Writes writes = txn.execute(txnId, txnId, null);
+ Result result = txn.result(txnId, executeAt, null);
+ // TODO (now): permit Apply to Commit in these cases
+ Set<Node.Id> nodes = persistTo.nodes();
+ node.send(nodes, id -> new Commit(Maximal, id,
persistTo.forEpoch(txnId.epoch()), persistTo, txnId, txn, route, Ranges.EMPTY,
executeAt, deps, false));
+ node.send(nodes, id -> new Apply(id, persistTo, applyTo,
executeAt.epoch(), txnId, route, txn, executeAt, deps, writes, result));
Review Comment:
wouldn't the Commit message be redundant for any node also receiving an
Apply message?
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -83,6 +123,65 @@ public Agent agent()
public abstract void shutdown();
+ // implementations are expected to override this for persistence
+ protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+ {
Review Comment:
could you make this abstract, since the implementor is expected to override?
##########
accord-core/src/main/java/accord/messages/MessageType.java:
##########
@@ -47,5 +47,7 @@
INFORM_DURABLE_REQ,
INFORM_HOME_DURABLE_REQ,
CHECK_STATUS_REQ,
- CHECK_STATUS_RSP
+ CHECK_STATUS_RSP,
+ GET_SYNC_POINT_REQ,
+ GET_SYNC_POINT_RSP,
Review Comment:
these are unused
##########
accord-core/src/main/java/accord/messages/PreAcceptSyncPoint.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.messages;
+
+import javax.annotation.Nullable;
+
+import accord.local.Node.Id;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+
+public class PreAcceptSyncPoint extends PreAccept
Review Comment:
this is unused (and is missing a message type)
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -83,6 +123,65 @@ public Agent agent()
public abstract void shutdown();
+ // implementations are expected to override this for persistence
+ protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+ {
Review Comment:
The same applies to `setBootstrapBeganAt` and `setSafeToRead`: could these be made abstract too, since the implementor is expected to override them?
##########
accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java:
##########
@@ -30,17 +31,50 @@
import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
// TODO (desired, efficiency): if any shard *cannot* take the fast path, and
all shards have accepted, terminate
-public class FastPathTracker extends
AbstractTracker<FastPathTracker.FastPathShardTracker, Node.Id>
+public class FastPathTracker extends
AbstractTracker<FastPathTracker.FastPathQuorumShardTracker>
{
private static final ShardOutcome<FastPathTracker> NewFastPathSuccess =
(tracker, shardIndex) -> {
--tracker.waitingOnFastPathSuccess;
return --tracker.waitingOnShards == 0 ? Success : NoChange;
};
- public static class FastPathShardTracker extends ShardTracker
+ public static class FastPathQuorumShardTracker extends QuorumShardTracker
{
- protected int fastPathAccepts, accepts;
- protected int fastPathFailures, failures;
+ public FastPathQuorumShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+
+ public ShardOutcome<? super FastPathTracker> onQuorumSuccess(Node.Id
node)
+ {
+ return super.onSuccess(node);
+ }
+
+ public ShardOutcome<? super FastPathTracker>
onMaybeFastPathSuccess(Node.Id node)
+ {
+ return super.onSuccess(node);
+ }
+
+ public ShardOutcome<? super FastPathTracker> onFailure(@Nonnull
Node.Id from)
+ {
+ return super.onFailure(from);
+ }
+
+ public boolean hasRejectedFastPath()
+ {
+ return false;
+ }
+
+ public boolean hasMetFastPathCriteria()
+ {
+ return true;
Review Comment:
don't we want to disable fast path commits if we need to talk with multiple
epochs, or has that been relaxed? I vaguely remember hearing that it was, but
am not sure
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -291,18 +333,49 @@ private void forEach(Routable keyOrRange, Ranges slice,
Consumer<RoutableKey> fo
}
}
+ @Override
+ protected void registerHistoricalTransactions(Deps deps)
+ {
+ Ranges allRanges = rangesForEpochHolder.get().all();
+ deps.keyDeps.keys().forEach(allRanges, key -> {
+ SafeCommandsForKey cfk = commandsForKey(key).createSafeReference();
+ deps.keyDeps.forEach(key, txnId -> {
+ // TODO (desired, efficiency): this can be made more efficient
by batching by epoch
+ if
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
+ return; // already coordinates, no need to replicate
+ if
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
+ return;
+
+ cfk.registerNotWitnessed(txnId);
Review Comment:
it's not clear why we're registering txnIds for ranges we used to replicate.
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -113,23 +120,71 @@ RangesForEpoch ranges()
{
return ranges.current;
}
+
+ public String toString()
+ {
+ return store.id() + " " + ranges.current;
+ }
}
public static class RangesForEpoch
{
+ public enum Subset
Review Comment:
this is unused
##########
accord-core/src/main/java/accord/topology/Shard.java:
##########
@@ -151,4 +165,14 @@ public int hashCode()
{
return range.hashCode();
}
+
+ private static boolean isSortedUnique(List<Id> test)
Review Comment:
this is unused
##########
accord-core/src/main/java/accord/primitives/TxnId.java:
##########
@@ -120,7 +128,7 @@ private static Domain domain(int flags)
private static int rwOrdinal(int flags)
{
- return (flags >> 1) & 1;
+ return (flags >> 1) & 7;
Review Comment:
not specific to this patch, but could we rename `rw` -> `kind`?
##########
accord-core/src/main/java/accord/messages/PreAccept.java:
##########
@@ -239,6 +251,76 @@ private static <T extends Deps> T
calculateDeps(SafeCommandStore commandStore, T
return builder.build();
}
+ /**
+ * To simplify the implementation of bootstrap/range movements, we have
coordinators abort transactions
+ * that span too many topology changes for any given shard. This means
that we can always daisy-chain a replica
+ * that can inform a new/joining/bootstrapping replica of the data table
state and relevant transaction
+ * history without peeking into the future.
+ *
+ * This is necessary because when we create an ExclusiveSyncPoint there
may be some transactions that
+ * are captured by it as necessary to witness the result of, but that will
execute after it at some arbitrary
+ * future point. For simplicity, we wait for these transactions to execute
on the source replicas
+ * before streaming the table state to the target replicas. But if these
execute in a future topology,
+ * there may not be a replica that is able to wait for and execute the
transaction.
+ * So, we simply prohibit them from doing so.
+ *
+ * TODO (desired): it would be nice if this were enforced by some register
on replicas that inform coordinators
+ * of the maximum permitted executeAt. But this would make
ExclusiveSyncPoint more complicated to coordinate.
+ */
+ public static boolean rejectExecuteAt(TxnId txnId, Topologies topologies)
+ {
+ // for each superseding shard, mark any nodes removed in a long
bitmap; once the number of removals
+ // is greater than the minimum maxFailures for any shard, we reject
the executeAt.
+ // Note, this over-estimates the number of removals by counting
removals from _any_ superseding shard
+ // (rather than counting each superseding shard separately)
+ int originalIndex = topologies.indexForEpoch(txnId.epoch());
+ if (originalIndex == 0)
+ return false;
+
+ List<Shard> originalShards = topologies.get(originalIndex).shards();
+ if (originalShards.stream().anyMatch(s -> s.nodes.size() > 64))
+ return true;
+
+ long[] removals = new long[originalShards.size()];
+ int minMaxFailures = originalShards.stream().mapToInt(s ->
s.maxFailures).min().getAsInt();
+ for (int i = originalIndex - 1 ; i >= 0 ; --i)
+ {
+ List<Shard> newShards = topologies.get(i).shards();
+ minMaxFailures = Math.min(minMaxFailures,
newShards.stream().mapToInt(s -> s.maxFailures).min().getAsInt());
+ int n = 0, o = 0;
+ while (n < newShards.size() && o < originalShards.size())
+ {
+ Shard nv = newShards.get(n);
+ Shard ov = originalShards.get(o);
+ {
+ int c = nv.range.compareIntersecting(ov.range);
+ if (c < 0) { ++n; continue; }
+ else if (c > 0) { ++o; continue; }
+ }
+ int nvi = 0, ovi = 0;
+ while (nvi < nv.nodes.size() && ovi < ov.nodes.size())
+ {
+ int c = nv.nodes.get(nvi).compareTo(ov.nodes.get(ovi));
Review Comment:
this should be using `Shard.sortedNodes` for its comparison
##########
accord-core/src/main/java/accord/coordinate/Persist.java:
##########
@@ -53,18 +53,22 @@ public class Persist implements Callback<ApplyReply>
final Set<Id> persistedOn;
boolean isDone;
- public static void persist(Node node, Topologies sendTo, Topologies
applyTo, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps
deps, Writes writes, Result result)
+ // persistTo should be a superset of applyTo, and includes those
replicas/ranges that no longer replicate at the execution epoch
+ // but did replicate for coordination and would like to be informed of the
transaction's status (i.e. apply a no-op apply)
+ public static void persist(Node node, Topologies persistTo, Topologies
appliesTo, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps
deps, Writes writes, Result result)
{
- Persist persist = new Persist(node, applyTo, txnId, route, txn,
executeAt, deps);
- node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch(), txnId, route, txn, executeAt, deps, writes, result),
persist);
+ Persist persist = new Persist(node, appliesTo, txnId, route, txn,
executeAt, deps);
+ node.send(persistTo.nodes(), to -> new Apply(to, persistTo, appliesTo,
executeAt.epoch(), txnId, route, txn, executeAt, deps, writes, result),
persist);
}
- public static void persistAndCommit(Node node, TxnId txnId, FullRoute<?>
route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
+ public static void persistAndCommitMaximal(Node node, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result)
{
- Topologies sendTo = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
+ Topologies coordinate = node.topology().forEpoch(route, txnId.epoch());
Topologies applyTo = node.topology().forEpoch(route,
executeAt.epoch());
- Persist persist = new Persist(node, sendTo, txnId, route, txn,
executeAt, deps);
- node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch(), txnId, route, txn, executeAt, deps, writes, result),
persist);
+ Topologies persistTo = txnId.epoch() == executeAt.epoch() ? applyTo :
node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
+ Persist persist = new Persist(node, persistTo, txnId, route, txn,
executeAt, deps);
+ node.send(persistTo.nodes(), to -> new Commit(Maximal, to,
coordinate.current(), persistTo, txnId, txn, route, null, executeAt, deps,
false), persist);
+ node.send(applyTo.nodes(), to -> new Apply(to, persistTo, applyTo,
executeAt.epoch(), txnId, route, txn, executeAt, deps, writes, result),
persist);
Review Comment:
Same question here regarding redundant messages: wouldn't the Commit message be redundant for any node that also receives an Apply message?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]