ibessonov commented on code in PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562#discussion_r1084876619


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))

Review Comment:
   Why "completes" instead of "complete"?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))
+                .thenCompose(Function.identity())
+                .whenComplete((unused, throwable) -> {
+                    if (throwable == null) {
+                        joinFuture.complete(null);

Review Comment:
   Why do you need a second future object? This is neither explained nor obvious.
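
   To make the question concrete, here is a minimal standalone sketch of the two-future wiring as I read it from the diff. It is not the PR's code: `TwoFuturesSketch`, `finish` and `wire` are hypothetical names, and `finish` only stands in for `completesRebalance`, whose real body is not shown here. The sketch assumes the intent is that the second future completes only after the follow-up step has finished, whatever the outcome of the first one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class TwoFuturesSketch {
    /** Hypothetical stand-in for completesRebalance: an async follow-up step that propagates the failure, if any. */
    static CompletableFuture<Void> finish(Throwable failure) {
        return failure == null
                ? CompletableFuture.completedFuture(null)
                : CompletableFuture.failedFuture(failure);
    }

    /** Returns a separate future that completes only after the follow-up step for {@code work} has completed. */
    static CompletableFuture<Void> wire(CompletableFuture<Void> work) {
        CompletableFuture<Void> join = new CompletableFuture<>();

        // handle() yields a future of a future; thenCompose(identity) flattens it.
        work.handle((unused, throwable) -> finish(throwable))
                .thenCompose(Function.identity())
                .whenComplete((unused, throwable) -> {
                    if (throwable == null) {
                        join.complete(null);
                    } else {
                        join.completeExceptionally(throwable);
                    }
                });

        return join;
    }
}
```

   If this is the reason for `joinFuture`, a short comment on the field saying so would be enough.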



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -157,9 +162,9 @@ public void join() throws InterruptedException {
     public void cancel() {
         canceled = true;
 
-        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
 
-        CompletableFuture<?> fut = future;
+        CompletableFuture<?> fut = rebalanceFuture;

Review Comment:
   Does it work? Do we have a test that would show that "cancel" works when 
there are no messages received by the node?
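
   Roughly the scenario I have in mind, as a standalone sketch. `FakeCopier` is a hypothetical stand-in, not the real `IncomingSnapshotCopier`; the never-completed future models a sender that never replies. A real test would need to drive the actual copier with a messaging layer that stays silent.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelWithoutMessagesSketch {
    /** Hypothetical stand-in for the copier; only the cancel/join interplay is modelled. */
    static class FakeCopier {
        private volatile CompletableFuture<Void> rebalanceFuture;

        void start() {
            // Never completed on its own: models a node that receives no messages.
            rebalanceFuture = new CompletableFuture<>();
        }

        void cancel() {
            CompletableFuture<?> fut = rebalanceFuture;

            if (fut != null) {
                fut.cancel(true);
            }
        }

        /** Returns true if the copier finished or was cancelled within the timeout, false if it is still pending. */
        boolean join(long timeoutMillis) throws Exception {
            try {
                rebalanceFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);

                return true;
            } catch (CancellationException e) {
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        }
    }

    /** Starts the fake copier, cancels it before anything arrives, and checks that join() does not hang. */
    static boolean cancelUnblocksJoin() throws Exception {
        FakeCopier copier = new FakeCopier();

        copier.start();
        copier.cancel();

        return copier.join(1_000);
    }
}
```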



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
