aweisberg commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1014434777
##########
accord-core/src/main/java/accord/coordinate/Persist.java:
##########
@@ -21,73 +21,87 @@
import java.util.HashSet;
import java.util.Set;
-import accord.api.Key;
import accord.api.Result;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.local.Node.Id;
+import accord.local.Status;
import accord.messages.Apply;
-import accord.messages.Apply.ApplyOk;
+import accord.messages.Apply.ApplyReply;
import accord.messages.Callback;
import accord.messages.Commit;
-import accord.messages.InformOfPersistence;
+import accord.messages.Commit.Kind;
+import accord.messages.InformHomeDurable;
+import accord.primitives.Deps;
+import accord.primitives.Route;
+import accord.primitives.Txn;
import accord.topology.Shard;
import accord.topology.Topologies;
-import accord.primitives.Deps;
import accord.primitives.Timestamp;
-import accord.txn.Txn;
import accord.primitives.TxnId;
-import accord.txn.Writes;
+import accord.primitives.Writes;
+
+import static accord.local.Status.Durability.Durable;
-// TODO: do not extend AsyncFuture, just use a simple BiConsumer callback
-public class Persist implements Callback<ApplyOk>
+public class Persist implements Callback<ApplyReply>
{
final Node node;
final TxnId txnId;
- final Key homeKey;
+ final Route route;
+ final Txn txn;
final Timestamp executeAt;
+ final Deps deps;
final QuorumTracker tracker;
final Set<Id> persistedOn;
boolean isDone;
- public static void persist(Node node, Topologies topologies, TxnId txnId,
Key homeKey, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result
result)
+ public static void persist(Node node, Topologies sendTo, Topologies
applyTo, TxnId txnId, Route route, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result)
{
- Persist persist = new Persist(node, topologies, txnId, homeKey,
executeAt);
- node.send(topologies.nodes(), to -> new Apply(to, topologies, txnId,
txn, homeKey, executeAt, deps, writes, result), persist);
+ Persist persist = new Persist(node, applyTo, txnId, route, txn,
executeAt, deps);
+ node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch, txnId, route, executeAt, deps, writes, result), persist);
}
- public static void persistAndCommit(Node node, TxnId txnId, Key homeKey,
Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
+ public static void persistAndCommit(Node node, TxnId txnId, Route route,
Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
{
- Topologies persistTo = node.topology().preciseEpochs(txn,
executeAt.epoch);
- Persist persist = new Persist(node, persistTo, txnId, homeKey,
executeAt);
- node.send(persistTo.nodes(), to -> new Apply(to, persistTo, txnId,
txn, homeKey, executeAt, deps, writes, result), persist);
- if (txnId.epoch != executeAt.epoch)
- {
- Topologies earlierTopologies = node.topology().preciseEpochs(txn,
txnId.epoch, executeAt.epoch - 1);
- Commit.commit(node, earlierTopologies, persistTo, txnId, txn,
homeKey, executeAt, deps);
- }
+ Topologies sendTo = node.topology().preciseEpochs(route, txnId.epoch,
executeAt.epoch);
+ Topologies applyTo = node.topology().forEpoch(route, executeAt.epoch);
+ Persist persist = new Persist(node, sendTo, txnId, route, txn,
executeAt, deps);
+ node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch, txnId, route, executeAt, deps, writes, result), persist);
}
- private Persist(Node node, Topologies topologies, TxnId txnId, Key
homeKey, Timestamp executeAt)
+ private Persist(Node node, Topologies topologies, TxnId txnId, Route
route, Txn txn, Timestamp executeAt, Deps deps)
{
this.node = node;
this.txnId = txnId;
- this.homeKey = homeKey;
+ this.txn = txn;
+ this.deps = deps;
+ this.route = route;
this.tracker = new QuorumTracker(topologies);
this.executeAt = executeAt;
this.persistedOn = new HashSet<>();
}
@Override
- public void onSuccess(Id from, ApplyOk response)
+ public void onSuccess(Id from, ApplyReply reply)
{
- persistedOn.add(from);
- if (tracker.success(from) && !isDone)
+ switch (reply)
{
- // TODO: send to non-home replicas also, so they may clear their
log more easily?
- Shard homeShard = node.topology().forEpochIfKnown(homeKey,
txnId.epoch);
- node.send(homeShard, new InformOfPersistence(txnId, homeKey,
executeAt, persistedOn));
- isDone = true;
+ default: throw new IllegalStateException();
+ case Redundant:
+ case Applied:
+ persistedOn.add(from);
+ if (tracker.success(from) && !isDone)
+ {
+ // TODO: send to non-home replicas also, so they may clear
their log more easily?
+ Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey, txnId.epoch);
Review Comment:
I don't think you need to deal with it as part of this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]