bdeggleston commented on code in PR #106:
URL: https://github.com/apache/cassandra-accord/pull/106#discussion_r1688744943
##########
accord-core/src/main/java/accord/api/Agent.java:
##########
@@ -74,6 +74,10 @@ default void onLocalBarrier(@Nonnull Seekables<?, ?>
keysOrRanges, @Nonnull TxnI
*/
long preAcceptTimeout();
+ long cfkHlcPruneDelta();
Review Comment:
could we get a little docstring about these 2?
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -91,37 +103,55 @@ public Builder()
@Override
public Deps build()
{
- return new Deps(keyBuilder.build(), rangeBuilder == null ?
RangeDeps.NONE : rangeBuilder.build());
+ return new Deps(keyBuilder.build(),
+ rangeBuilder == null ? RangeDeps.NONE :
rangeBuilder.build(),
+ directKeyBuilder == null ? KeyDeps.NONE :
directKeyBuilder.build());
}
}
- public final KeyDeps keyDeps;
+ public final KeyDeps keyDeps; // key dependencies we will track via
CommandsForKey
public final RangeDeps rangeDeps;
+ public final KeyDeps directKeyDeps; // key dependencies we will track via
direct dependency relationships
Review Comment:
The key dependencies have been split into 2 tiers here, and it's not clear
what the difference is and why they need to be separate. Can you add some
documentation explaining the types of deps and how they are different and fit
together?
##########
accord-core/src/main/java/accord/primitives/PartialDeps.java:
##########
@@ -101,7 +98,7 @@ public PartialDeps reconstitutePartial(Ranges covering)
if (covers(covering))
return this;
- return new PartialDeps(covering, keyDeps, rangeDeps);
+ return new PartialDeps(covering, keyDeps, rangeDeps, directKeyDeps);
Review Comment:
is this even reachable?
##########
accord-core/src/main/java/accord/local/CommandsForKey.java:
##########
@@ -372,112 +483,208 @@ public String toString()
return "Info{" +
"txnId=" + toPlainString() +
", status=" + status +
- ", executeAt=" + (executeAt == this ? toPlainString() :
executeAt.toString()) +
+ ", executeAt=" + plainExecuteAt() +
'}';
}
public String toPlainString()
{
return super.toString();
}
+
+ public int compareExecuteAt(TxnInfo that)
+ {
+ return this.executeAt.compareTo(that.executeAt);
+ }
+
+ Timestamp executeAtIfKnownElseTxnId()
+ {
+ return status == INVALID_OR_TRUNCATED ? this : executeAt;
+ }
}
- public static class TxnInfoWithMissing extends TxnInfo
+ public static class TxnInfoExtra extends TxnInfo
{
public final TxnId[] missing;
Review Comment:
could you add a comment explaining what's missing and/or make the name more
descriptive?
##########
accord-core/src/main/java/accord/local/CommandsForKey.java:
##########
@@ -1046,29 +1542,20 @@ static class InfoWithAdditions
}
}
- private Object computeInsert(int insertPos, TxnId txnId, InternalStatus
newStatus, Command command)
- {
- return computeInfoAndAdditions(insertPos, -1, txnId, newStatus,
command);
- }
-
- private Object computeUpdate(int updatePos, TxnId txnId, InternalStatus
newStatus, Command command)
- {
- return computeInfoAndAdditions(updatePos, updatePos, txnId, newStatus,
command);
- }
-
private Object computeInfoAndAdditions(int insertPos, int updatePos, TxnId
txnId, InternalStatus newStatus, Command command)
{
- Timestamp executeAt = txnId;
- if (newStatus.hasInfo)
- {
- executeAt = command.executeAt();
- if (executeAt.equals(txnId)) executeAt = txnId;
- }
+ Invariants.checkState(newStatus.hasExecuteAtOrDeps);
+ Timestamp executeAt = command.executeAt();
+ if (executeAt.equals(txnId)) executeAt = txnId;
+ Ballot ballot = Ballot.ZERO;
+ if (newStatus.hasBallot)
+ ballot = command.acceptedOrCommitted();
+
Timestamp depsKnownBefore = newStatus.depsKnownBefore(txnId,
executeAt);
- return computeInfoAndAdditions(insertPos, updatePos, txnId, newStatus,
executeAt, depsKnownBefore, command.partialDeps().keyDeps.txnIds(key));
+ return computeInfoAndAdditions(insertPos, updatePos, txnId, newStatus,
ballot, executeAt, depsKnownBefore, command.partialDeps().keyDeps.txnIds(key));
}
- private Object computeInfoAndAdditions(int insertPos, int updatePos, TxnId
plainTxnId, InternalStatus newStatus, Timestamp executeAt, Timestamp
depsKnownBefore, SortedList<TxnId> deps)
+ private Object computeInfoAndAdditions(int insertPos, int updatePos, TxnId
plainTxnId, InternalStatus newStatus, Ballot ballot, Timestamp executeAt,
Timestamp depsKnownBefore, SortedList<TxnId> deps)
Review Comment:
Could we rework the class hierarchy of TxnInfo/InfoAndAdditions so we're not
returning Object here (and the calling method)?
##########
accord-core/src/main/java/accord/local/CommandsForKey.java:
##########
@@ -372,112 +483,208 @@ public String toString()
return "Info{" +
"txnId=" + toPlainString() +
", status=" + status +
- ", executeAt=" + (executeAt == this ? toPlainString() :
executeAt.toString()) +
+ ", executeAt=" + plainExecuteAt() +
'}';
}
public String toPlainString()
{
return super.toString();
}
+
+ public int compareExecuteAt(TxnInfo that)
+ {
+ return this.executeAt.compareTo(that.executeAt);
+ }
+
+ Timestamp executeAtIfKnownElseTxnId()
+ {
+ return status == INVALID_OR_TRUNCATED ? this : executeAt;
+ }
}
- public static class TxnInfoWithMissing extends TxnInfo
+ public static class TxnInfoExtra extends TxnInfo
{
public final TxnId[] missing;
+ public final Ballot ballot;
- TxnInfoWithMissing(TxnId txnId, InternalStatus status, Timestamp
executeAt, TxnId[] missing)
+ TxnInfoExtra(TxnId txnId, InternalStatus status, Timestamp executeAt,
TxnId[] missing, Ballot ballot)
{
super(txnId, status, executeAt);
this.missing = missing;
+ this.ballot = ballot;
}
+ @Override
public TxnId[] missing()
{
return missing;
}
+ @Override
+ public Ballot ballot()
+ {
+ return ballot;
+ }
+
+ public TxnInfo update(TxnId[] newMissing)
+ {
+ if (newMissing == missing)
+ return this;
+
+ return newMissing == NO_TXNIDS && ballot == Ballot.ZERO
+ ? new TxnInfo(this, status, executeAt)
+ : new TxnInfoExtra(this, status, executeAt, newMissing,
ballot);
+ }
+
@Override
public String toString()
{
return "Info{" +
"txnId=" + toPlainString() +
", status=" + status +
- ", executeAt=" + (this == executeAt ? toPlainString() :
executeAt) +
+ ", executeAt=" + plainExecuteAt() +
+ (ballot != Ballot.ZERO ? ", ballot=" + ballot : "") +
", missing=" + Arrays.toString(missing) +
'}';
}
}
private final Key key;
private final RedundantBefore.Entry redundantBefore;
+ private final TxnInfo prunedBefore;
+ // TODO (desired): avoid loading if we know a transaction that should have
witnessed it has already been registered/applied
+ private final Object[] loadingPruned;
Review Comment:
Could this be renamed `loadingPrunedBtree` so the reason for its `Object[]`
type is apparent?
##########
accord-core/src/test/java/accord/burn/BurnTest.java:
##########
@@ -546,6 +546,7 @@ public static void main(String[] args)
public void testOne()
{
run(System.nanoTime());
+// run(99448562360375L);
Review Comment:
missed seed
##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -214,58 +214,64 @@ private void updateMaxConflicts(Command prev, Command
updated)
commandStore().updateMaxConflicts(prev, updated);
}
- private void updateCommandsForKey(Command prev, Command updated)
+ private void updateCommandsForKey(Command prev, Command next)
{
- if (!CommandsForKey.needsUpdate(prev, updated))
+ if (!CommandsForKey.needsUpdate(prev, next))
return;
- TxnId txnId = updated.txnId();
- Keys keys;
- if (txnId.domain().isKey() && txnId.kind().isGloballyVisible())
+ TxnId txnId = next.txnId();
+ if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this,
prev, next);
+ if (!CommandsForKey.managesExecution(txnId) &&
next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) &&
!prev.hasBeen(Status.Stable))
+ updateUnmanagedExecutionCommandsForKey(this, next);
+ }
+
+ private static void updateManagedCommandsForKey(SafeCommandStore
safeStore, Command prev, Command next)
+ {
+ TxnId txnId = next.txnId();
+ Keys keys = (Keys)next.keysOrRanges();
+ if (keys == null || next.hasBeen(Status.Truncated)) keys =
(Keys)prev.keysOrRanges();
+ if (keys == null)
+ return;
+
+ // TODO (required): additionalKeysOrRanges may not be being handled
entirely correctly here, though it may not matter.
+ // Once committed without a given key, we should be effectively
erasing the command from that CFK
+ PreLoadContext context = PreLoadContext.contextFor(txnId, keys,
COMMANDS);
+ // TODO (expected): execute immediately for any keys we already have
loaded, and save only those we haven't for later
Review Comment:
is that necessary? If we're going to start a second task to update some, why
don't we just use it to update all of them and skip going through the trouble
of splitting them up?
##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -214,58 +214,64 @@ private void updateMaxConflicts(Command prev, Command
updated)
commandStore().updateMaxConflicts(prev, updated);
}
- private void updateCommandsForKey(Command prev, Command updated)
+ private void updateCommandsForKey(Command prev, Command next)
{
- if (!CommandsForKey.needsUpdate(prev, updated))
+ if (!CommandsForKey.needsUpdate(prev, next))
return;
- TxnId txnId = updated.txnId();
- Keys keys;
- if (txnId.domain().isKey() && txnId.kind().isGloballyVisible())
+ TxnId txnId = next.txnId();
+ if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this,
prev, next);
+ if (!CommandsForKey.managesExecution(txnId) &&
next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) &&
!prev.hasBeen(Status.Stable))
+ updateUnmanagedExecutionCommandsForKey(this, next);
+ }
+
+ private static void updateManagedCommandsForKey(SafeCommandStore
safeStore, Command prev, Command next)
+ {
+ TxnId txnId = next.txnId();
+ Keys keys = (Keys)next.keysOrRanges();
+ if (keys == null || next.hasBeen(Status.Truncated)) keys =
(Keys)prev.keysOrRanges();
+ if (keys == null)
+ return;
+
+ // TODO (required): additionalKeysOrRanges may not be being handled
entirely correctly here, though it may not matter.
+ // Once committed without a given key, we should be effectively
erasing the command from that CFK
+ PreLoadContext context = PreLoadContext.contextFor(txnId, keys,
COMMANDS);
+ // TODO (expected): execute immediately for any keys we already have
loaded, and save only those we haven't for later
+ if (safeStore.canExecuteWith(context))
{
- keys = (Keys)updated.keysOrRanges();
- if (keys == null || updated.hasBeen(Status.Truncated)) keys =
(Keys)prev.keysOrRanges();
- if (keys == null)
- return;
-
- PreLoadContext context = PreLoadContext.contextFor(txnId, keys,
COMMANDS);
- // TODO (expected): execute immediately for any keys we already
have loaded, and save only those we haven't for later
- if (canExecuteWith(context))
- {
- for (Key key : keys)
- {
- get(key).update(this, prev, updated);
- }
- }
- else
+ for (Key key : keys)
{
- commandStore().execute(context, safeStore ->
safeStore.updateCommandsForKey(prev, updated))
- .begin(commandStore().agent);
+ safeStore.get(key).update(safeStore, next);
}
}
else
{
- if (!updated.hasBeen(Status.Stable) || prev.hasBeen(Status.Stable)
|| updated.hasBeen(Status.Truncated))
- return;
-
- keys = updated.asCommitted().waitingOn.keys;
- // TODO (required): consider how execution works for transactions
that await future deps and where the command store inherits additional keys in
execution epoch
- Ranges ranges = ranges().allAt(updated.executeAt());
- PreLoadContext context = PreLoadContext.contextFor(txnId, keys,
COMMANDS);
- // TODO (expected): execute immediately for any keys we already
have loaded, and save only those we haven't for later
- if (canExecuteWith(context))
- {
- Routables.foldl(keys, ranges, (self, t, key, o, i) -> {
- self.get(key).registerUnmanaged(self, self.get(t));
- return null;
- }, this, txnId, null, i->false);
- }
- else
- {
- commandStore().execute(context, safeStore ->
safeStore.updateCommandsForKey(prev, updated))
- .begin(commandStore().agent);
- }
+ safeStore = safeStore;
Review Comment:
redundant assignment
##########
accord-core/src/main/java/accord/local/CommandsForKey.java:
##########
@@ -111,32 +111,42 @@
* this replica's collection to decipher any fast path decision. Any other
replica must either do the same, or else
* will correctly record this transaction as present in any relevant deps of
later transactions.
*
- * TODO (expected): optimisations:
- * 3) consider storing a prefix of TxnId that are all NoInfo PreApplied
encoded as a BitStream as only required for computing missing collection
- * 4) consider storing (or caching) an int[] of records with an executeAt
that occurs out of order, sorted by executeAt
- *
* TODO (expected): maintain separate redundantBefore and closedBefore
timestamps, latter implied by any exclusivesyncpoint;
* advance former based on Applied status of all TxnId before
the latter
* TODO (expected): track whether a TxnId is a write on this key only for
execution (rather than globally)
* TODO (expected): merge with TimestampsForKey
- * TODO (expected): save bytes by encoding InternalStatus in TxnId.flags()
- * TODO (expected): migrate to BTree
+ * TODO (desired): save bytes by encoding InternalStatus in TxnId.flags()
* TODO (expected): remove a command that is committed to not intersect with
the key for this store (i.e. if accepted in a later epoch than committed on, so
ownership changes)
* TODO (expected): mark a command as notified once ready-to-execute or
applying
- * TODO (required): randomised testing
+ * TODO (required): more randomised testing
* TODO (required): linearizability violation detection
- * TODO (required): account for whether transactions should witness each other
for determining the missing collection
- * TODO (required): enforce that a transaction is not added with a key
dependency on a SyncPoint or ExclusiveSyncPoint;
- * also impose this invariant elsewhere, and create a
compile-time dependency between these code locations
*/
-public class CommandsForKey implements CommandsSummary
+public class CommandsForKey extends CommandsForKeyUpdate implements
CommandsSummary
{
- private static final boolean PRUNE_TRANSITIVE_DEPENDENCIES = true;
+ private static final Logger logger =
LoggerFactory.getLogger(CommandsForKey.class);
+ private static final boolean ELIDE_TRANSITIVE_DEPENDENCIES = true;
+
public static final RedundantBefore.Entry NO_REDUNDANT_BEFORE = new
RedundantBefore.Entry(null, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE,
TxnId.NONE, TxnId.NONE, null);
public static final TxnId[] NO_TXNIDS = new TxnId[0];
+ public static final TxnInfo NO_INFO = new TxnInfo(TxnId.NONE, HISTORICAL,
TxnId.NONE);
public static final TxnInfo[] NO_INFOS = new TxnInfo[0];
public static final Unmanaged[] NO_PENDING_UNMANAGED = new Unmanaged[0];
+ public static boolean manages(TxnId txnId)
+ {
+ return txnId.domain().isKey() && txnId.kind().isGloballyVisible();
+ }
+
+ public static boolean managesExecution(TxnId txnId)
Review Comment:
seems like the names for this method and `managesKeyExecution` should be
swapped?
##########
accord-core/src/main/java/accord/local/CommandsForKeyUpdate.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.Key;
+import accord.local.CommandsForKey.TxnInfo;
+import accord.primitives.Keys;
+import accord.primitives.Seekables;
+import accord.primitives.TxnId;
+
+import static
accord.local.CommandsForKey.InternalStatus.PREACCEPTED_OR_ACCEPTED_INVALIDATE;
+import static accord.local.KeyHistory.COMMANDS;
+import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
+
+public abstract class CommandsForKeyUpdate
Review Comment:
Does this need to be its own class? My reading of it is that it's kind of a
hardcoded command update listener, so why not just keep it as part of the
command/cfk update flow instead of adding a layer of indirection here? I see
that we plug in another notify sink in the CFK test. Is the test
instrumentation something that could be integrated into the
InMemoryCommandStore and its various Safe* classes and kept separate from the
production code?
##########
accord-core/src/main/java/accord/local/CommonAttributes.java:
##########
@@ -34,6 +34,7 @@ public interface CommonAttributes
Route<?> route();
PartialTxn partialTxn();
@Nullable Seekables<?, ?> additionalKeysOrRanges();
+ // TODO (expected): we don't need PartialDeps, only
Review Comment:
I think you missed the rest of the sentence for the TODO here
##########
accord-core/src/main/java/accord/local/CommandsForKeyUpdate.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.Key;
+import accord.local.CommandsForKey.TxnInfo;
+import accord.primitives.Keys;
+import accord.primitives.Seekables;
+import accord.primitives.TxnId;
+
+import static
accord.local.CommandsForKey.InternalStatus.PREACCEPTED_OR_ACCEPTED_INVALIDATE;
+import static accord.local.KeyHistory.COMMANDS;
+import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
+
+public abstract class CommandsForKeyUpdate
+{
+ @VisibleForTesting
+ public abstract CommandsForKey cfk();
+ abstract ExtraNotify notifier();
+ abstract void notify(SafeCommandStore safeStore, @Nullable CommandsForKey
prevCfk, @Nullable Command command, NotifySink notifySink);
+
+ void notify(SafeCommandStore safeStore, @Nullable CommandsForKey prevCfk,
@Nullable Command command)
Review Comment:
unused
##########
accord-core/src/main/java/accord/local/CommandsForKey.java:
##########
@@ -290,6 +302,75 @@ public static InternalStatus get(int ordinal)
}
}
+ static class LoadingPruned extends TxnId
Review Comment:
This could use a comment explaining what it does, and maybe a new name. I
was able to work out it's a reverse deps index, mapping a txnid to txnids that
have it as a dep. However I don't see the list updated in `pruneBefore` and I'm
not sure what the loading part is referring to.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]