ifesdjeen commented on code in PR #251:
URL: https://github.com/apache/cassandra-accord/pull/251#discussion_r2360348432
##########
accord-core/src/main/java/accord/local/cfk/Serialize.java:
##########
@@ -326,7 +326,7 @@ private static ByteBuffer unsafeToBytesWithoutKey(CommandsForKey cfk)
| (executeAtCount > 0 ? HAS_EXECUTE_AT_HEADER_BIT : 0)
| (ballotCount > 0 ? HAS_BALLOT_HEADER_BIT : 0)
| (overrideCount > 0 ? HAS_STATUS_OVERRIDES_HEADER_BIT : 0)
- | (cfk.bootstrappedAt() != null ? HAS_BOOTSTRAPPED_AT_HEADER_BIT : 0)
+ | (cfk.readyAt() != null ? HAS_BOOTSTRAPPED_AT_HEADER_BIT : 0)
Review Comment:
nit: the nice formatting is now broken
##########
accord-core/src/main/java/accord/local/RedundantStatus.java:
##########
@@ -151,15 +155,15 @@ public enum Property
}
final boolean overrideWasOwned;
- final boolean mergeWithPreBootstrapOrStale;
+ final boolean mergeWithUnready;
Review Comment:
This is the closest line to 148. There is a comment there saying "we have 32
integer bits to use", but we now have 64, right?
If so, do we want to relax this invariant?
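To illustrate the question, here is a minimal sketch of what moving from 32 to 64 bits would change, assuming the encoding is now held in a `long` (the diff quoted here does not confirm this). `BitBudgetSketch` and `HYPOTHETICAL_BIT` are made-up names, not part of Accord.

```java
public class BitBudgetSketch
{
    // Hypothetical bit index; the real property ordinals are not shown in the diff.
    public static final int HYPOTHETICAL_BIT = 40;

    public static int intMask(int bit)
    {
        // int shift distances use only the low 5 bits, so bit 40 wraps around to bit 8
        return 1 << bit;
    }

    public static long longMask(int bit)
    {
        // long shift distances use the low 6 bits, so bits 0..63 stay distinct
        return 1L << bit;
    }

    public static void main(String[] args)
    {
        System.out.println(intMask(HYPOTHETICAL_BIT) == (1 << 8));
        System.out.println(longMask(HYPOTHETICAL_BIT) > Integer.MAX_VALUE);
    }
}
```

If the masks are indeed `long` now, an invariant that caps the number of properties at 32 would be stricter than necessary.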
##########
accord-core/src/main/java/accord/api/Journal.java:
##########
@@ -67,7 +67,12 @@ enum Load
void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
void purge(CommandStores commandStores, EpochSupplier minEpoch);
- void replay(CommandStores commandStores);
+
+ /**
+ * Repays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored)
Review Comment:
nit: "Repays" should be "Replays"
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -892,28 +1057,28 @@ public final DataStore unsafeGetDataStore()
final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp at, Ranges ranges)
{
- execute((PreLoadContext.Empty) () -> "Mark Safe To Read", safeStore -> {
+ execute((Empty) () -> "Mark Safe To Read", safeStore -> {
// TODO (required): handle weird edge cases like newer at having a lower HLC than prior existing at, but higher epoch
Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
safeStore.setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead));
updateMaxConflicts(ranges, at);
}, agent);
}
- public static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt)
+ public static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, Ranges ranges, NavigableMap<TxnId, Ranges> readyAt)
{
Invariants.requireArgument(!ranges.isEmpty());
if (at == TxnId.NONE)
{
- for (Ranges rs : bootstrappedAt.values())
+ for (Ranges rs : readyAt.values())
Invariants.require(!ranges.intersects(rs));
}
else
{
- bootstrappedAt.floorEntry(at).getValue().containsAll(ranges);
+ readyAt.floorEntry(at).getValue().containsAll(ranges);
Review Comment:
I understand this patch has only renamed the field, but this should probably
be an assert: the result of `containsAll` is currently discarded.
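A minimal sketch of the concern, using plain `java.util` collections in place of Accord's `Ranges` and a hypothetical `require` helper standing in for `Invariants.require` (neither is the real implementation):

```java
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

public class DiscardedCheckSketch
{
    // Hypothetical stand-in for Invariants.require; not Accord's implementation.
    public static void require(boolean condition)
    {
        if (!condition)
            throw new IllegalStateException("invariant violated");
    }

    // Same shape as the quoted line: the boolean result is dropped, so nothing is enforced.
    public static void unchecked(NavigableMap<Integer, Set<String>> readyAt, int at, Set<String> ranges)
    {
        readyAt.floorEntry(at).getValue().containsAll(ranges);
    }

    // Wrapping the same call in a check makes a violation fail loudly.
    public static void checked(NavigableMap<Integer, Set<String>> readyAt, int at, Set<String> ranges)
    {
        require(readyAt.floorEntry(at).getValue().containsAll(ranges));
    }

    public static void main(String[] args)
    {
        NavigableMap<Integer, Set<String>> readyAt = new TreeMap<>();
        readyAt.put(1, Set.of("a", "b"));

        unchecked(readyAt, 5, Set.of("c")); // returns silently although "c" is not covered
        try
        {
            checked(readyAt, 5, Set.of("c"));
        }
        catch (IllegalStateException e)
        {
            System.out.println("checked variant rejected the uncovered range");
        }
    }
}
```

Whether the real fix should use `Invariants.require` or another check is up to the author; the point is only that the bare call has no effect.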
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -1053,6 +950,57 @@ public synchronized void initializeTopologyUnsafe(Journal.TopologyUpdate update)
loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global));
}
+ public synchronized void resetTopology(Journal.TopologyUpdate update)
+ {
+ // TODO: assert
+ Snapshot current = this.current;
+ Invariants.require(update.global.epoch() == current.local.epoch());
+ ShardHolder[] shards = new ShardHolder[current.commandStores.size()];
+ int i = 0;
Review Comment:
nit: `i` is unused
##########
accord-core/src/main/java/accord/coordinate/ExecuteTxn.java:
##########
@@ -190,7 +186,7 @@ protected void startOnceInitialised()
Node.Id self = node.id();
if (permitLocalExecution() && tryIfUniversal(self))
{
- isPrivilegedVoteCommitting = true;
+ isPrivilegedVoteCommitting = txnId.hasPrivilegedCoordinator() && path == FAST;
Review Comment:
Just to make sure: previously we would not take `nack` into account, but now
we would because it can assign `isPrivilegedVoteCommitting`. Is this intended?
Probably yes but I am missing context to confirm.
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -673,15 +808,45 @@ protected void updatedRedundantBefore(SafeCommandStore safeStore, RedundantBefor
listeners.clearBefore(this, clearWaitingBefore);
}
- protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges)
+ public AsyncResult<Void> awaitVisibility(long epoch, Ranges ranges)
+ {
+ synchronized (waitingOnVisibility)
+ {
+ if (waitingOnVisibility.isEmpty())
+ return AsyncResults.success(null);
+
+ List<AsyncResult<Void>> awaiting = new ArrayList<>();
+ Ranges waitingOn = Ranges.EMPTY;
+ for (Map.Entry<Long, WaitingOnVisibility> e : waitingOnVisibility.entrySet())
+ {
+ if (e.getKey() > epoch)
+ break;
+
+ Ranges remaining = e.getValue().waitingOn;
+ Ranges intersecting = remaining.slice(ranges, Minimal);
+ if (!intersecting.isEmpty())
+ {
+ awaiting.add(e.getValue().whenDone);
+ ranges = ranges.without(intersecting);
+ waitingOn = waitingOn.with(intersecting);
Review Comment:
Do we need to accumulate `waitingOn` here if we never use it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]