vldpyatkov commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1565902483
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
         }
         try {
-            CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
-            replicaFut
-                    .thenCompose(replica -> replica.processPlacementDriverMessage(msg))
-                    .whenComplete((response, ex) -> {
-                        if (ex == null) {
-                            clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
-                        } else if (!(unwrapCause(ex) instanceof NodeStoppingException)) {
-                            LOG.error("Failed to process placement driver message [msg={}].", ex, msg);
-                        }
-                    });
+            Set<ReplicationGroupId> replicationGroupIds = zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+            CompletableFuture<LeaseGrantedMessageResponse>[] futures = new CompletableFuture[replicationGroupIds.size()];
+
+            int i = 0;
+
+            for (ReplicationGroupId grpId : replicationGroupIds) {
+                CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+                futures[i++] = replicaFut.thenCompose(replica -> replica.processPlacementDriverMessage(msg));
+            }
Review Comment:
We have reached the point where the plain loop is arguably less clear than the
Stream API equivalent.
I am not sure the code needs to change, but if anyone else thinks it does, I am
ready to apply the proposal.
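
For comparison, a stream-based version of the loop might look roughly like the sketch below. It is only an illustration, not the actual proposal: the class name, the method name `collectFutures`, and the type parameters `K`, `R` and `T` (standing in for `ReplicationGroupId`, `Replica` and `LeaseGrantedMessageResponse`) are hypothetical, and it collects into a `List` rather than an array to avoid the unchecked generic array creation.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReplicaFuturesSketch {
    /**
     * Stream-based counterpart of the loop in the diff.
     * K, R and T are hypothetical stand-ins for ReplicationGroupId,
     * Replica and LeaseGrantedMessageResponse.
     */
    static <K, R, T> List<CompletableFuture<T>> collectFutures(
            Set<K> groupIds,
            Map<K, CompletableFuture<R>> replicas,
            Function<R, CompletableFuture<T>> handler) {
        return groupIds.stream()
                // Reuse the registered replica future, or register a pending one.
                .map(id -> replicas.computeIfAbsent(id, k -> new CompletableFuture<>()))
                // Chain the message handling onto each replica future.
                .map(replicaFut -> replicaFut.thenCompose(handler))
                .collect(Collectors.toList());
    }
}
```

If an array is still required, for example for `CompletableFuture.allOf`, the list can be converted with `futures.toArray(new CompletableFuture[0])`. Whether this reads better than the indexed loop is a matter of taste.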