dcapwell commented on code in PR #28:
URL: https://github.com/apache/cassandra-accord/pull/28#discussion_r1131815232


##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -185,46 +186,47 @@ private static Collection<Node.Id> allNodesFor(Txn txn, 
Topology... topologies)
         return result;
     }
 
-    private static Stream<MessageTask> syncEpochCommands(Node node, long 
srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, 
long trgEpoch, boolean committedOnly) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node 
node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> 
recipients, long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> 
syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), 
CheckStatusOk::merge);
+        AsyncChain<Void> start;
         if (committedOnly)
-            getUninterruptibly(node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, 
srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, 
srcEpoch, commandConsumer));
         else
-            getUninterruptibly(node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, 
commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, 
commandConsumer));
 
-        return syncMessages.entrySet().stream().map(e -> {
+        return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), 
srcEpoch, trgEpoch);
             return MessageTask.of(node, recipients.apply(sync), 
sync.toString(), sync::process);
-        });
+        }));
     }
 
     private static final boolean PREACCEPTED = false;
     private static final boolean COMMITTED_ONLY = true;
 
-    /**
-     * Syncs all replicated commands. Overkill, but useful for confirming 
issues in optimizedSync
-     */
-    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) 
throws ExecutionException

Review Comment:
   This was dead code. I'm glad to either remove it or make it async, so I left
it commented out just for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to