ibessonov commented on code in PR #1807:
URL: https://github.com/apache/ignite-3/pull/1807#discussion_r1144423680


##########
modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Versioned Value flavor that accumulates updates posted by the {@link #update} method and "commits" them when {@link #complete} or
+ * {@link #completeExceptionally} are called or a storage revision is updated (see constructors for details).
+ */
+public class IncrementalVersionedValue<T> extends AbstractVersionedValue<T> {
+    /** Update mutex. */
+    private final Object updateMutex = new Object();
+
+    /**
+     * Token that was used with the most recent {@link #update} call.
+     *
+     * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+     */
+    private long expectedToken = -1;
+
+    /**
+     * Token that was used with the most recent {@link #completeInternal} call.
+     *
+     * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+     */
+    private long lastCompleteToken = -1;
+
+    /**
+     * Future that will be completed after all updates over the value in context of current causality token will be performed. This
+     * {@code updaterFuture} is {@code null} if no updates in context of current causality token have been initiated. See
+     * {@link #update(long, BiFunction)}.
+     *
+     * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+     */
+    private CompletableFuture<T> updaterFuture = completedFuture(getDefault());
+
+    /**
+     * Constructor.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this
+     *         VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a
+     *         construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every
+     *         update of storage revision as a listener.
+     * @param maxHistorySize Size of the history of changes to store, including last applied token.
+     * @param defaultValueSupplier Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate the
+     *         default value if the value is not initialized yet. It is not guaranteed to execute only once.
+     */
+    public IncrementalVersionedValue(
+            @Nullable Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater,
+            int maxHistorySize,
+            @Nullable Supplier<T> defaultValueSupplier
+    ) {
+        super(maxHistorySize, defaultValueSupplier);
+
+        if (observableRevisionUpdater != null) {
+            observableRevisionUpdater.accept(this::completeInternal);
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this
+     *         VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a
+     *         construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every
+     *         update of storage revision as a listener.
+     * @param defaultValueSupplier Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate the
+     *         default value if the value is not initialized yet. It is not guaranteed to execute only once.
+     */
+    public IncrementalVersionedValue(
+            @Nullable Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater,
+            Supplier<T> defaultValueSupplier
+    ) {
+        this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier);
+    }
+
+    /**
+     * Constructor with default history size.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this
+     *         VersionedValue should be able to listen to, for receiving storage revision updates. This closure is called once on a
+     *         construction of this VersionedValue and accepts a {@code Function<Long, CompletableFuture<?>>} that should be called on every
+     *         update of storage revision as a listener.
+     */
+    public IncrementalVersionedValue(Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater) {
+        this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE, null);
+    }
+
+    /**
+     * Updates the value using the given updater. The updater receives the value on previous token, or default value (see constructor) if
+     * the value isn't initialized, or current intermediate value, if this method has been already called for the same token; and returns a
+     * new value.<br> The updater will be called after updaters that had been passed to previous calls of this method complete. If an
+     * exception ({@link CancellationException} or {@link CompletionException}) was thrown when calculating the value for previous token,
+     * then updater is used to process the exception and calculate a new value.<br> This method can be called multiple times for the same
+     * token, and doesn't complete the future created for this token. The future is supposed to be completed by storage revision update or a
+     * call of {@link #complete(long)} in this case. If this method has been called at least once on the given token, the updater will
+     * receive a value that was evaluated by updater on previous call, as intermediate result.<br> As the order of multiple calls of this
+     * method on the same token is unknown, operations done by the updater must be commutative. For example:
+     * <ul>
+     *     <li>this method was called for token N-1 and updater evaluated the value V1;</li>
+     *     <li>a storage revision update happened;</li>
+     *     <li>this method is called for token N, updater receives V1 and evaluates V2;</li>
+     *     <li>this method is called once again for token N, then the updater receives V2 as intermediate result and evaluates V3;</li>
+     *     <li>storage revision update happens and the future for token N completes with value V3.</li>
+     * </ul>
+     * Regardless of order in which this method's calls are made, V3 should be the final result.
+     * <br>
+     * The method should return a future that will be completed when {@code updater} completes.
+     *
+     * @param causalityToken Causality token. Used mainly for sanity checks.
+     * @param updater The binary function that accepts previous value and exception, if present, and update it to compute the new
+     *         value.
+     * @return Future for updated value.
+     */
+    public CompletableFuture<T> update(long causalityToken, BiFunction<T, Throwable, CompletableFuture<T>> updater) {
+        synchronized (updateMutex) {
+            if (expectedToken == -1) {
+                assert causalityToken > lastCompleteToken
+                        : String.format("Causality token is outdated, previous token %d, got %d", lastCompleteToken, causalityToken);
+
+                expectedToken = causalityToken;
+            } else {
+                assert expectedToken == causalityToken
+                        : String.format("Causality token mismatch, expected %d, got %d", expectedToken, causalityToken);
+            }
+
+            updaterFuture = updaterFuture
+                    .handle(updater)
+                    .thenCompose(Function.identity());

Review Comment:
   Oh, my bad, it's compose... 
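
   For readers following the thread, here is a minimal standalone sketch of why the `handle(...)` call in the hunk above is followed by `thenCompose(Function.identity())`. The class name `HandleComposeSketch` and the helper `chain` are hypothetical and not part of the PR; only the two `CompletableFuture` calls mirror the diff. Because the updater itself returns a future, `handle` produces a nested `CompletableFuture<CompletableFuture<T>>`, and composing with the identity function flattens it back to `CompletableFuture<T>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

public class HandleComposeSketch {
    /**
     * Hypothetical helper: chains an asynchronous updater onto the previous future,
     * using the same two calls as the diff.
     */
    static <T> CompletableFuture<T> chain(
            CompletableFuture<T> previous,
            BiFunction<T, Throwable, CompletableFuture<T>> updater
    ) {
        // handle() passes both the value and the exception (one of them is null) to the updater.
        // Since the updater returns a future, the result is a future of a future.
        CompletableFuture<CompletableFuture<T>> nested = previous.handle(updater);

        // thenCompose(identity) unwraps the inner future, so the caller sees a plain CompletableFuture<T>.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> start = CompletableFuture.completedFuture(1);

        // The updater ignores the (null) exception and computes the next value asynchronously.
        CompletableFuture<Integer> result = chain(start, (v, e) -> CompletableFuture.supplyAsync(() -> v + 1));

        System.out.println(result.join()); // prints 2
    }
}
```

   With `thenApply(Function.identity())` instead, the nested future would be passed through unchanged and the result would stay a `CompletableFuture<CompletableFuture<T>>`.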



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
