dcapwell commented on code in PR #30:
URL: https://github.com/apache/cassandra-accord/pull/30#discussion_r1136311266


##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -185,46 +186,47 @@ private static Collection<Node.Id> allNodesFor(Txn txn, Topology... topologies)
         return result;
     }
 
-    private static Stream<MessageTask> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), CheckStatusOk::merge);
+        AsyncChain<Void> start;
         if (committedOnly)
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer));
         else
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer));
 
-        return syncMessages.entrySet().stream().map(e -> {
+        return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), srcEpoch, trgEpoch);
             return MessageTask.of(node, recipients.apply(sync), sync.toString(), sync::process);
-        });
+        }));
     }
 
     private static final boolean PREACCEPTED = false;
     private static final boolean COMMITTED_ONLY = true;
 
-    /**
-     * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
-     */
-    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
-    {
-        Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
-        Topology localTopology = syncTopology.forNode(node.id());
-        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
-
-        Ranges ranges = localTopology.ranges();
-        Stream<MessageTask> messageStream = Stream.empty();
-        for (long epoch=1; epoch<=syncEpoch; epoch++)
-        {
-            messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
-        }
-        return messageStream;
-    }
+//    /**
+//     * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
+//     */
+//    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
+//    {
+//        Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
+//        Topology localTopology = syncTopology.forNode(node.id());
+//        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
+//
+//        Ranges ranges = localTopology.ranges();
+//        Stream<MessageTask> messageStream = Stream.empty();
+//        for (long epoch=1; epoch<=syncEpoch; epoch++)
+//        {
+//            messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
+//        }
+//        return messageStream;
+//    }

Review Comment:
   in this context the maintenance was super simple, so the cost is low. Whether this function still "works" is unknown... I don't mind keeping it now that it compiles.


