tkalkirill commented on code in PR #2581:
URL: https://github.com/apache/ignite-3/pull/2581#discussion_r1323995164
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2202,7 +2220,11 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
try {
assert evt.single();
- return
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
+ return recoveryFuture.thenComposeAsync(v -> {
Review Comment:
Maybe it will be enough for us to wait for the `recoveryFuture` once during
recovery and not use it again?
##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -1175,4 +1177,22 @@ public static <T> List<T> bytesToList(ByteBuffer buf,
Function<ByteBuffer, T> tr
return result;
}
+
+ /**
+ * Creates a consumer that, when passed to a {@link
CompletableFuture#whenComplete} call, will copy the outcome (either successful
or
+ * not) of the target future to the given future.
+ *
+ * @param future Future to copy the outcome to.
+ * @param <T> Future result type.
+ * @return Consumer for transferring a future outcome to another future.
+ */
+ public static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<?
super T> future) {
Review Comment:
There aren't enough tests.
##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -223,13 +225,7 @@ private void close(@Nullable Throwable throwable) {
var futs = pendingRequests.values().toArray(new
CompletableFuture[0]);
- CompletableFuture.allOf(futs).whenComplete((res, err) -> {
- if (err != null) {
- completionFut.completeExceptionally(err);
- } else {
- completionFut.complete(null);
- }
- });
+
CompletableFuture.allOf(futs).whenComplete(copyStateTo(completionFut));
Review Comment:
Can use static import for `CompletableFuture#allOf`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -589,10 +588,29 @@ private CompletableFuture<Void>
performRebalanceOnRecovery(long revision) {
try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix,
revision)) {
CompletableFuture<?>[] futures = cursor.stream()
- .map(this::handleChangePendingAssignmentEvent)
+ .map(pendingAssignmentEntry -> {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Missed pending assignments for key '{}'
with revision {} discovered, performing recovery",
+ new String(pendingAssignmentEntry.key(),
UTF_8),
+ revision
+ );
+ }
+
+ // We use the Meta Storage recovery revision here
instead of the entry revision, because
+ // 'handleChangePendingAssignmentEvent' accesses some
Versioned Values that only store values starting with
+ // tokens equal to Meta Storage recovery revision. In
other words, if the entry has a lower revision than the
+ // recovery revision, there will never be a Versioned
Value corresponding to its revision.
+ return
handleChangePendingAssignmentEvent(pendingAssignmentEntry, revision);
Review Comment:
Perhaps I'm missing something, but won't the `pendingAssignmentEntry` have
the same revision as the `metaStorageMgr.recoveryFinishedFuture()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]