sanpwc commented on code in PR #1369:
URL: https://github.com/apache/ignite-3/pull/1369#discussion_r1034808549


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -330,6 +361,21 @@ private void sendReplicaUnavailableErrorResponse(
         }
     }
 
+    /**
+     * Sends await replica response.
+     */
+    private void sendAwaitReplicaResponse(
+            ClusterNode sender,

Review Comment:
   two params should be in one line.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -118,17 +120,42 @@ public ReplicaManager(
 
                 ReplicaRequest request = (ReplicaRequest) message;
 
-                HybridTimestamp requestTimestamp = extractTimestamp(request);
+                if (request instanceof AwaitReplicaRequest) {
+                    replicas.compute(request.groupId(), (replicationGroupId, 
replicaFut) -> {
+                        if (replicaFut == null) {
+                            replicaFut = new CompletableFuture<>();
+                        }
+
+                        if (!replicaFut.isDone()) {
+                            replicaFut.thenCompose(ignore -> {
+                                IgniteUtils.inBusyLock(busyLock,

Review Comment:
   Formatting.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -118,17 +120,42 @@ public ReplicaManager(
 
                 ReplicaRequest request = (ReplicaRequest) message;
 
-                HybridTimestamp requestTimestamp = extractTimestamp(request);
+                if (request instanceof AwaitReplicaRequest) {
+                    replicas.compute(request.groupId(), (replicationGroupId, 
replicaFut) -> {
+                        if (replicaFut == null) {
+                            replicaFut = new CompletableFuture<>();
+                        }
+
+                        if (!replicaFut.isDone()) {
+                            replicaFut.thenCompose(ignore -> {
+                                IgniteUtils.inBusyLock(busyLock,
+                                        () -> sendAwaitReplicaResponse(sender, 
correlationId));
+
+                                return null;
+                            });
+
+                            return replicaFut;
+                        } else {
+                            IgniteUtils.inBusyLock(busyLock, () -> 
sendAwaitReplicaResponse(sender, correlationId));
+
+                            return replicaFut;
+                        }
+                    });
 
-                Replica replica = replicas.get(request.groupId());
+                    return;
+                }
+
+                CompletableFuture<Replica> replicaFut = 
replicas.get(request.groupId());
+
+                HybridTimestamp requestTimestamp = extractTimestamp(request);
 
-                if (replica == null) {
+                if (replicaFut == null || !replicaFut.isDone()) {
                     sendReplicaUnavailableErrorResponse(sender, correlationId, 
request, requestTimestamp);
 
                     return;
                 }
 
-                CompletableFuture<Object> result = 
replica.processRequest(request);
+                CompletableFuture<Object> result = 
replicaFut.join().processRequest(request);

Review Comment:
   Please add a comment that replicaFut is always completed here.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -118,17 +120,42 @@ public ReplicaManager(
 
                 ReplicaRequest request = (ReplicaRequest) message;
 
-                HybridTimestamp requestTimestamp = extractTimestamp(request);
+                if (request instanceof AwaitReplicaRequest) {

Review Comment:
   Please add some comments explaining the code.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -164,7 +191,7 @@ public Replica replica(ReplicationGroupId replicaGrpId) 
throws NodeStoppingExcep
         }
 
         try {
-            return replicas.get(replicaGrpId);
+            return replicas.get(replicaGrpId).join();

Review Comment:
   Seems that replica(ReplicationGroupId replicaGrpId) method is never used. 
Let's just remove it.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -205,15 +232,19 @@ private Replica startReplicaInternal(
             ReplicationGroupId replicaGrpId,
             ReplicaListener listener
     ) {
-        var replica = new Replica(replicaGrpId, listener);
+        replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
+            if (replicaFut == null) {
+                replicaFut = CompletableFuture.completedFuture(new 
Replica(replicaGrpId, listener));
 
-        Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
+                return replicaFut;
+            } else {
+                replicaFut.complete(new Replica(replicaGrpId, listener));
 
-        if (previous != null) {
-            throw new ReplicaIsAlreadyStartedException(replicaGrpId);
-        }
+                return replicaFut;
+            }
+        });
 
-        return replica;
+        return replicas.get(replicaGrpId).join();

Review Comment:
   Let's add a comment that replicaFut is always completed here.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -372,10 +418,10 @@ private NetworkMessage 
prepareReplicaErrorResponse(HybridTimestamp requestTimest
     private void idleSafeTimeSync() {
         replicas.values().forEach(r -> {
             ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(r.groupId())
+                    .groupId(r.join().groupId())

Review Comment:
   Let's skip uncompleted replicas there's no sense in sending 
safeTimeSyncRequests to them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to