dcapwell commented on code in PR #50:
URL: https://github.com/apache/cassandra-accord/pull/50#discussion_r1267076715
##########
accord-core/src/main/java/accord/local/Bootstrap.java:
##########
@@ -119,8 +119,8 @@ class Attempt implements FetchRanges, BiConsumer<Void,
Throwable>
void start(SafeCommandStore safeStore0)
{
- globalSyncId = new TxnId(node.uniqueNowWithStaleEpoch(epoch),
ExclusiveSyncPoint, Routable.Domain.Range);
- localSyncId = globalSyncId.as(LocalOnly);
+ globalSyncId = node.nextTxnId(ExclusiveSyncPoint,
Routable.Domain.Range);
Review Comment:
so `global` may be in `epoch=2` whereas `local` is in `epoch=1`? What
motivated this change?
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -685,25 +819,27 @@ public Ballot accepted()
public static class Committed extends Accepted
{
- private final ImmutableSortedSet<TxnId> waitingOnCommit;
- private final ImmutableSortedMap<Timestamp, TxnId> waitingOnApply;
+ public final WaitingOn waitingOn;
- private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply)
+ // Empty constructor for size measurement
+ private Committed(SaveStatus status)
{
- super(common, status, executeAt, promised, accepted);
- this.waitingOnCommit = waitingOnCommit;
- this.waitingOnApply = waitingOnApply;
+ super(Mutable.EMPTY_ATTRS, status, null, null, null);
+ this.waitingOn = WaitingOn.EMPTY;
}
private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
{
- this(common, status, executeAt, promised, accepted,
waitingOn.waitingOnCommit, waitingOn.waitingOnApply);
+ super(common, status, executeAt, promised, accepted);
+ this.waitingOn = waitingOn;
+ Invariants.checkState(common.route().kind().isFullRoute());
+ Invariants.checkState(waitingOn.deps.equals(common.partialDeps()));
Review Comment:
```suggestion
Invariants.checkState(waitingOn.deps.equals(common.partialDeps()), "Deps do not
match; expected %s == %s", waitingOn.deps, common.partialDeps());
```
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -685,25 +819,27 @@ public Ballot accepted()
public static class Committed extends Accepted
{
- private final ImmutableSortedSet<TxnId> waitingOnCommit;
- private final ImmutableSortedMap<Timestamp, TxnId> waitingOnApply;
+ public final WaitingOn waitingOn;
- private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply)
+ // Empty constructor for size measurement
+ private Committed(SaveStatus status)
{
- super(common, status, executeAt, promised, accepted);
- this.waitingOnCommit = waitingOnCommit;
- this.waitingOnApply = waitingOnApply;
+ super(Mutable.EMPTY_ATTRS, status, null, null, null);
+ this.waitingOn = WaitingOn.EMPTY;
}
private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
{
- this(common, status, executeAt, promised, accepted,
waitingOn.waitingOnCommit, waitingOn.waitingOnApply);
+ super(common, status, executeAt, promised, accepted);
+ this.waitingOn = waitingOn;
+ Invariants.checkState(common.route().kind().isFullRoute());
Review Comment:
```suggestion
Invariants.checkState(common.route().kind().isFullRoute(),
"Expected a full route but given %s", common.route().kind());
```
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -459,45 +408,63 @@ void update(Ranges add)
}
}
- class CFKLoader implements CommandLoader<TxnId>
+ static class CFKEntry extends TxnId
{
- private Command loadForCFK(TxnId data)
+ final boolean uninitialised;
+ public CFKEntry(TxnId copy, boolean uninitialised)
+ {
+ super(copy);
+ this.uninitialised = uninitialised;
+ }
+ }
+
+ class CFKLoader implements CommandLoader<CFKEntry>
+ {
+ final RoutableKey key;
+ CFKLoader(RoutableKey key)
+ {
+ this.key = key;
+ }
+
+ private Command loadForCFK(CFKEntry entry)
{
- GlobalCommand globalCommand = ifPresent(data);
+ GlobalCommand globalCommand = ifPresent(entry);
if (globalCommand != null)
return globalCommand.value();
- throw new IllegalStateException("Could not find command for CFK
for " + data);
+ if (entry.uninitialised)
+ return uninitialised(entry);
+ throw new IllegalStateException("Could not find command for CFK
for " + entry);
}
@Override
- public TxnId txnId(TxnId txnId)
+ public TxnId txnId(CFKEntry txnId)
{
return loadForCFK(txnId).txnId();
}
@Override
- public Timestamp executeAt(TxnId txnId)
+ public Timestamp executeAt(CFKEntry txnId)
{
return loadForCFK(txnId).executeAt();
}
@Override
- public SaveStatus saveStatus(TxnId txnId)
+ public SaveStatus saveStatus(CFKEntry txnId)
{
return loadForCFK(txnId).saveStatus();
}
@Override
- public List<TxnId> depsIds(TxnId data)
+ public List<TxnId> depsIds(CFKEntry data)
{
PartialDeps deps = loadForCFK(data).partialDeps();
return deps != null ? deps.txnIds() : Collections.emptyList();
}
@Override
- public TxnId saveForCFK(Command command)
+ public CFKEntry saveForCFK(Command command)
{
- return command.txnId();
+ return new CFKEntry(command.txnId(), command.saveStatus() ==
SaveStatus.Uninitialised);
Review Comment:
for consistency reasons
```suggestion
return new CFKEntry(command.txnId(),
command.saveStatus().isUninitialised());
```
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -870,88 +982,318 @@ public Result result()
public static class WaitingOn
{
- public static final WaitingOn EMPTY = new
WaitingOn(ImmutableSortedSet.of(), ImmutableSortedMap.of());
- public final ImmutableSortedSet<TxnId> waitingOnCommit;
- public final ImmutableSortedMap<Timestamp, TxnId> waitingOnApply;
+ public static final WaitingOn EMPTY = new WaitingOn(Deps.NONE,
ImmutableBitSet.EMPTY, ImmutableBitSet.EMPTY, ImmutableBitSet.EMPTY);
+
+ public final Deps deps;
+ // note that transactions default to waitingOnCommit, so presence in
the set does not mean the transaction is uncommitted
+ public final ImmutableBitSet waitingOnCommit, waitingOnApply,
appliedOrInvalidated;
- public WaitingOn(ImmutableSortedSet<TxnId> waitingOnCommit,
ImmutableSortedMap<Timestamp, TxnId> waitingOnApply)
+ public WaitingOn(Deps deps, ImmutableBitSet waitingOnCommit,
ImmutableBitSet waitingOnApply, ImmutableBitSet appliedOrInvalidated)
{
+ this.deps = deps;
this.waitingOnCommit = waitingOnCommit;
this.waitingOnApply = waitingOnApply;
+ this.appliedOrInvalidated = appliedOrInvalidated;
}
- public static class Update
+ public boolean isWaitingOnCommit()
{
- private boolean hasChanges = false;
- private NavigableSet<TxnId> waitingOnCommit;
- private NavigableMap<Timestamp, TxnId> waitingOnApply;
+ return !waitingOnCommit.isEmpty();
+ }
- public Update()
- {
+ public boolean isWaitingOnApply()
+ {
+ return !waitingOnApply.isEmpty();
+ }
- }
+ public boolean isWaitingOn(TxnId txnId)
+ {
+ int index = deps.indexOf(txnId);
+ return index >= 0 && (waitingOnCommit.get(index) ||
waitingOnApply.get(index));
+ }
+
+ public TxnId nextWaitingOnCommit()
+ {
+ int i = waitingOnCommit.lastSetBit();
+ return i < 0 ? null : deps.txnId(i);
+ }
+
+ public TxnId nextWaitingOnApply()
+ {
+ int i = waitingOnApply.lastSetBit();
+ return i < 0 ? null : deps.txnId(i);
+ }
+
+ public TxnId nextWaitingOn()
+ {
+ TxnId next = nextWaitingOnApply();
+ return next != null ? next : nextWaitingOnCommit();
+ }
+
+ public boolean isAppliedOrInvalidatedRangeIdx(int i)
+ {
+ return appliedOrInvalidated.get(i + deps.keyDeps.txnIdCount());
+ }
+
+ public TxnId minWaitingOnTxnId()
+ {
+ return minWaitingOnTxnId(deps, waitingOnCommit, waitingOnApply);
+ }
+
+ static TxnId minWaitingOnTxnId(Deps deps, SimpleBitSet
waitingOnCommit, SimpleBitSet waitingOnApply)
+ {
+ int keyDepsCount = deps.keyDeps.txnIdCount();
+ int minWaitingOnKeys =
Math.min(waitingOnCommit.firstSetBitBefore(keyDepsCount, Integer.MAX_VALUE),
waitingOnApply.nextSetBitBefore(0, keyDepsCount, Integer.MAX_VALUE));
+ int minWaitingOnRanges =
Math.min(waitingOnCommit.nextSetBit(keyDepsCount, Integer.MAX_VALUE),
waitingOnApply.nextSetBit(keyDepsCount, Integer.MAX_VALUE));
+ return TxnId.nonNullOrMin(minWaitingOnKeys == Integer.MAX_VALUE ?
null : deps.txnId(minWaitingOnKeys),
+ minWaitingOnRanges == Integer.MAX_VALUE
? null : deps.txnId(minWaitingOnRanges));
+ }
+
+ static TxnId minWaitingOn(Deps deps, SimpleBitSet waitingOn)
+ {
+ int keyDepsCount = deps.keyDeps.txnIdCount();
+ int minWaitingOnKeys = waitingOn.firstSetBitBefore(keyDepsCount,
-1);
+ int minWaitingOnRanges = waitingOn.nextSetBit(keyDepsCount, -1);
+ return TxnId.nonNullOrMin(minWaitingOnKeys < 0 ? null :
deps.keyDeps.txnId(minWaitingOnKeys),
+ minWaitingOnRanges < 0 ? null :
deps.rangeDeps.txnId(minWaitingOnRanges - keyDepsCount));
+ }
+
+ static TxnId maxWaitingOn(Deps deps, SimpleBitSet waitingOn)
+ {
+ int keyDepsCount = deps.keyDeps.txnIdCount();
+ int maxWaitingOnRanges =
waitingOn.lastSetBitNotBefore(keyDepsCount);
+ int maxWaitingOnKeys = waitingOn.prevSetBit(keyDepsCount);
+ return TxnId.nonNullOrMax(maxWaitingOnKeys < 0 ? null :
deps.keyDeps.txnId(maxWaitingOnKeys),
+ maxWaitingOnRanges < 0 ? null :
deps.rangeDeps.txnId(maxWaitingOnRanges - keyDepsCount));
+ }
+
+ public ImmutableSortedSet<TxnId> computeWaitingOnCommit()
+ {
+ return computeWaitingOnCommit(deps, waitingOnCommit);
+ }
+
+ public ImmutableSortedSet<TxnId> computeWaitingOnApply()
+ {
+ return computeWaitingOnApply(deps, waitingOnCommit,
waitingOnApply);
+ }
+
+ private static ImmutableSortedSet<TxnId> computeWaitingOnCommit(Deps
deps, SimpleBitSet waitingOnCommit)
+ {
+ ImmutableSortedSet.Builder<TxnId> builder = new
ImmutableSortedSet.Builder<>(TxnId::compareTo);
+ waitingOnCommit.forEach(builder, deps, (b, d, i) ->
b.add(d.txnId(i)));
+ return builder.build();
+ }
+
+ private static ImmutableSortedSet<TxnId> computeWaitingOnApply(Deps
deps, SimpleBitSet waitingOnCommit, SimpleBitSet waitingOnApply)
+ {
+ ImmutableSortedSet.Builder<TxnId> builder = new
ImmutableSortedSet.Builder<>(TxnId::compareTo);
+ waitingOnApply.forEach(builder, deps, waitingOnCommit, (b, d, s,
i) -> {
+ if (!s.get(i))
+ b.add(d.txnId(i));
+ });
+ return builder.build();
+ }
+
+ private static String toString(Deps deps, SimpleBitSet
waitingOnCommit, SimpleBitSet waitingOnApply)
+ {
+ return "onApply=" + computeWaitingOnApply(deps, waitingOnCommit,
waitingOnApply).descendingSet() + ", onCommit=" + computeWaitingOnCommit(deps,
waitingOnCommit).descendingSet();
+ }
+
+ @Override
+ public String toString()
+ {
+ return toString(deps, waitingOnCommit, waitingOnApply);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ return other instanceof WaitingOn && this.equals((WaitingOn)
other);
+ }
+
+ public boolean equals(WaitingOn other)
+ {
+ return this.deps.equals(other.deps)
+ && this.waitingOnCommit.equals(other.waitingOnCommit)
+ && this.waitingOnApply.equals(other.waitingOnApply)
+ &&
this.appliedOrInvalidated.equals(other.appliedOrInvalidated);
+ }
+
+ public static class Update
+ {
+ final Deps deps;
+ private SimpleBitSet waitingOnCommit, waitingOnApply,
appliedOrInvalidated;
public Update(WaitingOn waitingOn)
{
+ this.deps = waitingOn.deps;
this.waitingOnCommit = waitingOn.waitingOnCommit;
this.waitingOnApply = waitingOn.waitingOnApply;
+ this.appliedOrInvalidated = waitingOn.appliedOrInvalidated;
}
public Update(Committed committed)
{
- this.waitingOnCommit = committed.waitingOnCommit();
- this.waitingOnApply = committed.waitingOnApply();
+ this(committed.waitingOn);
+ }
+
+ public Update(Ranges ranges, Unseekables<?> participants, Deps
deps)
+ {
+ this.deps = deps;
+ this.waitingOnCommit = new SimpleBitSet(deps.txnIdCount(),
false);
+ deps.keyDeps.forEach(ranges, 0, deps.keyDeps.txnIdCount(),
this, null, (u, v, i) -> {
Review Comment:
good catch, this was not clear when reading and took a while to click...
before, we said we waitOn 100% of the txns; now only the txns that intersect
the range... was this a topology change issue?
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -87,44 +88,30 @@ private static class StoreSupplier
this.shardFactory = shardFactory;
}
- CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
+ CommandStore create(int id, EpochUpdateHolder rangesForEpoch)
{
return shardFactory.create(id, time, agent, this.store,
progressLogFactory, rangesForEpoch);
}
}
- public static class RangesForEpochHolder
- {
- // no need for safe publication; RangesForEpoch members are final, and
will be guarded by other synchronization actions
- protected RangesForEpoch current;
-
- /**
- * This is updated asynchronously, so should only be fetched between
executing tasks;
- * otherwise the contents may differ between invocations for the same
task.
- * @return the current RangesForEpoch
- */
- public RangesForEpoch get() { return current; }
- }
-
static class ShardHolder
{
final CommandStore store;
- final RangesForEpochHolder ranges;
+ RangesForEpoch ranges;
Review Comment:
the mutation is guarded by
`accord.local.CommandStores#updateTopology(accord.local.Node,
accord.local.CommandStores.Snapshot, accord.topology.Topology, boolean)` but
the following call sites read without the lock
* `accord.local.CommandStores#mapReduce(accord.local.PreLoadContext,
accord.primitives.Routables<?>, long, long, accord.utils.MapReduce<? super
accord.local.SafeCommandStore,O>)`
*
`accord.local.CommandStores#select(java.util.function.Predicate<accord.primitives.Ranges>)`
* `accord.local.CommandStores#unsafeForKey`
so it should be possible for these reads to be unsafe and to observe stale values
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -148,12 +135,14 @@ public RangesForEpoch(long[] epochs, Ranges[] ranges,
CommandStore store)
this.store = store;
}
- public RangesForEpoch withRanges(long epoch, Ranges ranges)
+ public RangesForEpoch withRanges(long epoch, Ranges latestRanges)
{
- long[] newEpochs = Arrays.copyOf(this.epochs, this.epochs.length +
1);
- Ranges[] newRanges = Arrays.copyOf(this.ranges, this.ranges.length
+ 1);
- newEpochs[this.epochs.length] = epoch;
- newRanges[this.ranges.length] = ranges;
+ Invariants.checkArgument(epochs.length == 0 ||
epochs[epochs.length - 1] <= epoch);
+ int newLength = epochs.length == 0 || epochs[epochs.length - 1] <
epoch ? epochs.length + 1 : epochs.length;
+ long[] newEpochs = Arrays.copyOf(epochs, newLength);
+ Ranges[] newRanges = Arrays.copyOf(ranges, newLength);
+ newEpochs[newLength - 1] = epoch;
Review Comment:
In trying to understand this logic, it looks like it's very dependent on the
caller calling things in the correct order... it would be good to add the
following check
```suggestion
Invariants.checkState(newEpochs[newLength - 1] == 0 ||
newEpochs[newLength - 1] == epoch, "Attempted to override historic epoch %d
with %d", newEpochs[newLength - 1], epoch);
newEpochs[newLength - 1] = epoch;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]