dcapwell commented on code in PR #28:
URL: https://github.com/apache/cassandra-accord/pull/28#discussion_r1131815232
##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -185,46 +186,47 @@ private static Collection<Node.Id> allNodesFor(Txn txn,
Topology... topologies)
return result;
}
- private static Stream<MessageTask> syncEpochCommands(Node node, long
srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients,
long trgEpoch, boolean committedOnly) throws ExecutionException
+ private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node
node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>>
recipients, long trgEpoch, boolean committedOnly)
{
Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
Consumer<Command> commandConsumer = command ->
syncMessages.merge(command.txnId(), new CheckStatusOk(node, command),
CheckStatusOk::merge);
+ AsyncChain<Void> start;
if (committedOnly)
- getUninterruptibly(node.commandStores().forEach(commandStore ->
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges,
srcEpoch, commandConsumer)));
+ start = node.commandStores().forEach(commandStore ->
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges,
srcEpoch, commandConsumer));
else
- getUninterruptibly(node.commandStores().forEach(commandStore ->
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch,
commandConsumer)));
+ start = node.commandStores().forEach(commandStore ->
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch,
commandConsumer));
- return syncMessages.entrySet().stream().map(e -> {
+ return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
CommandSync sync = new CommandSync(e.getKey(), e.getValue(),
srcEpoch, trgEpoch);
return MessageTask.of(node, recipients.apply(sync),
sync.toString(), sync::process);
- });
+ }));
}
private static final boolean PREACCEPTED = false;
private static final boolean COMMITTED_ONLY = true;
- /**
- * Syncs all replicated commands. Overkill, but useful for confirming
issues in optimizedSync
- */
- private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch)
throws ExecutionException
Review Comment:
this was dead code, glad to remove or make async, so left it commented out
just for now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]