belliottsmith commented on code in PR #113: URL: https://github.com/apache/cassandra-accord/pull/113#discussion_r1746034649
########## accord-core/src/main/java/accord/impl/progresslog/WaitingState.java: ########## @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.progresslog; + +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import accord.api.ProgressLog.BlockedUntil; +import accord.coordinate.AsynchronousAwait; +import accord.coordinate.FetchData; +import accord.local.Command; +import accord.local.Node; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.local.SaveStatus; +import accord.local.Status; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.Routables; +import accord.primitives.Route; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.topology.Topologies; +import accord.utils.Invariants; + +import static accord.api.ProgressLog.BlockedUntil.CanApply; +import static accord.api.ProgressLog.BlockedUntil.Query.HOME; +import static accord.api.ProgressLog.BlockedUntil.Query.SHARD; +import static 
accord.impl.progresslog.CallbackInvoker.invokeWaitingCallback; +import static accord.impl.progresslog.PackedKeyTracker.bitSet; +import static accord.impl.progresslog.PackedKeyTracker.clearRoundState; +import static accord.impl.progresslog.PackedKeyTracker.initialiseBitSet; +import static accord.impl.progresslog.PackedKeyTracker.roundCallbackBitSet; +import static accord.impl.progresslog.PackedKeyTracker.roundIndex; +import static accord.impl.progresslog.PackedKeyTracker.roundSize; +import static accord.impl.progresslog.PackedKeyTracker.setBitSet; +import static accord.impl.progresslog.PackedKeyTracker.setMaxRoundIndexAndClearBitSet; +import static accord.impl.progresslog.PackedKeyTracker.setRoundIndexAndClearBitSet; +import static accord.impl.progresslog.TxnStateKind.Waiting; +import static accord.impl.progresslog.WaitingState.CallbackKind.AwaitHome; +import static accord.impl.progresslog.WaitingState.CallbackKind.AwaitSlice; +import static accord.impl.progresslog.WaitingState.CallbackKind.Fetch; +import static accord.impl.progresslog.Progress.Awaiting; +import static accord.impl.progresslog.Progress.NoneExpected; +import static accord.impl.progresslog.Progress.Querying; +import static accord.impl.progresslog.Progress.Queued; +import static accord.impl.progresslog.WaitingState.CallbackKind.FetchRoute; +import static accord.primitives.EpochSupplier.constant; + +/** + * This represents a simple state machine encoded in a small number of bits for efficiently gathering + * distributed state we require locally to make progress. 
+ * <p> + * The state machine consists of the following packed registers: + * - target BlockedUntil + * - The BlockUntil we know at least one home shard replica is able to satisfy + * - A packed bitset/counter for enumerating the relevant keys and awaiting + * remote replicas for the keys to be ready to satisfy our local requirements + * + */ +@SuppressWarnings("CodeBlock2Expr") +abstract class WaitingState extends BaseTxnState +{ + private static final int PROGRESS_SHIFT = 0; + private static final long PROGRESS_MASK = 0x3; + private static final int BLOCKED_UNTIL_SHIFT = 2; + private static final long BLOCKED_UNTIL_MASK = 0x7; + private static final int HOME_SATISFIES_SHIFT = 5; + private static final long HOME_SATISFIES_MASK = 0x7; + private static final int WAITING_AWAIT_BITS = 32; + private static final int WAITING_AWAIT_SHIFT = 8; + private static final long WAITING_AWAIT_MASK = (1L << WAITING_AWAIT_BITS) - 1; + private static final long SET_MASK = ~((PROGRESS_MASK << PROGRESS_SHIFT) | (BLOCKED_UNTIL_MASK << BLOCKED_UNTIL_SHIFT)); + private static final long INITIALISED_MASK = (PROGRESS_MASK << PROGRESS_SHIFT) | (BLOCKED_UNTIL_MASK << BLOCKED_UNTIL_SHIFT) | (HOME_SATISFIES_MASK << HOME_SATISFIES_SHIFT); + + WaitingState(TxnId txnId) + { + super(txnId); + } + + private void set(SafeCommandStore safeStore, DefaultProgressLog owner, BlockedUntil newBlockedUntil, Progress newProgress) + { + encodedState &= SET_MASK; + encodedState |= ((long) newBlockedUntil.ordinal() << BLOCKED_UNTIL_SHIFT) | ((long) newProgress.ordinal() << PROGRESS_SHIFT); + updateScheduling(safeStore, owner, Waiting, newBlockedUntil, newProgress); + } + + private void setHomeSatisfies(BlockedUntil homeStatus) + { + encodedState &= ~(HOME_SATISFIES_MASK << HOME_SATISFIES_SHIFT); + encodedState |= (long) homeStatus.ordinal() << HOME_SATISFIES_SHIFT; + } + + boolean isUninitialised() + { + return 0 == (encodedState & INITIALISED_MASK); + } + + @Nonnull BlockedUntil blockedUntil() + { + return 
blockedUntil(encodedState); + } + + @Nonnull BlockedUntil homeSatisfies() + { + return homeSatisfies(encodedState); + } + + final @Nonnull Progress waitingProgress() + { + return waitingProgress(encodedState); + } + + private static @Nonnull BlockedUntil blockedUntil(long encodedState) + { + return BlockedUntil.forOrdinal((int) ((encodedState >>> BLOCKED_UNTIL_SHIFT) & BLOCKED_UNTIL_MASK)); + } + + private static @Nonnull BlockedUntil homeSatisfies(long encodedState) + { + return BlockedUntil.forOrdinal((int) ((encodedState >>> HOME_SATISFIES_SHIFT) & HOME_SATISFIES_MASK)); + } + + private static @Nonnull Progress waitingProgress(long encodedState) + { + return Progress.forOrdinal((int) ((encodedState >>> PROGRESS_SHIFT) & PROGRESS_MASK)); + } + + private static int awaitRoundSize(Route<?> slicedRoute) + { + return roundSize(slicedRoute.size(), WAITING_AWAIT_BITS); + } + + private void clearAwaitState() + { + encodedState = clearRoundState(encodedState, WAITING_AWAIT_SHIFT, WAITING_AWAIT_MASK); + } + + private int awaitBitSet(int roundSize) + { + return bitSet(encodedState, roundSize, WAITING_AWAIT_SHIFT); + } + + private void initialiseAwaitBitSet(Route<?> route, Unseekables<?> notReady, int roundIndex, int roundSize) + { + encodedState = initialiseBitSet(encodedState, route, notReady, roundIndex, roundSize, WAITING_AWAIT_SHIFT); + } + + private void setAwaitBitSet(int bitSet, int roundSize) + { + encodedState = setBitSet(encodedState, bitSet, roundSize, WAITING_AWAIT_SHIFT); + } + + private int awaitRoundIndex(int roundSize) + { + return roundIndex(encodedState, roundSize, WAITING_AWAIT_SHIFT, WAITING_AWAIT_MASK); + } + + private void updateAwaitRound(int newRoundIndex, int roundSize) + { + Invariants.checkArgument(roundSize <= WAITING_AWAIT_BITS); + encodedState = setRoundIndexAndClearBitSet(encodedState, newRoundIndex, roundSize, WAITING_AWAIT_SHIFT, WAITING_AWAIT_MASK); + } + + private void setAwaitDone(int roundSize) + { + Invariants.checkArgument(roundSize 
<= WAITING_AWAIT_BITS); + encodedState = setMaxRoundIndexAndClearBitSet(encodedState, roundSize, WAITING_AWAIT_SHIFT, WAITING_AWAIT_MASK); + } + + Topologies contact(DefaultProgressLog owner, Unseekables<?> forKeys, long epoch) + { + Node node = owner.node(); + Topologies topologies = node.topology().forEpoch(forKeys, epoch); + return node.agent().selectPreferred(node.id(), topologies); + } + + private static EpochSupplier toLocalEpoch(SafeCommandStore safeStore, TxnId txnId, BlockedUntil blockedUntil, Command command, Timestamp executeAt) + { + long epoch = blockedUntil.fetchEpoch(txnId, executeAt); + if (command.route() != null) + epoch = Math.max(epoch, safeStore.ranges().latestEpochWithNewParticipants(epoch, command.route())); + if (command.additionalKeysOrRanges() != null) + epoch = Math.max(epoch, safeStore.ranges().latestEpochWithNewParticipants(epoch, command.additionalKeysOrRanges())); + return constant(epoch); + } + + private static Route<?> slicedRoute(SafeCommandStore safeStore, TxnId txnId, Command command, BlockedUntil blockedUntil) + { + Timestamp executeAt = command.executeAtIfKnown(); + EpochSupplier toLocalEpoch = toLocalEpoch(safeStore, txnId, blockedUntil, command, executeAt); + + Ranges ranges = safeStore.ranges().allBetween(txnId.epoch(), toLocalEpoch); + return command.route().slice(ranges); + } + + private static Route<?> slicedRoute(SafeCommandStore safeStore, TxnId txnId, Route<?> route, EpochSupplier toLocalEpoch) + { + Ranges ranges = safeStore.ranges().allBetween(txnId.epoch(), toLocalEpoch); + return route.slice(ranges); + } + + private static Route<?> awaitRoute(Route<?> slicedRoute, BlockedUntil blockedUntil) + { + return blockedUntil.waitsOn == HOME ? slicedRoute.homeKeyOnlyRoute() : slicedRoute; + } + + private static Route<?> fetchRoute(Route<?> slicedRoute, Route<?> awaitRoute, BlockedUntil blockedUntil) + { + return blockedUntil.waitsOn == blockedUntil.fetchFrom ? 
awaitRoute : slicedRoute; + } + + void setWaitingDone(DefaultProgressLog owner) + { + set(null, owner, CanApply, NoneExpected); + owner.clearActive(Waiting, txnId); + clearRetryCounter(); + } + + void setBlockedUntil(SafeCommandStore safeStore, DefaultProgressLog owner, BlockedUntil blockedUntil) + { + BlockedUntil currentlyBlockedUntil = blockedUntil(); + if (blockedUntil.compareTo(currentlyBlockedUntil) > 0 || isUninitialised()) + { + clearAwaitState(); + clearRetryCounter(); + owner.clearActive(Waiting, txnId); + set(safeStore, owner, blockedUntil, Queued); + } + } + + void record(DefaultProgressLog owner, SaveStatus newSaveStatus) + { + BlockedUntil currentlyBlockedUntil = blockedUntil(); + if (currentlyBlockedUntil.minSaveStatus.compareTo(newSaveStatus) <= 0) + { + boolean isDone = newSaveStatus.hasBeen(Status.PreApplied); + set(null, owner, isDone ? CanApply : currentlyBlockedUntil, NoneExpected); + if (isDone) + maybeRemove(owner); + owner.clearActive(Waiting, txnId); + } + } + + final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner) + { + run(safeStore, safeCommand, owner); + } + + private void run(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner) + { + BlockedUntil blockedUntil = blockedUntil(); + Command command = safeCommand.current(); + Invariants.checkState(!owner.hasActive(Waiting, txnId)); + Invariants.checkState(command.saveStatus().compareTo(blockedUntil.minSaveStatus) < 0, "Command has met desired criteria but progress log entry has not been cancelled"); + + set(safeStore, owner, blockedUntil, Querying); + TxnId txnId = safeCommand.txnId(); + // first make sure we have enough information to obtain the command locally + Timestamp executeAt = command.executeAtIfKnown(); + EpochSupplier toLocalEpoch = toLocalEpoch(safeStore, txnId, blockedUntil, command, executeAt); + Participants<?> fetchKeys = Invariants.nonNull(command.maxContactable()); + + if (!Route.isRoute(fetchKeys)) + 
{ + fetchRoute(owner, blockedUntil, txnId, executeAt, toLocalEpoch, fetchKeys); + return; + } + + Route<?> route = Route.castToRoute(fetchKeys); + if (homeSatisfies().compareTo(blockedUntil) < 0) + { + awaitHomeKey(owner, blockedUntil, txnId, executeAt, route); + return; + } + + Route<?> slicedRoute = slicedRoute(safeStore, txnId, route, toLocalEpoch); + if (!command.hasBeen(Status.PreCommitted)) + { + // we know it has been decided one way or the other by the home shard at least, so we attempt a fetch + // including the home shard to get us to at least PreCommitted where we can safely wait on individual shards + fetch(owner, blockedUntil, txnId, executeAt, toLocalEpoch, slicedRoute, slicedRoute.withHomeKey()); + return; + } + + Route<?> awaitRoute = awaitRoute(slicedRoute, blockedUntil); + Route<?> fetchRoute = fetchRoute(slicedRoute, awaitRoute, blockedUntil); + + if (slicedRoute.size() == 0 || awaitRoute.isHomeKeyOnlyRoute()) + { + // at this point we can switch to polling as we know someone has the relevant state Review Comment: because the home shard can satisfy our query, and we are either waiting on the home shard only, or we don't intersect the command's keys at all and so we need some universally-applicable state like executeAt -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

