kgusakov commented on code in PR #3633:
URL: https://github.com/apache/ignite-3/pull/3633#discussion_r1618608966


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1857,9 +1776,8 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
             boolean isRecovery
     ) {
         ClusterNode localMember = localNode();
-        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNode().name()));
 
-        boolean pendingAssignmentsAreForced = pendingAssignments.force();
+        boolean pendingAssignmentsAreForced = pendingAssignments != null && 
pendingAssignments.force();

Review Comment:
   Why null check is needed? Looks like it was working without it earlier. 
Anyway, pls make a comment about it, why it is really needed.



##########
modules/replicator/build.gradle:
##########
@@ -29,6 +29,9 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-raft')
     implementation project(':ignite-raft-api')
+    implementation project(':ignite-metastorage-api')

Review Comment:
   Does really needed?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -620,7 +830,15 @@ private CompletableFuture<Boolean> 
stopReplicaInternal(ReplicationGroupId replic
             }
         });
 
-        return isRemovedFuture;
+        return isRemovedFuture
+                .thenApply(v -> {
+                    try {
+                        raftManager.stopRaftNodes(replicaGrpId);

Review Comment:
   Shouldn't it be a part of concrete Replica stop?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1936,6 +1854,7 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
                     ? pendingAssignmentsNodes
                     : union(pendingAssignmentsNodes, stableAssignments);
 
+            // var isReplicaWasStarted = replicaMgr.isReplicaStarted(new 
TablePartitionId(tableId, partitionId));

Review Comment:
   Pls remove



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1802,6 +1718,9 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
 
         Assignments pendingAssignments = 
Assignments.fromBytes(pendingAssignmentsEntry.value());
 
+        // boolean isLocalNodeAssigned = union(stableAssignments, 
pendingAssignments.nodes()).stream()

Review Comment:
   Looks like a debug artifact



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -922,18 +885,30 @@ private CompletableFuture<Void> 
startPartitionAndStartClient(
                 storageUpdateConfig
         );
 
-        Peer serverPeer = realConfiguration.peer(localNode().name());
-
-        var raftNodeId = localMemberAssignment == null ? null : new 
RaftNodeId(replicaGrpId, serverPeer);
-
-        boolean shouldStartRaftListeners = localMemberAssignment != null && 
!((Loza) raftMgr).isStarted(raftNodeId);
+        boolean shouldStartRaftListeners = 
shouldStartRaftListeners(assignments, nonStableNodeAssignments);
 
         if (shouldStartRaftListeners) {
             ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
 
             mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
         }
 
+        Supplier<RaftGroupService> getCachedRaftClient = () -> {

Review Comment:
   Will be removed after IGNITE-22315, right? Could you make a TODO here too, 
pls.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -548,6 +664,100 @@ private CompletableFuture<Replica> startReplicaInternal(
                 .thenCompose(v -> replicaFuture);
     }
 
+    /**
+     * TODO: will be private after 
https://issues.apache.org/jira/browse/IGNITE-22315
+     * Temporary public method for RAFT-client starting.
+     *
+     * @param replicaGrpId Replication Group ID.
+     * @param newConfiguration Peers and learners nodes for a raft group.
+     * @param raftClientCache Temporal supplier that returns RAFT-client from 
TableRaftService if it's already exists and was put into the

Review Comment:
   Pls, add TODO with ticket for removal



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -466,75 +490,167 @@ private void stopLeaseProlongation(ReplicationGroupId 
groupId, @Nullable String
         });
     }
 
+    private CompletableFuture<Boolean> startReplicaInternal(
+            MetaStorageManager metaStorageMgr,
+            RaftGroupListener raftGroupListener,
+            MvTableStorage mvTableStorage,
+            SnapshotStorageFactory snapshotStorageFactory,
+            Consumer<RaftGroupService> updateTableRaftService,
+            Function<RaftGroupService, ReplicaListener> createListener,
+            int zoneId,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            TablePartitionId replicaGrpId,
+            PeersAndLearners newConfiguration
+    ) throws NodeStoppingException {
+        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNodeConsistentId));
+
+        RaftGroupOptions groupOptions = groupOptionsForPartition(
+                mvTableStorage,
+                snapshotStorageFactory);
+
+        RaftGroupEventsListener raftGroupEventsListener = 
createRaftGroupEventsListener(metaStorageMgr, zoneId,
+                replicaGrpId);
+
+        // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
+        CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut = 
((Loza) raftManager).startRaftGroupNode(
+                raftNodeId,
+                newConfiguration,
+                raftGroupListener,
+                raftGroupEventsListener,
+                groupOptions,
+                raftGroupServiceFactory
+        );
+
+        return startReplica(
+                replicaGrpId,
+                newConfiguration,
+                updateTableRaftService,
+                createListener, storageIndexTracker,
+                newRaftClientFut);
+
+    }
+
     /**
-     * Starts a replica. If a replica with the same partition id already 
exists, the method throws an exception.
+     * Creates and starts a new replica.
      *
+     * @param metaStorageMgr Metastore manager.
+     * @param raftGroupListener Raft group listener for raft group starting.
+     * @param mvTableStorage Multi-version table storage.
+     * @param snapshotStorageFactory Snapshot storage factory for raft group 
option's parameterization.
+     * @param updateTableRaftService Temporal consumer while TableRaftService 
wouldn't be removed in
+     *      TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
+     * @param createListener Due to creation of ReplicaListener in 
TableManager, the function returns desired listener by created
+     *      raft-client inside {@link #startReplica} method.
+     * @param zoneId Distribution zone ID.
      * @param replicaGrpId Replication group id.
-     * @param listener Replica listener.
-     * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
-     * @throws NodeStoppingException If node is stopping.
-     * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with 
the same replication group id has already been
-     *         started.
+     * @param newConfiguration A configuration for new raft group.
+     * @return Future that promises ready new replica when done.
      */
-    public CompletableFuture<Replica> startReplica(
-            ReplicationGroupId replicaGrpId,
-            ReplicaListener listener,
-            TopologyAwareRaftGroupService raftClient,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker
+    public CompletableFuture<Boolean> startReplica(
+            MetaStorageManager metaStorageMgr,
+            RaftGroupListener raftGroupListener,
+            MvTableStorage mvTableStorage,
+            SnapshotStorageFactory snapshotStorageFactory,
+            Consumer<RaftGroupService> updateTableRaftService,
+            Function<RaftGroupService, ReplicaListener> createListener,
+            int zoneId,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            TablePartitionId replicaGrpId,
+            PeersAndLearners newConfiguration
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return startReplicaInternal(replicaGrpId, listener, raftClient, 
storageIndexTracker);
+            return startReplicaInternal(
+                    metaStorageMgr,
+                    raftGroupListener,
+                    mvTableStorage,
+                    snapshotStorageFactory,
+                    updateTableRaftService,
+                    createListener,
+                    zoneId,
+                    storageIndexTracker,
+                    replicaGrpId,
+                    newConfiguration);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     /**
-     * Internal method for starting a replica.
+     * Starts a raft-client and pass it to a replica creation if the replica 
should be started too. If a replica with the same partition id
+     * already exists, the method throws an exception.
      *
      * @param replicaGrpId Replication group id.

Review Comment:
   Javadoc params is not consistent with real aruments



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java:
##########
@@ -18,11 +18,12 @@
 package org.apache.ignite.internal.replicator.listener;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 
 /** Replica listener. */
-@FunctionalInterface
+// @FunctionalInterface

Review Comment:
   Pls remove



##########
modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java:
##########
@@ -477,34 +499,45 @@ private CompletableFuture<TopologyAwareRaftGroupService> 
createReplicationGroup(
 
             var rftNodeId = new RaftNodeId(groupId, peer);
 
+            PeersAndLearners newConfiguration = fromConsistentIds(nodes);
+
             CompletableFuture<TopologyAwareRaftGroupService> raftClientFut = 
raftManager.startRaftGroupNode(
                     rftNodeId,
-                    fromConsistentIds(nodes),
+                    newConfiguration,
                     new TestRaftGroupListener(),
                     RaftGroupEventsListener.noopLsnr,
                     RaftGroupOptions.defaults(),
                     raftClientFactory.get(nodeName)
             );
             serviceFutures.add(raftClientFut);
 
-            CompletableFuture<Replica> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
+            CompletableFuture<Boolean> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
                 try {
+                    ReplicaListener listener = new ReplicaListener() {
+                        @Override
+                        public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, String senderId) {
+                            log.info("Handle request [type={}]", 
request.getClass().getSimpleName());
+
+                            return raftClient
+                                    
.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
+                                    .thenCompose(ignored -> replicaListener == 
null
+                                            ? completedFuture(new 
ReplicaResult(null, null))
+                                            : replicaListener.apply(request, 
senderId));
+                        }
+
+                        @Override
+                        public RaftCommandRunner raftClient() {
+                            return raftClient;
+                        }
+                    };
+
                     return replicaManager.startReplica(
                             groupId,
-                            (request, senderId) -> {
-                                log.info("Handle request [type={}]", 
request.getClass().getSimpleName());
-
-                                return 
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
-                                        .thenCompose(ignored -> {
-                                            if (replicaListener == null) {
-                                                return completedFuture(new 
ReplicaResult(null, null));
-                                            } else {
-                                                return 
replicaListener.apply(request, senderId);
-                                            }
-                                        });
-                            },
-                            raftClient,
-                            new 
PendingComparableValuesTracker<>(Long.MAX_VALUE));
+                            newConfiguration,
+                            (unused) -> { },
+                            (unused) -> listener, // TODO

Review Comment:
   TODO?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to