JAkutenshi commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1565873049
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -596,10 +611,18 @@ public class IgniteImpl implements Ignite {
logicalTopologyService,
raftMgr,
topologyAwareRaftGroupServiceFactory,
- clockService
- );
+ clockService,
+ tablePartId -> {
+ CatalogTableDescriptor tbl = catalogManager.table(tablePartId.tableId(), catalogManager.latestCatalogVersion());
- ReplicationConfiguration replicationConfig = clusterConfigRegistry.getConfiguration(ReplicationConfiguration.KEY);
+ int zoneId = tbl == null ? 2 : tbl.zoneId();
Review Comment:
Then maybe introduce a `DistributedZone.DEFAULT_ZONE_ID` constant instead of the literal `2`?
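A minimal sketch of what that could look like. `DistributedZone` and `TableDescriptor` here are hypothetical stand-ins written for illustration only (not the real Ignite classes), and the value `2` is simply copied from the literal in the diff:

```java
public class ZoneIdExample {
    /** Hypothetical holder for the proposed constant; the value 2 is taken from the diff. */
    static final class DistributedZone {
        static final int DEFAULT_ZONE_ID = 2;

        private DistributedZone() {
        }
    }

    /** Stand-in for CatalogTableDescriptor, reduced to the single accessor used here. */
    interface TableDescriptor {
        int zoneId();
    }

    /** Falls back to the named default when the table descriptor is missing. */
    public static int resolveZoneId(TableDescriptor tbl) {
        return tbl == null ? DistributedZone.DEFAULT_ZONE_ID : tbl.zoneId();
    }

    public static void main(String[] args) {
        System.out.println(resolveZoneId(null));    // no descriptor: default zone
        System.out.println(resolveZoneId(() -> 7)); // descriptor present: its own zone
    }
}
```

The named constant keeps the fallback in one place, so the call site no longer depends on a magic number.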
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
}
try {
- CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
- replicaFut
- .thenCompose(replica -> replica.processPlacementDriverMessage(msg))
- .whenComplete((response, ex) -> {
- if (ex == null) {
- clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
- } else if (!(unwrapCause(ex) instanceof NodeStoppingException)) {
- LOG.error("Failed to process placement driver message [msg={}].", ex, msg);
- }
- });
+ Set<ReplicationGroupId> replicationGroupIds = zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+ CompletableFuture<LeaseGrantedMessageResponse>[] futures = new CompletableFuture[replicationGroupIds.size()];
+
+ int i = 0;
+
+ for (ReplicationGroupId grpId : replicationGroupIds) {
+ CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+ futures[i++] = replicaFut.thenCompose(replica -> replica.processPlacementDriverMessage(msg));
+ }
Review Comment:
I missed this earlier, but the manual index-based loop looks awkward. The
suggestion below may be slightly less performant, but I think it is more readable.
```suggestion
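// The no-arg toArray() returns Object[], which cannot be cast to a future array at runtime,
// so the array constructor is passed explicitly (this yields an unchecked warning).
CompletableFuture<LeaseGrantedMessageResponse>[] futures = replicationGroupIds
.stream()
.map(grpId -> replicas
.computeIfAbsent(grpId, k -> new CompletableFuture<>())
.thenCompose(replica -> replica.processPlacementDriverMessage(msg)))
.toArray(CompletableFuture[]::new);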
```
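To illustrate the stream-to-array idiom in isolation, here is a self-contained sketch. It uses plain `String` values in place of `ReplicationGroupId`, `Replica` and `LeaseGrantedMessageResponse`, and the `"granted:"` response is invented for the example; it is an assumption-laden illustration, not the PR's code. It also shows why casting the result of the no-arg `toArray()` would fail at runtime:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class FutureArrayExample {
    /** Collects one composed future per group id into a typed array via a stream. */
    @SuppressWarnings("unchecked")
    public static CompletableFuture<String>[] collect(
            Set<String> groupIds, Map<String, CompletableFuture<String>> replicas) {
        return groupIds.stream()
                .map(id -> replicas
                        .computeIfAbsent(id, k -> new CompletableFuture<>())
                        // Stand-in for replica.processPlacementDriverMessage(msg).
                        .thenCompose(replica -> CompletableFuture.completedFuture("granted:" + replica)))
                // The generator makes the runtime type CompletableFuture[] rather than Object[].
                .toArray(CompletableFuture[]::new);
    }

    /** Returns true if casting the Object[] from the no-arg toArray() throws. */
    @SuppressWarnings("unchecked")
    public static boolean plainToArrayCastFails(Set<String> groupIds) {
        try {
            Object[] raw = groupIds.stream().map(CompletableFuture::completedFuture).toArray();
            CompletableFuture<String>[] cast = (CompletableFuture<String>[]) raw;
            return cast.length < 0; // not reached when the cast throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> replicas = new ConcurrentHashMap<>();
        replicas.put("g1", CompletableFuture.completedFuture("r1"));
        replicas.put("g2", CompletableFuture.completedFuture("r2"));
        Set<String> ids = new LinkedHashSet<>(List.of("g1", "g2"));

        CompletableFuture<String>[] futures = collect(ids, replicas);
        CompletableFuture.allOf(futures).join(); // wait for every response

        for (CompletableFuture<String> f : futures) {
            System.out.println(f.join());
        }
        System.out.println("plain toArray() cast fails: " + plainToArrayCastFails(ids));
    }
}
```

The resulting array can be passed directly to `CompletableFuture.allOf(...)`, which is presumably how the loop's output is consumed later in the method.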