belliottsmith commented on code in PR #254:
URL: https://github.com/apache/cassandra-accord/pull/254#discussion_r2355930076
##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -71,75 +73,81 @@ public class CoordinateSyncPoint<R> extends
CoordinatePreAccept<R>
final CoordinationAdapter<R> adapter;
- private CoordinateSyncPoint(Node node, SequentialAsyncExecutor executor,
TxnId txnId, Topologies topologies, Txn txn, FullRoute<?> route,
SyncPointAdapter<R> adapter, BiConsumer<R, Throwable> callback)
+ private CoordinateSyncPoint(Node node, SequentialAsyncExecutor executor,
TxnId txnId, Topologies topologies, Txn txn, FullRoute<?> route,
SyncPointAdapter<R> adapter, BiConsumer<? super R, Throwable> callback)
{
super(node, executor, txnId, txn, route, topologies,
adapter.preacceptTrackerFactory, callback);
this.adapter = adapter;
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
exclusive(Node node, Unseekables<U> keysOrRanges)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
exclusive(Node node, Unseekables<U> keysOrRanges)
{
return coordinate(node, ExclusiveSyncPoint, keysOrRanges,
Adapters.exclusiveSyncPoint());
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
exclusive(Node node, TxnId txnId, Unseekables<U> keysOrRanges)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
exclusive(Node node, TxnId txnId, Unseekables<U> keysOrRanges)
{
return coordinate(node, txnId, keysOrRanges,
Adapters.exclusiveSyncPoint());
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
exclusive(Node node, TxnId txnId, FullRoute<U> route)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
exclusive(Node node, TxnId txnId, FullRoute<U> route)
{
return coordinate(node, txnId, route, Adapters.exclusiveSyncPoint());
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
executeAtQuorum(Node node, Unseekables<U> keysOrRanges)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
executeAtQuorum(Node node, Unseekables<U> keysOrRanges)
{
return coordinate(node, ExclusiveSyncPoint, keysOrRanges,
Adapters.exclusiveSyncPoint());
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
executeAtQuorum(Node node, TxnId txnId, FullRoute<U> route)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
executeAtQuorum(Node node, TxnId txnId, FullRoute<U> route)
{
return coordinate(node, txnId, route, Adapters.exclusiveSyncPoint());
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges,
SyncPointAdapter<SyncPoint<U>> adapter)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges,
SyncPointAdapter<SyncPoint<U>> adapter)
{
Invariants.requireArgument(kind.isSyncPoint());
TxnId txnId = node.nextTxnIdWithDefaultFlags(kind,
keysOrRanges.domain(), cardinality(keysOrRanges));
- return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node,
txnId, keysOrRanges, adapter)).beginAsResult();
+ return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node,
txnId, keysOrRanges, adapter));
}
- public static <U extends Unseekable> AsyncResult<SyncPoint<U>>
coordinate(Node node, Txn.Kind kind, FullRoute<U> route,
SyncPointAdapter<SyncPoint<U>> adapter)
+ public static <U extends Unseekable> AsyncChain<SyncPoint<U>>
coordinate(Node node, Txn.Kind kind, FullRoute<U> route,
SyncPointAdapter<SyncPoint<U>> adapter)
{
Invariants.requireArgument(kind.isSyncPoint());
TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, route.domain(),
cardinality(route));
- return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node,
txnId, route, adapter)).beginAsResult();
+ return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node,
txnId, route, adapter));
}
- private static <U extends Unseekable> AsyncResult<SyncPoint<U>>
coordinate(Node node, TxnId txnId, Unseekables<U> keysOrRanges,
SyncPointAdapter<SyncPoint<U>> adapter)
+ private static <U extends Unseekable> AsyncChain<SyncPoint<U>>
coordinate(Node node, TxnId txnId, Unseekables<U> keysOrRanges,
SyncPointAdapter<SyncPoint<U>> adapter)
{
Invariants.requireArgument(txnId.isSyncPoint());
FullRoute<U> route = (FullRoute<U>) node.computeRoute(txnId,
keysOrRanges);
return coordinate(node, txnId, route, adapter);
}
- private static <U extends Unseekable> AsyncResult<SyncPoint<U>>
coordinate(Node node, TxnId txnId, FullRoute<U> route,
SyncPointAdapter<SyncPoint<U>> adapter)
+ private static <U extends Unseekable> AsyncChain<SyncPoint<U>>
coordinate(Node node, TxnId txnId, FullRoute<U> route,
SyncPointAdapter<SyncPoint<U>> adapter)
{
- Invariants.requireArgument(txnId.isSyncPoint());
- TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), route);
- if (mismatch != null)
- return AsyncResults.failure(mismatch);
-
try
{
- SettableByCallback<SyncPoint<U>> result = new
SettableByCallback<>();
- CoordinateSyncPoint<SyncPoint<U>> coordinate = new
CoordinateSyncPoint<>(node, node.someSequentialExecutor(), txnId,
adapter.forDecision(node, route, SHARE, txnId, txnId),
node.agent().emptySystemTxn(txnId.kind(), txnId.domain()), route, adapter,
result);
- coordinate.start();
- return result;
+ Invariants.requireArgument(txnId.isSyncPoint());
+ TopologyMismatch mismatch =
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
txnId, route.homeKey(), route);
+ if (mismatch != null)
+ throw mismatch;
+
+ return new AsyncChains.Head<>()
Review Comment:
I don't quite follow. The AsyncChain.Head defines the latent work we haven't
yet submitted, and that can be notified of the result if you call begin with a
callback. Maybe this comment is attributed to the wrong spot in the code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]