aweisberg commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1014239302
##########
accord-core/src/main/java/accord/coordinate/FetchData.java:
##########
@@ -0,0 +1,78 @@
+package accord.coordinate;
+
+import java.util.function.BiConsumer;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.local.Status.ReplicationPhase;
+import accord.primitives.AbstractRoute;
+import accord.primitives.KeyRanges;
+import accord.primitives.PartialRoute;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import javax.annotation.Nullable;
+
+import static accord.local.Status.Committed;
+import static accord.local.Status.ReplicationPhase.*;
+
+/**
+ * Find data and persist locally
+ *
+ * TODO accept lower bound epoch to avoid fetching data we should already have
+ */
+public class FetchData
+{
+ public static void fetchOrInvalidateIfNotReplicated(ReplicationPhase
phase, Node node, TxnId txnId, RoutingKey homeKey, long untilLocalEpoch,
BiConsumer<ReplicationPhase, Throwable> callback)
+ {
+ }
+
+ public static Object fetchWithHomeKey(ReplicationPhase phase, Node node,
TxnId txnId, RoutingKey homeKey, long untilLocalEpoch,
BiConsumer<ReplicationPhase, Throwable> callback)
+ {
+ return FindRoute.findRoute(node, txnId, homeKey, (foundRoute, fail) ->
{
+ if (fail != null) callback.accept(null, fail);
+ else if (foundRoute == null) callback.accept(None, null);
+ else fetch(phase, node, txnId, foundRoute.route,
foundRoute.executeAt, untilLocalEpoch, callback);
+ });
+ }
+
+ public static Object fetch(ReplicationPhase phase, Node node, TxnId txnId,
AbstractRoute route, long untilLocalEpoch, BiConsumer<ReplicationPhase,
Throwable> callback)
+ {
+ return fetch(phase, node, txnId, route, null, untilLocalEpoch,
callback);
+ }
+
+ public static Object fetch(ReplicationPhase phase, Node node, TxnId txnId,
AbstractRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch,
BiConsumer<ReplicationPhase, Throwable> callback)
+ {
+ KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch,
untilLocalEpoch);
+ if (!route.covers(ranges))
+ {
+ return fetchWithHomeKey(phase, node, txnId, route.homeKey,
untilLocalEpoch, callback);
+ }
+ else
+ {
+ return fetchInternal(phase, node, txnId,
route.sliceStrict(ranges), executeAt, untilLocalEpoch, callback);
+ }
+ }
+
+ private static Object fetchInternal(ReplicationPhase target, Node node,
TxnId txnId, PartialRoute route, Timestamp executeAt, long untilLocalEpoch,
BiConsumer<ReplicationPhase, Throwable> callback)
+ {
+ KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch,
untilLocalEpoch);
+ PartialRoute fetch = route.sliceStrict(ranges);
+ long srcEpoch = target == Commit || executeAt == null ? txnId.epoch :
executeAt.epoch;
+ return CheckOn.checkOn(target, node, txnId, fetch, srcEpoch,
untilLocalEpoch, (ok, fail) -> {
+ if (fail != null) callback.accept(null, fail);
+ else if (ok == null) callback.accept(None, null);
+ else
+ {
+ // even if we have enough information to Apply for the
requested epochs, if we didn't request enough
+ // information to fulfil that phase locally we should
downgrade the response we give to the callback
Review Comment:
LOL
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]