bdeggleston commented on code in PR #30:
URL: https://github.com/apache/cassandra-accord/pull/30#discussion_r1136309845


##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -185,46 +186,47 @@ private static Collection<Node.Id> allNodesFor(Txn txn, 
Topology... topologies)
         return result;
     }
 
-    private static Stream<MessageTask> syncEpochCommands(Node node, long 
srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, 
long trgEpoch, boolean committedOnly) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node 
node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> 
recipients, long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> 
syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), 
CheckStatusOk::merge);
+        AsyncChain<Void> start;
         if (committedOnly)
-            getUninterruptibly(node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, 
srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, 
srcEpoch, commandConsumer));
         else
-            getUninterruptibly(node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, 
commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> 
InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, 
commandConsumer));
 
-        return syncMessages.entrySet().stream().map(e -> {
+        return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), 
srcEpoch, trgEpoch);
             return MessageTask.of(node, recipients.apply(sync), 
sync.toString(), sync::process);
-        });
+        }));
     }
 
     private static final boolean PREACCEPTED = false;
     private static final boolean COMMITTED_ONLY = true;
 
-    /**
-     * Syncs all replicated commands. Overkill, but useful for confirming 
issues in optimizedSync
-     */
-    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) 
throws ExecutionException
-    {
-        Topology syncTopology = 
node.configService().getTopologyForEpoch(syncEpoch);
-        Topology localTopology = syncTopology.forNode(node.id());
-        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> 
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
-
-        Ranges ranges = localTopology.ranges();
-        Stream<MessageTask> messageStream = Stream.empty();
-        for (long epoch=1; epoch<=syncEpoch; epoch++)
-        {
-            messageStream = Stream.concat(messageStream, 
syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
-        }
-        return messageStream;
-    }
+//    /**
+//     * Syncs all replicated commands. Overkill, but useful for confirming 
issues in optimizedSync
+//     */
+//    private static Stream<MessageTask> thoroughSync(Node node, long 
syncEpoch) throws ExecutionException
+//    {
+//        Topology syncTopology = 
node.configService().getTopologyForEpoch(syncEpoch);
+//        Topology localTopology = syncTopology.forNode(node.id());
+//        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> 
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
+//
+//        Ranges ranges = localTopology.ranges();
+//        Stream<MessageTask> messageStream = Stream.empty();
+//        for (long epoch=1; epoch<=syncEpoch; epoch++)
+//        {
+//            messageStream = Stream.concat(messageStream, 
syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
+//        }
+//        return messageStream;
+//    }

Review Comment:
   Got it, yeah I didn't see the comment. I'll be a little pedantic and say 
that although the code is unreachable, it does provide value to the project as 
a tool when debugging some burn test failures, and is therefore not "dead". 
However, it also isn't run, and it therefore quickly stops working if it isn't 
being used for diagnosing failures. Having said that, I'm not sure if the value it 
provides is still more than the cost to maintain it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to