sashapolo commented on code in PR #1807: URL: https://github.com/apache/ignite-3/pull/1807#discussion_r1144399108
########## modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.causality; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.jetbrains.annotations.Nullable; + +/** + * Versioned Value flavor that accumulates updates posted by the {@link #update} method and "commits" them when {@link #complete} or + * {@link #completeExceptionally} are called or a storage revision is updated (see constructors for details). + */ +public class IncrementalVersionedValue<T> extends AbstractVersionedValue<T> { + /** Update mutex. */ + private final Object updateMutex = new Object(); + + /** + * Token that was used with the most recent {@link #update} call. 
+ * + * <p>Multi-threaded access is guarded by {@link #updateMutex}. + */ + private long expectedToken = -1; + + /** + * Token that was used with the most recent {@link #completeInternal} call. + * + * <p>Multi-threaded access is guarded by {@link #updateMutex}. + */ + private long lastCompleteToken = -1; + + /** + * Future that will be completed after all updates over the value in context of current causality token will be performed. This + * {@code updaterFuture} is {@code null} if no updates in context of current causality token have been initiated. See + * {@link #update(long, BiFunction)}. + * + * <p>Multi-threaded access is guarded by {@link #updateMutex}. + */ + private CompletableFuture<T> updaterFuture = completedFuture(getDefault()); + + /** + * Constructor. + * + * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this + * VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a + * construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every + * update of storage revision as a listener. + * @param maxHistorySize Size of the history of changes to store, including last applied token. + * @param defaultValueSupplier Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate the + * default value if the value is not initialized yet. It is not guaranteed to execute only once. + */ + public IncrementalVersionedValue( + @Nullable Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater, + int maxHistorySize, + @Nullable Supplier<T> defaultValueSupplier + ) { + super(maxHistorySize, defaultValueSupplier); + + if (observableRevisionUpdater != null) { + observableRevisionUpdater.accept(this::completeInternal); + } + } + + /** + * Constructor. 
+ * + * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this + * VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a + * construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every + * update of storage revision as a listener. + * @param defaultValueSupplier Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate the + * default value if the value is not initialized yet. It is not guaranteed to execute only once. + */ + public IncrementalVersionedValue( + @Nullable Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater, + Supplier<T> defaultValueSupplier + ) { + this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier); + } + + /** + * Constructor with default history size. + * + * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this + * VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a + * construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every + * update of storage revision as a listener. + */ + public IncrementalVersionedValue(Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater) { + this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE, null); + } + + /** + * Updates the value using the given updater. The updater receives the value on previous token, or default value (see constructor) if + * the value isn't initialized, or current intermediate value, if this method has been already called for the same token; and returns a + * new value.<br> The updater will be called after updaters that had been passed to previous calls of this method complete. 
If an + * exception ({@link CancellationException} or {@link CompletionException}) was thrown when calculating the value for previous token, + * then updater is used to process the exception and calculate a new value.<br> This method can be called multiple times for the same + * token, and doesn't complete the future created for this token. The future is supposed to be completed by storage revision update or a + * call of {@link #complete(long)} in this case. If this method has been called at least once on the given token, the updater will + * receive a value that was evaluated by updater on previous call, as intermediate result.<br> As the order of multiple calls of this + * method on the same token is unknown, operations done by the updater must be commutative. For example: + * <ul> + * <li>this method was called for token N-1 and updater evaluated the value V1;</li> + * <li>a storage revision update happened;</li> + * <li>this method is called for token N, updater receives V1 and evaluates V2;</li> + * <li>this method is called once again for token N, then the updater receives V2 as intermediate result and evaluates V3;</li> + * <li>storage revision update happens and the future for token N completes with value V3.</li> + * </ul> + * Regardless of order in which this method's calls are made, V3 should be the final result. + * <br> + * The method should return a future that will be completed when {@code updater} completes. + * + * @param causalityToken Causality token. Used mainly for sanity checks. + * @param updater The binary function that accepts previous value and exception, if present, and update it to compute the new + * value. + * @return Future for updated value. 
+ */ + public CompletableFuture<T> update(long causalityToken, BiFunction<T, Throwable, CompletableFuture<T>> updater) { + synchronized (updateMutex) { + if (expectedToken == -1) { + assert causalityToken > lastCompleteToken + : String.format("Causality token is outdated, previous token %d, got %d", lastCompleteToken, causalityToken); + + expectedToken = causalityToken; + } else { + assert expectedToken == causalityToken + : String.format("Causality token mismatch, expected %d, got %d", expectedToken, causalityToken); + } + + updaterFuture = updaterFuture + .handle(updater) + .thenCompose(Function.identity()); Review Comment: Yes, `updater` returns a `CompletableFuture`, and `CompletableFuture` has no `handleAndCompose`-like method that performs the `CompletableFuture<CompletableFuture<T>> -> CompletableFuture<T>` transformation, so `thenCompose(Function.identity())` is needed to flatten the nested future produced by `handle(updater)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
