belliottsmith commented on code in PR #113:
URL: https://github.com/apache/cassandra-accord/pull/113#discussion_r1746075947
##########
accord-core/src/main/java/accord/messages/Propagate.java:
##########
@@ -306,38 +299,76 @@ public Void apply(SafeCommandStore safeStore)
Invariants.checkState(executeAtIfKnown != null);
if (toEpoch >= executeAtIfKnown.epoch())
{
- confirm(Commands.apply(safeStore, safeCommand, txnId,
route, progressKey, executeAtIfKnown, stableDeps, partialTxn, writes, result));
+ confirm(Commands.apply(safeStore, safeCommand, txnId,
route, executeAtIfKnown, stableDeps, partialTxn, writes, result));
break;
}
case Stable:
- confirm(Commands.commit(safeStore, safeCommand, Stable,
ballot, txnId, route, progressKey, partialTxn, executeAtIfKnown, stableDeps));
+ confirm(Commands.commit(safeStore, safeCommand, Stable,
ballot, txnId, route, partialTxn, executeAtIfKnown, stableDeps));
break;
case Committed:
// TODO (expected): we can propagate Committed as Stable if we
have any other Stable result AND a quorum of committedDeps
case PreCommitted:
confirm(Commands.precommit(safeStore, safeCommand, txnId,
executeAtIfKnown, route));
// TODO (desired): would it be clearer to yield a SaveStatus
so we can have PreCommittedWithDefinition
- if (!achieved.definition.isKnown())
+ if (!found.definition.isKnown())
break;
case PreAccepted:
// only preaccept if we coordinate the transaction
if (safeStore.ranges().coordinates(txnId).intersects(route) &&
Route.isFullRoute(route))
- Commands.preaccept(safeStore, safeCommand, txnId,
txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
+ Commands.preaccept(safeStore, safeCommand, txnId,
txnId.epoch(), partialTxn, Route.castToFullRoute(route));
break;
case NotDefined:
break;
}
+ updateFetchResult(found.propagates(), slicedRoute);
return updateDurability(safeStore, safeCommand);
}
+ private void updateFetchResult(Known achieved, Route<?> slicedRoute)
+ {
+ achieved = achieved.propagates();
+ Unseekables<?> achievedTarget = slicedRoute;
+ Unseekables<?> didNotAchieveTarget = null;
+ if (target != null && !target.isSatisfiedBy(achieved))
+ {
+ achievedTarget = slicedRoute.slice(0, 0);
+ didNotAchieveTarget = slicedRoute;
+ }
+
+ FetchResult current = fetchResult;
+ while (true)
+ {
+ FetchResult next = current == null ? new FetchResult(achieved,
achievedTarget, didNotAchieveTarget)
+ : new
FetchResult(achieved.reduce(current.achieved),
+
achievedTarget.with((Unseekables)current.achievedTarget),
+
Unseekables.merge(current.didNotAchieveTarget, (Unseekables)
didNotAchieveTarget));
+
+ if (fetchResultUpdater.compareAndSet(this, current, next))
+ return;
+ }
+ }
+
+ private FetchResult finaliseFetchResult()
Review Comment:
oh, weird. good spot
##########
accord-core/src/main/java/accord/messages/Propagate.java:
##########
@@ -56,158 +61,99 @@
import static accord.local.Status.NotDefined;
import static accord.local.Status.Phase.Cleanup;
import static accord.local.Status.PreApplied;
-import static accord.messages.CheckStatus.WithQuorum.HasQuorum;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.illegalState;
// TODO (required): detect propagate loops where we don't manage to update
anything but should
-public class Propagate implements EpochSupplier, LocalRequest<Status.Known>,
PreLoadContext, MapReduceConsume<SafeCommandStore, Void>
+public class Propagate implements EpochSupplier, PreLoadContext,
MapReduceConsume<SafeCommandStore, Void>
{
- public static class SerializerSupport
- {
- public static Propagate create(TxnId txnId, Route<?> route, SaveStatus
maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot ballot,
Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey,
Status.Known achieved, FoundKnownMap known, boolean isTruncated, PartialTxn
partialTxn, PartialDeps committedDeps, long toEpoch, Timestamp executeAt,
Writes writes, Result result)
- {
- return new Propagate(txnId, route, maxKnowledgeSaveStatus,
maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known,
isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result);
- }
- }
-
- public final TxnId txnId;
- public final Route<?> route;
- // TODO (expected): remove dependency on these two SaveStatus
- public final SaveStatus maxKnowledgeSaveStatus;
- public final SaveStatus maxSaveStatus;
- public final Ballot ballot;
- public final Status.Durability durability;
- @Nullable public final RoutingKey homeKey;
- @Nullable public final RoutingKey progressKey;
+ final Node node;
+ final TxnId txnId;
+ final Route<?> route;
+ final Unseekables<?> propagateTo;
+ final Known target;
+
+ // TODO (desired): remove dependency on these two SaveStatus
+ final SaveStatus maxKnowledgeSaveStatus;
+ final SaveStatus maxSaveStatus;
+ final Ballot ballot;
+ final Status.Durability durability;
+ @Nullable final RoutingKey homeKey;
// this is a WHOLE NODE measure, so if commit epoch has more ranges we do
not count as committed if we can only commit in coordination epoch
- public final Status.Known achieved;
- public final FoundKnownMap known;
- public final boolean isShardTruncated;
- @Nullable public final PartialTxn partialTxn;
- @Nullable public final PartialDeps stableDeps;
+ final FoundKnownMap known;
+ @Nullable final PartialTxn partialTxn;
+ @Nullable final PartialDeps stableDeps;
// TODO (expected): toEpoch may only apply to certain local command stores
that have "witnessed the future" - confirm it is fine to use globally or else
narrow its scope
- public final long toEpoch;
- @Nullable public final Timestamp committedExecuteAt;
- @Nullable public final Writes writes;
- @Nullable public final Result result;
- protected transient BiConsumer<? super Status.Known, Throwable> callback;
+ final long toEpoch;
+ @Nullable final Timestamp committedExecuteAt;
+ @Nullable final Writes writes;
+ @Nullable final Result result;
+ final BiConsumer<? super FetchResult, Throwable> callback;
Review Comment:
fetchResult is passed to the callback?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]