tkalkirill commented on code in PR #2581:
URL: https://github.com/apache/ignite-3/pull/2581#discussion_r1323995164


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2202,7 +2220,11 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 try {
                     assert evt.single();
 
-                    return 
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
+                    return recoveryFuture.thenComposeAsync(v -> {

Review Comment:
   Maybe it will be enough for us to wait for the `recoveryFuture` once during 
recovery and not use it again?



##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -1175,4 +1177,22 @@ public static <T> List<T> bytesToList(ByteBuffer buf, 
Function<ByteBuffer, T> tr
 
         return result;
     }
+
+    /**
+     * Creates a consumer that, when passed to a {@link 
CompletableFuture#whenComplete} call, will copy the outcome (either successful 
or
+     * not) of the target future to the given future.
+     *
+     * @param future Future to copy the outcome to.
+     * @param <T> Future result type.
+     * @return Consumer for transferring a future outcome to another future.
+     */
+    public static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<? 
super T> future) {

Review Comment:
   There aren't enough tests.



##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -223,13 +225,7 @@ private void close(@Nullable Throwable throwable) {
 
             var futs = pendingRequests.values().toArray(new 
CompletableFuture[0]);
 
-            CompletableFuture.allOf(futs).whenComplete((res, err) -> {
-                if (err != null) {
-                    completionFut.completeExceptionally(err);
-                } else {
-                    completionFut.complete(null);
-                }
-            });
+            
CompletableFuture.allOf(futs).whenComplete(copyStateTo(completionFut));

Review Comment:
   Can use static import for `CompletableFuture#allOf`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -589,10 +588,29 @@ private CompletableFuture<Void> 
performRebalanceOnRecovery(long revision) {
 
         try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
revision)) {
             CompletableFuture<?>[] futures = cursor.stream()
-                    .map(this::handleChangePendingAssignmentEvent)
+                    .map(pendingAssignmentEntry -> {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info(
+                                    "Missed pending assignments for key '{}' 
with revision {} discovered, performing recovery",
+                                    new String(pendingAssignmentEntry.key(), 
UTF_8),
+                                    revision
+                            );
+                        }
+
+                        // We use the Meta Storage recovery revision here 
instead of the entry revision, because
+                        // 'handleChangePendingAssignmentEvent' accesses some 
Versioned Values that only store values starting with
+                        // tokens equal to Meta Storage recovery revision. In 
other words, if the entry has a lower revision than the
+                        // recovery revision, there will never be a Versioned 
Value corresponding to its revision.
+                        return 
handleChangePendingAssignmentEvent(pendingAssignmentEntry, revision);

Review Comment:
   Perhaps I'm missing something, but won't the `pendingAssignmentEntry` have 
the same revision as the `metaStorageMgr.recoveryFinishedFuture()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to