dcapwell commented on code in PR #106:
URL: https://github.com/apache/cassandra-accord/pull/106#discussion_r1693328234
##########
accord-core/src/main/java/accord/messages/Commit.java:
##########
@@ -161,6 +161,7 @@ protected Commit(Kind kind, TxnId txnId, PartialRoute<?>
scope, long waitForEpoc
public static void commitMinimal(Node node, Topologies
coordinateEpochOnly, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
Timestamp executeAt, Deps unstableDeps, Callback<ReadReply> callback)
{
+ Invariants.checkArgument(coordinateEpochOnly.size() == 1);
Review Comment:
for debugging can you show what epochs are included?
##########
accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java:
##########
@@ -310,6 +310,10 @@ public int compareTo(@Nonnull RoutableKey that)
int rightInt = that instanceof Sentinel ? (((Sentinel)
that).isMin ? -1 : 1) : 0;
rc = Integer.compare(leftInt, rightInt);
}
+ else if (!(this instanceof PrefixedIntRoutingKey) && (!(that
instanceof PrefixedIntRoutingKey)))
Review Comment:
`PrefixedIntRoutingKey -> Hash | Sentinel`
`PrefixedIntHashKey -> Hash | Sentinel | Key`
so this is just saying `else if (this.isKey() && that.isKey())`? Since you
are checking the key it might be best to do that?
##########
accord-core/src/main/java/accord/utils/Invariants.java:
##########
@@ -27,13 +27,16 @@
public class Invariants
{
- // TODO (now): configure by system parameter and turn off by default
- private static final boolean PARANOID = true;
- private static final boolean DEBUG = true;
+ private static final int PARANOIA =
Integer.parseInt(System.getProperty("accord.paranoia", "0"));
+ private static final boolean DEBUG = System.getProperty("accord.debug",
"false").equals("true");
Review Comment:
does this compile in C*? We have a check style (that does check Accord
code) that we don't touch these APIs
##########
accord-core/src/test/java/accord/utils/AccordGens.java:
##########
@@ -227,7 +236,14 @@ public static Gen<Key> keysInsideRanges(Ranges ranges)
public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen)
Review Comment:
this is used in maelstrom and
`org.apache.cassandra.utils.AccordGenerators#keyDepsGen()`, need to double
check the C* to make sure this change is fine.
##########
accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java:
##########
@@ -232,7 +235,7 @@ public <T> AsyncChain<T> submit(PreLoadContext context,
Function<? super SafeCom
public <T> AsyncChain<T> submit(Callable<T> fn)
{
Task<T> task = new DelayedTask<>(fn);
- if (Invariants.isParanoid())
+ if (Invariants.paranoia() >= 3)
Review Comment:
an issue I have with this is there isn't really any documentation on what
levels are allowed or what they mean... this might be a better case for an enum
then int level?
##########
accord-core/src/main/java/accord/local/RedundantBefore.java:
##########
@@ -109,6 +109,8 @@ public Entry(Range range, long startEpoch, long endEpoch,
@Nonnull TxnId locally
this.shardAppliedOrInvalidatedBefore =
shardAppliedOrInvalidatedBefore;
this.bootstrappedAt = bootstrappedAt;
this.staleUntilAtLeast = staleUntilAtLeast;
+
Invariants.checkArgument(locallyAppliedOrInvalidatedBefore.equals(TxnId.NONE)
|| locallyAppliedOrInvalidatedBefore.domain().isRange());
Review Comment:
so this just blocks key txn from showing up? In BurnTest we can do `Range`
read/write and I think we wanted to add range reads in C* eventually, so is
`range` the best here or is it really `ExclusiveSyncPoint` that we want?
##########
accord-core/src/test/java/accord/local/CommandsForKeyTest.java:
##########
@@ -0,0 +1,1067 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.junit.jupiter.api.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Agent;
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.api.Update;
+import accord.impl.IntKey;
+import accord.local.Command.AbstractCommand;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.EpochSupplier;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Route;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+import accord.primitives.Writes;
+import accord.utils.Invariants;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResults;
+
+import static accord.local.Status.Durability.NotDurable;
+
+// TODO (expected): test setting redundant before
+// TODO (expected): test ballot updates
+// TODO (expected): test transition to Erased
+// TODO (expected): ensure execution is not too early
+// TODO (expected): validate mapReduce
+// TODO (expected): insert linearizability violations and detect them
+public class CommandsForKeyTest
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CommandsForKeyTest.class);
+ private static final Key KEY = IntKey.key(1);
+ private static final Range RANGE = IntKey.range(0, 2);
+ private static final Keys KEYS = IntKey.keys(1);
+ private static final Ranges RANGES = Ranges.of(IntKey.range(0, 2));
+ private static final Txn KEY_TXN = new Txn.InMemory(KEYS, new
TestRead(KEYS), new TestQuery());
+ private static final Txn RANGE_TXN = new Txn.InMemory(RANGES, new
TestRead(RANGES), new TestQuery());
+ private static final FullRoute KEY_ROUTE = KEYS.toRoute(IntKey.routing(1));
+ private static final FullRoute RANGE_ROUTE =
RANGES.toRoute(IntKey.routing(1));
+
+ static class CommandUpdate
+ {
+ final Command prev, next;
+
+ CommandUpdate(Command prev, Command next)
+ {
+ this.prev = prev;
+ this.next = next;
+ }
+ }
+
+ // TODO (expected): randomise ballots
+ static class Canon implements NotifySink
+ {
+ private static final TxnId MIN = new TxnId(1, 1, Txn.Kind.Read,
Domain.Key, new Node.Id(1));
+
+ // TODO (expected): randomise ratios
+ static final Txn.Kind[] KINDS = new Txn.Kind[] { Txn.Kind.Read,
Txn.Kind.Write, Txn.Kind.EphemeralRead, Txn.Kind.SyncPoint,
Txn.Kind.ExclusiveSyncPoint };
+ final RandomSource rnd;
+ final Node.Id[] nodeIds;
+ final Domain[] domains;
+ final TreeSet<TxnId> unwitnessed = new TreeSet<>();
+ final TreeSet<TxnId> undecided = new TreeSet<>();
+ final TreeSet<TxnId> candidates = new TreeSet<>();
+ final TreeSet<TxnId> unfinished = new TreeSet<>();
+ final TreeMap<Timestamp, Command> byId = new TreeMap<>();
+ final TreeMap<Timestamp, Command> committedByExecuteAt = new
TreeMap<>();
+ final Set<Timestamp> executeAts = new HashSet<>();
+ boolean closing;
+ int undecidedCount;
+
+ static final EnumMap<SaveStatus, SaveStatus[]> TRANSITIONS = new
EnumMap<>(SaveStatus.class);
+
+ void set(Command prev, Command next)
+ {
+ byId.put(next.txnId(), next);
+ if (next.hasBeen(Status.Committed))
+ {
+ undecided.remove(next.txnId());
+ committedByExecuteAt.put(next.executeAt(), next);
+ if (next.hasBeen(Status.Stable))
+ {
+ if (prev.saveStatus() != next.saveStatus())
+ candidates.remove(next.txnId());
+
+ if (next.hasBeen(Status.Applied))
+ {
+ unfinished.remove(next.txnId());
+ if (!CommandsForKey.managesExecution(next.txnId()))
+ removeWaitingOn(next.txnId(), Timestamp.MAX);
+ }
+ else
+ {
+ Command.Committed committed = next.asCommitted();
+ if (!committed.isWaitingOnDependency() &&
(!prev.hasBeen(Status.Stable) || prev.asCommitted().isWaitingOnDependency()))
+ readyToExecute(committed);
+ }
+ }
+ if (next.hasBeen(Status.Committed) &&
!prev.hasBeen(Status.Committed) && !next.hasBeen(Status.Invalidated))
+ {
+ if (!next.executeAt().equals(next.txnId()) &&
!CommandsForKey.manages(next.txnId()))
+ removeWaitingOn(next.txnId(), next.executeAt());
+ }
+
+ }
+ }
+
+ private void readyToExecute(Command.Committed committed)
+ {
+ for (Command pred :
committedByExecuteAt.headMap(committed.executeAt(), false).values())
+ Invariants.checkState(pred.hasBeen(Status.Applied) ||
!committed.txnId().kind().witnesses(pred.txnId()));
+ candidates.add(committed.txnId());
+ }
+
+ private void removeWaitingOn(TxnId waitingId, Timestamp until)
+ {
+ for (Command command : new
ArrayList<>(committedByExecuteAt.subMap(waitingId, false, until,
false).values()))
+ {
+ if (!command.hasBeen(Status.Stable))
+ continue;
+
+ Command.Committed committed = command.asCommitted();
+ Command.WaitingOn waitingOn = committed.waitingOn;
+ if (waitingOn.isWaitingOn(waitingId))
+ {
+ Command.WaitingOn.Update update = new
Command.WaitingOn.Update(waitingOn);
+ update.removeWaitingOn(waitingId);
+ set(committed, Command.Committed.committed(committed,
committed, update.build()));
+ }
+ }
+ }
+
+ @Override
+ public void notWaiting(SafeCommandStore safeStore, TxnId txnId, Key
key)
+ {
+ Command.Committed prev = byId.get(txnId).asCommitted();
+ Command.WaitingOn.Update waitingOn = new
Command.WaitingOn.Update(prev.waitingOn);
+ if (!waitingOn.removeWaitingOn(key))
+ return;
+
+ Command.Committed next = Command.Committed.committed(prev, prev,
waitingOn.build());
+ set(prev, next);
+ }
+
+ @Override
+ public void notWaiting(SafeCommandStore safeStore, SafeCommand
safeCommand, Key key)
+ {
+ notWaiting(safeStore, safeCommand.txnId(), key);
+ }
+
+ @Override
+ public void waitingOnCommit(SafeCommandStore safeStore,
CommandsForKey.TxnInfo uncommitted, Key key)
+ {
+ }
+
+ static
+ {
+ TRANSITIONS.put(SaveStatus.NotDefined, new SaveStatus[] {
SaveStatus.PreAccepted, SaveStatus.AcceptedInvalidate,
SaveStatus.AcceptedInvalidateWithDefinition, SaveStatus.Accepted,
SaveStatus.AcceptedWithDefinition, SaveStatus.Committed, SaveStatus.Stable,
SaveStatus.Invalidated });
+ TRANSITIONS.put(SaveStatus.PreAccepted, new SaveStatus[] {
SaveStatus.AcceptedInvalidateWithDefinition, SaveStatus.AcceptedWithDefinition,
SaveStatus.Committed, SaveStatus.Stable, SaveStatus.Invalidated });
+ // permit updated ballot and moving to other statuses
+ TRANSITIONS.put(SaveStatus.AcceptedInvalidate, new SaveStatus[] {
SaveStatus.Invalidated });
+ TRANSITIONS.put(SaveStatus.AcceptedInvalidateWithDefinition, new
SaveStatus[] { SaveStatus.Invalidated });
+ TRANSITIONS.put(SaveStatus.Accepted, new SaveStatus[] {
SaveStatus.Committed, SaveStatus.Stable, SaveStatus.Invalidated });
+ TRANSITIONS.put(SaveStatus.AcceptedWithDefinition, new
SaveStatus[] { SaveStatus.Committed, SaveStatus.Stable, SaveStatus.Invalidated
});
+ TRANSITIONS.put(SaveStatus.Committed, new SaveStatus[] {
SaveStatus.Stable });
+ TRANSITIONS.put(SaveStatus.Stable, new SaveStatus[] {
SaveStatus.Applied });
+ }
+
+ Canon(RandomSource rnd)
+ {
+ this.rnd = rnd;
+ this.nodeIds = new Node.Id[10];
+ for (int i = 0 ; i < nodeIds.length ; ++i)
+ nodeIds[i] = new Node.Id(i + 1);
+ this.domains = Domain.values();
+ }
+
+ Canon(RandomSource rnd, Node.Id[] nodeIds)
+ {
+ this.rnd = rnd;
+ this.nodeIds = nodeIds;
+ this.domains = Domain.values();
+ }
+
+ boolean isDone()
+ {
+ return closing && unfinished.size() == 0;
+ }
+
+ void close()
+ {
+ closing = true;
+ }
+
+ CommandUpdate update(boolean hasWaitingTasks)
+ {
+ boolean generate = !closing && rnd.decide(1 / (1f +
undecidedCount));
+ if (!generate && candidates.isEmpty() && hasWaitingTasks)
+ return null;
+ Invariants.checkArgument (!candidates.isEmpty() ||
(unfinished.size() == unwitnessed.size()));
+ Command prev = generate || candidates.isEmpty() ?
unwitnessed(generateId()) : selectOne(candidates);
+ boolean invalidate = false;
+ if (prev.txnId().kind() == Txn.Kind.ExclusiveSyncPoint &&
!prev.hasBeen(Status.Committed))
+ {
+ // may need to invalidate
+ if (byId.tailMap(prev.txnId(),
false).values().stream().anyMatch(c -> c.hasBeen(Status.Committed) &&
c.txnId().kind().witnesses(prev.txnId())))
+ invalidate = true;
+ }
+ Command next = invalidate ?
AbstractCommand.validate((AbstractCommand)update(prev, SaveStatus.Invalidated))
+ :
AbstractCommand.validate((AbstractCommand)update(prev));
+ set(prev, next);
+ unwitnessed.remove(next.txnId());
+ return new CommandUpdate(prev, next);
+ }
+
+ private Command update(Command prev)
+ {
+ SaveStatus[] candidates = TRANSITIONS.get(prev.saveStatus());
+ return update(prev, candidates[rnd.nextInt(candidates.length)]);
+ }
+
+ private Command update(Command prev, SaveStatus newStatus)
+ {
+ switch (newStatus)
+ {
+ default:
+ case NotDefined:
+ case TruncatedApply:
+ case TruncatedApplyWithDeps:
+ case TruncatedApplyWithOutcome:
+ case PreApplied:
+ case PreCommitted:
+ throw new AssertionError();
+
+ case PreAccepted:
+ return preaccepted(prev.txnId());
+
+ case Accepted:
+ case AcceptedWithDefinition:
+ return accepted(prev.txnId(),
generateExecuteAt(prev.txnId()), newStatus);
+
+ case AcceptedInvalidate:
+ case AcceptedInvalidateWithDefinition:
+ return acceptedInvalidated(prev.txnId(), newStatus);
+
+ case Committed:
+ return committed(prev.txnId(),
prev.executeAtIfKnown(generateExecuteAt(prev.txnId())));
+
+ case Stable:
+ return stable(prev.txnId(),
prev.executeAtIfKnown(generateExecuteAt(prev.txnId())),
prev.hasBeen(Status.Committed) ? prev.asCommitted() : null);
+
+ case Applied:
+ return applied(prev.txnId(),
prev.executeAtIfKnown(generateExecuteAt(prev.txnId())),
prev.hasBeen(Status.Committed) ? prev.asCommitted() : null);
+
+ case Invalidated:
+ return invalidated(prev.txnId());
+ }
+ }
+
+ private Deps generateDeps(TxnId txnId, Timestamp executeAt, Status
forStatus)
+ {
+ maybeGenerateUnwitnessed();
+ try (Deps.Builder builder = new Deps.Builder())
+ {
+ for (Command command : byId.headMap(executeAt, false).values())
+ {
+ if (txnId.equals(command.txnId())) continue;
+ if (!txnId.kind().witnesses(command.txnId())) continue;
+
+ if (command.hasBeen(forStatus.compareTo(Status.Accepted)
<= 0 ? Status.Committed : Status.Accepted) || rnd.nextBoolean())
+ {
+ unwitnessed.remove(command.txnId());
+ builder.add(command.txnId().domain() == Domain.Key ?
KEY : RANGE, command.txnId());
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private Timestamp generateExecuteAt(TxnId txnId)
+ {
+ if (txnId.kind().awaitsOnlyDeps())
+ return txnId;
+
+ Timestamp min = Timestamp.NONE;
+ if (!committedByExecuteAt.isEmpty())
+ {
+ min = committedByExecuteAt.lastEntry().getValue().executeAt();
+ min = Timestamp.fromValues(min.epoch(), min.hlc() + 1,
min.node);
+ }
+
+ if (min.compareTo(txnId) <= 0 && rnd.nextBoolean())
+ return txnId;
+
+ min = Timestamp.max(min, Timestamp.fromValues(txnId.epoch(),
txnId.hlc() + 1, txnId.node));
+ Timestamp max = Timestamp.fromValues(min.epoch(), min.hlc() + 100,
min.node);
+
+ Timestamp executeAt = generateTimestamp(min, max, true);
+ Invariants.checkState(executeAt.compareTo(txnId) >= 0);
+ executeAts.add(executeAt);
+ return executeAt;
+ }
+
+ private void maybeGenerateUnwitnessed()
+ {
+ int transitiveCount = rnd.decide(1 / (1f + unwitnessed.size())) ?
rnd.nextInt(0, 3) : 0;
+ while (transitiveCount-- > 0)
+ {
+ // generate new unwitnessed transactions
+ TxnId next = generateId();
+ byId.put(next, unwitnessed(next));
+ }
+ }
+
+ TxnId generateId()
+ {
+ TxnId min = MIN;
+ TxnId max;
+ if (byId.isEmpty()) max = new TxnId(1, 100, Txn.Kind.Read,
Domain.Key, nodeIds[0]);
+ else
+ {
+ max = byId.lastEntry().getValue().txnId();
+ switch (rnd.nextInt(3))
+ {
+ default: throw new AssertionError();
+ case 2:
+ min = max;
+ case 1:
+ max = new TxnId(max.epoch(), max.hlc() + 100,
max.kind(), max.domain(), max.node);
+ case 0:
+
+ }
+ }
+ return generateId(min, max, true);
+ }
+
+ TxnId generateId(TxnId min, TxnId max, boolean unique)
+ {
+ TxnId result = generateId(min, max);
+ while (unique && (byId.containsKey(result) ||
executeAts.contains(result)))
+ result = generateId(min, max);
+ return result;
+ }
+
+ TxnId generateId(TxnId min, TxnId max)
+ {
+ long epoch = min.epoch() == max.epoch() ? min.epoch() :
rnd.nextLong(min.epoch(), max.epoch());
+ long hlc = min.hlc() == max.hlc() ? min.hlc() :
rnd.nextLong(min.hlc(), max.hlc());
+
+ Node.Id node;
+ if (hlc == min.hlc()) node = min.node.id == nodeIds.length + 1 ?
min.node : nodeIds[rnd.nextInt(min.node.id - 1, nodeIds.length)];
+ else if (hlc == max.hlc()) node = max.node.id == 1 ? max.node :
nodeIds[rnd.nextInt(0, max.node.id - 1)];
+ else node = rnd.pick(nodeIds);
+
+ Txn.Kind kind;
+ if (hlc == min.hlc()) kind = min.kind();
+ else if (hlc == max.hlc()) kind = max.kind();
+ else kind = rnd.pick(KINDS);
+
+ Domain domain;
+ if (hlc == min.hlc() && min.domain() == Domain.Range) domain =
Domain.Range;
+ else if (hlc == max.hlc() && max.domain() == Domain.Key) domain =
Domain.Key;
+ else domain = rnd.nextBoolean() ? Domain.Key : Domain.Range;
+
+ return new TxnId(epoch, hlc, kind, domain, node);
+ }
+
+ Timestamp generateTimestamp(Timestamp min, Timestamp max, boolean
unique)
+ {
+ Timestamp result = generateTimestamp(min, max);
+ while (unique && (byId.containsKey(result) ||
executeAts.contains(result)))
+ result = generateTimestamp(min, max);
+ return result;
+ }
+
+ Timestamp generateTimestamp(Timestamp min, Timestamp max)
+ {
+ Invariants.checkArgument(min.flags() == 0);
+ Invariants.checkArgument(max.flags() == 0);
+ long epoch = min.epoch() == max.epoch() ? min.epoch() :
rnd.nextLong(min.epoch(), max.epoch());
+ long hlc;
+ Node.Id node;
+ if (min.hlc() == max.hlc()) hlc = min.hlc();
+ else hlc = rnd.nextLong(min.hlc(), max.hlc());
+
+ if (hlc == min.hlc()) node = min.node.id == nodeIds.length + 1 ?
min.node : nodeIds[rnd.nextInt(min.node.id - 1, nodeIds.length)];
+ else if (hlc == max.hlc()) node = max.node.id == 1 ? max.node :
nodeIds[rnd.nextInt(0, max.node.id - 1)];
+ else node = rnd.pick(nodeIds);
+
+ Timestamp result = Timestamp.fromValues(epoch, hlc, node);
+ Invariants.checkState(result.compareTo(min) >= 0);
+ return result;
+ }
+
+ Command unwitnessed(TxnId txnId)
+ {
+ Command command = new Command.NotDefined(common(txnId),
SaveStatus.NotDefined, Ballot.ZERO);
+ unwitnessed.add(txnId);
+ undecided.add(txnId);
+ candidates.add(txnId);
+ unfinished.add(txnId);
+ return command;
+ }
+
+ Command preaccepted(TxnId txnId)
+ {
+ return Command.PreAccepted.preAccepted(common(txnId), txnId,
Ballot.ZERO);
+ }
+
+ Command acceptedInvalidated(TxnId txnId, SaveStatus saveStatus)
+ {
+ return new Command.Accepted(common(txnId,
saveStatus.known.definition.isKnown()),
+ saveStatus, Ballot.ZERO, txnId,
Ballot.ZERO);
+ }
+
+ Command accepted(TxnId txnId, Timestamp executeAt, SaveStatus
saveStatus)
+ {
+ Deps deps = generateDeps(txnId, txnId, Status.Accepted);
+ return new Command.Accepted(common(txnId,
saveStatus.known.definition.isKnown()).partialDeps(deps.slice(RANGES)),
+ saveStatus, Ballot.ZERO, executeAt,
Ballot.ZERO);
+ }
+
+ Command committed(TxnId txnId, Timestamp executeAt)
+ {
+ Deps deps = generateDeps(txnId, executeAt, Status.Committed);
+ return
Command.Committed.committed(common(txnId).partialDeps(deps.slice(RANGES)),
SaveStatus.Committed, executeAt, Ballot.ZERO, Ballot.ZERO, null);
+ }
+
+ Command stable(TxnId txnId, Timestamp executeAt, @Nullable
Command.Committed committed)
+ {
+ Deps deps = committed == null ? generateDeps(txnId, executeAt,
Status.Stable) : committed.partialDeps();
+ CommonAttributes common =
common(txnId).partialDeps(deps.slice(RANGES));
+ Command.WaitingOn waitingOn = initialiseWaitingOn(txnId,
executeAt, common.route(), deps);
+ return Command.Committed.committed(common, SaveStatus.Stable,
executeAt, Ballot.ZERO, Ballot.ZERO, waitingOn);
+ }
+
+ Command applied(TxnId txnId, Timestamp executeAt, @Nullable
Command.Committed committed)
+ {
+ Deps deps = committed == null ? generateDeps(txnId, executeAt,
Status.Applied) : committed.partialDeps();
+ CommonAttributes common =
common(txnId).partialDeps(deps.slice(RANGES));
+ Command.WaitingOn waitingOn = committed == null ||
committed.waitingOn == null ? initialiseWaitingOn(txnId, executeAt,
common.route(), deps) : committed.waitingOn;
+ return new Command.Executed(common, SaveStatus.Applied, executeAt,
Ballot.ZERO, Ballot.ZERO, waitingOn,
+ new Writes(txnId, executeAt, KEYS,
null), new Result(){});
+ }
+
+ Command invalidated(TxnId txnId)
+ {
+ return new Command.Truncated(common(txnId),
SaveStatus.Invalidated, Timestamp.NONE, null, null);
+ }
+
+ CommonAttributes.Mutable common(TxnId txnId)
+ {
+ return common(txnId, true);
+ }
+
+ CommonAttributes.Mutable common(TxnId txnId, boolean withDefinition)
+ {
+ CommonAttributes.Mutable result = new
CommonAttributes.Mutable(txnId)
+ .durability(NotDurable)
+ .route(txnId.domain() == Domain.Key ? KEY_ROUTE :
RANGE_ROUTE);
+
+ if (withDefinition)
+ result.partialTxn((txnId.domain() == Domain.Key ? KEY_TXN :
RANGE_TXN).slice(RANGES, true));
+
+ return result;
+ }
+
+ private Command.WaitingOn initialiseWaitingOn(TxnId txnId, Timestamp
executeAt, Route<?> route, Deps deps)
+ {
+ Command.WaitingOn.Update waitingOn =
Command.WaitingOn.Update.initialise(txnId, route, RANGES, deps);
+ for (int i = 0 ; i < waitingOn.txnIdCount() ; ++i)
+ {
+ TxnId dep = waitingOn.txnId(i);
+ Command command = byId.get(dep);
+ if (command.hasBeen(Status.Applied) ||
(command.hasBeen(Status.Committed) && command.executeAt().compareTo(executeAt)
> 0))
+ waitingOn.removeWaitingOn(dep);
+ }
+ return waitingOn.build();
+ }
+
+ private Command selectOne(TreeSet<TxnId> from)
+ {
+ TxnId bound = generateId(from.first(), from.last());
+ TxnId txnId = from.floor(bound);
+ return byId.get(txnId);
+ }
+ }
+
+ @Test
+ public void testOne()
+ {
+// test(System.nanoTime(), 1000);
+ test(95150589325083L, 1000);
+ }
+
+ @Test
+ public void testMany()
+ {
+ long seed = System.nanoTime();
+ for (int i = 0 ; i < 10000 ; ++i)
+ {
+ test(seed++, 1000);
+ }
+ }
+
+ private static void test(long seed, int minCount)
+ {
+ logger.info("Seed {}", seed);
+ try
+ {
+ final RandomSource rnd = RandomSource.wrap(new Random(seed));
Review Comment:
```suggestion
final RandomSource rnd = new DefaultRandom(seed);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]