ibessonov commented on code in PR #1807:
URL: https://github.com/apache/ignite-3/pull/1807#discussion_r1143230263
##########
.idea/codeStyles/Project.xml:
##########
@@ -5,11 +5,6 @@
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="4" />
<option name="TAB_SIZE" value="2" />
- <option name="USE_TAB_CHARACTER" value="false" />
Review Comment:
Please check that these changes make sense. I suppose IDEA made them without
your interaction
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/AbstractVersionedValue.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.Lazy;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of a value.
+ *
+ * <p>The value can be available through the causality token, which is
represented by a {@code long}.
+ *
+ * @param <T> Type of real value.
+ */
+public class AbstractVersionedValue<T> {
+ private final IgniteLogger log =
Loggers.forClass(AbstractVersionedValue.class);
+
+ /** Token until the value is initialized. */
+ private static final long NOT_INITIALIZED = -1L;
+
+ /** Default history size. */
+ public static final int DEFAULT_MAX_HISTORY_SIZE = 10;
+
+ /** Size of the history of changes to store, including last applied token.
*/
+ private final int maxHistorySize;
+
+ /** List of completion listeners, see {@link #whenComplete}. */
+ private final List<CompletionListener<T>> completionListeners = new
CopyOnWriteArrayList<>();
+
+ /** Versioned value storage. */
+ private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history =
new ConcurrentSkipListMap<>();
+
+ /** Default value that is used when a Versioned Value is completed without
an explicit value. */
+ @Nullable
+ private final Lazy<T> defaultValue;
+
+ /**
+ * Last applied causality token.
+ *
+ * <p>Multi-threaded access is guarded by the {@link #readWriteLock}.
+ */
+ private long actualToken = NOT_INITIALIZED;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ protected AbstractVersionedValue(@Nullable Supplier<T>
defaultValueSupplier) {
+ this(DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier);
+ }
+
+ protected AbstractVersionedValue(int maxHistorySize, @Nullable Supplier<T>
defaultValueSupplier) {
+ this.maxHistorySize = maxHistorySize;
+ this.defaultValue = defaultValueSupplier == null ? null : new
Lazy<>(defaultValueSupplier);
+ }
+
+ /**
+ * Creates a future for this value and causality token, or returns it if
it already exists.
+ *
+ * <p>The returned future is associated with an update having the given
causality token and completes when this update is finished.
+ *
+ * @param causalityToken Causality token. Let's assume that the update
associated with token N is already applied to this value.
+ * Then, if token N is given as an argument, a completed future
will be returned. If token N - 1 is given, this method returns
+ * the result in the state that is actual for the given token. If
the token is strongly outdated, {@link OutdatedTokenException}
+ * is thrown. If token N + 1 is given, this method will return a
future that will be completed when the update associated with
+ * token N + 1 will have been applied. Tokens that greater than N
by more than 1 should never be passed.
+ * @return The future.
+ * @throws OutdatedTokenException If outdated token is passed as an
argument.
+ */
+ public CompletableFuture<T> get(long causalityToken) {
+ assert causalityToken > NOT_INITIALIZED;
+
+ readWriteLock.readLock().lock();
+
+ try {
+ if (causalityToken > actualToken) {
+ return history.computeIfAbsent(causalityToken, t -> new
CompletableFuture<>());
+ }
+
+ Entry<Long, CompletableFuture<T>> histEntry =
history.floorEntry(causalityToken);
+
+ if (histEntry == null) {
+ throw new OutdatedTokenException(causalityToken, actualToken,
maxHistorySize);
+ }
+
+ return histEntry.getValue();
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the latest value of completed future.
+ */
+ public @Nullable T latest() {
+ for (CompletableFuture<T> fut : history.descendingMap().values()) {
+ if (fut.isDone()) {
+ return fut.join();
+ }
+ }
+
+ return getDefault();
+ }
+
+ /**
+ * Returns the default value.
+ */
+ protected final @Nullable T getDefault() {
+ return defaultValue == null ? null : defaultValue.get();
+ }
+
+ /**
+ * Completes the Versioned Value for the given token. This method will
look for the previous complete token and complete all registered
+ * futures in the {@code (prevToken, causalityToken]} range. If no {@code
complete} methods have been called before, all these futures
+ * will be complete with the configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ *
+ * @param causalityToken Causality token.
+ */
+ public void complete(long causalityToken) {
+ CompletableFuture<T> futureForToken;
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ CompletableFuture<T> previousCompleteFuture =
completePreviousFutures();
+
+ futureForToken = history.compute(causalityToken, (token, future)
-> {
+ if (future == null) {
+ // No registered future for this token exists, we can
reuse the previous complete future.
+ return previousCompleteFuture == null ?
completedFuture(getDefault()) : previousCompleteFuture;
+ } else {
+ if (previousCompleteFuture == null) {
+ future.complete(getDefault());
+ } else {
+ copyState(previousCompleteFuture, future);
+ }
+
+ return future;
+ }
+ });
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, futureForToken);
+ }
+
+ /**
+ * Completes the Versioned Value for the given token with the given {@code
future}. This method will look for the previous complete
+ * token and complete all registered futures in the {@code (prevToken,
causalityToken)} range. If no {@code complete} methods have
+ * been called before, all these futures will be complete with the
configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ */
+ protected void complete(long causalityToken, CompletableFuture<T> future) {
+ assert future.isDone();
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ completePreviousFutures();
+
+ CompletableFuture<T> existingFuture =
history.putIfAbsent(causalityToken, future);
+
+ if (existingFuture != null) {
+ copyState(future, existingFuture);
+ }
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, future);
+ }
+
+ /**
+ * Completes a future related with a specific causality token. It is
called only on storage revision update.
+ */
+ @Nullable
+ private CompletableFuture<T> completePreviousFutures() {
+ NavigableMap<Long, CompletableFuture<T>> headMap =
history.headMap(actualToken);
+
+ if (headMap.isEmpty()) {
+ return null;
+ }
+
+ List<CompletableFuture<T>> futuresToComplete = List.of();
+
+ CompletableFuture<T> previousCompleteFuture = null;
+
+ for (CompletableFuture<T> future : headMap.descendingMap().values()) {
+ if (future.isDone()) {
+ previousCompleteFuture = future;
+
+ break;
+ }
+
+ // Lazy initialization.
+ if (futuresToComplete.isEmpty()) {
+ futuresToComplete = new ArrayList<>();
+ }
+
+ futuresToComplete.add(future);
+ }
+
+ // We found the first complete future, but there are no incomplete
futures up to the actual token, so there's nothing to do.
+ if (futuresToComplete.isEmpty()) {
+ return previousCompleteFuture;
+ }
+
+ if (previousCompleteFuture == null) {
+ // "complete" method has never been called before, use the default
value as the original.
+ T defaultValue = getDefault();
+
+ futuresToComplete.forEach(f -> f.complete(defaultValue));
Review Comment:
I feel like you complete futures in reverse order here, is this alright?
Same for the `else` branch
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/AbstractVersionedValue.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.Lazy;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of a value.
+ *
+ * <p>The value can be available through the causality token, which is
represented by a {@code long}.
+ *
+ * @param <T> Type of real value.
+ */
+public class AbstractVersionedValue<T> {
+ private final IgniteLogger log =
Loggers.forClass(AbstractVersionedValue.class);
+
+ /** Token until the value is initialized. */
+ private static final long NOT_INITIALIZED = -1L;
+
+ /** Default history size. */
+ public static final int DEFAULT_MAX_HISTORY_SIZE = 10;
+
+ /** Size of the history of changes to store, including last applied token.
*/
+ private final int maxHistorySize;
+
+ /** List of completion listeners, see {@link #whenComplete}. */
+ private final List<CompletionListener<T>> completionListeners = new
CopyOnWriteArrayList<>();
+
+ /** Versioned value storage. */
+ private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history =
new ConcurrentSkipListMap<>();
+
+ /** Default value that is used when a Versioned Value is completed without
an explicit value. */
+ @Nullable
+ private final Lazy<T> defaultValue;
+
+ /**
+ * Last applied causality token.
+ *
+ * <p>Multi-threaded access is guarded by the {@link #readWriteLock}.
+ */
+ private long actualToken = NOT_INITIALIZED;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ protected AbstractVersionedValue(@Nullable Supplier<T>
defaultValueSupplier) {
+ this(DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier);
+ }
+
+ protected AbstractVersionedValue(int maxHistorySize, @Nullable Supplier<T>
defaultValueSupplier) {
+ this.maxHistorySize = maxHistorySize;
+ this.defaultValue = defaultValueSupplier == null ? null : new
Lazy<>(defaultValueSupplier);
+ }
+
+ /**
+ * Creates a future for this value and causality token, or returns it if
it already exists.
+ *
+ * <p>The returned future is associated with an update having the given
causality token and completes when this update is finished.
+ *
+ * @param causalityToken Causality token. Let's assume that the update
associated with token N is already applied to this value.
+ * Then, if token N is given as an argument, a completed future
will be returned. If token N - 1 is given, this method returns
+ * the result in the state that is actual for the given token. If
the token is strongly outdated, {@link OutdatedTokenException}
+ * is thrown. If token N + 1 is given, this method will return a
future that will be completed when the update associated with
+ * token N + 1 will have been applied. Tokens that greater than N
by more than 1 should never be passed.
+ * @return The future.
+ * @throws OutdatedTokenException If outdated token is passed as an
argument.
+ */
+ public CompletableFuture<T> get(long causalityToken) {
+ assert causalityToken > NOT_INITIALIZED;
+
+ readWriteLock.readLock().lock();
+
+ try {
+ if (causalityToken > actualToken) {
+ return history.computeIfAbsent(causalityToken, t -> new
CompletableFuture<>());
+ }
+
+ Entry<Long, CompletableFuture<T>> histEntry =
history.floorEntry(causalityToken);
+
+ if (histEntry == null) {
+ throw new OutdatedTokenException(causalityToken, actualToken,
maxHistorySize);
+ }
+
+ return histEntry.getValue();
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the latest value of completed future.
+ */
+ public @Nullable T latest() {
+ for (CompletableFuture<T> fut : history.descendingMap().values()) {
+ if (fut.isDone()) {
+ return fut.join();
+ }
+ }
+
+ return getDefault();
+ }
+
+ /**
+ * Returns the default value.
+ */
+ protected final @Nullable T getDefault() {
+ return defaultValue == null ? null : defaultValue.get();
+ }
+
+ /**
+ * Completes the Versioned Value for the given token. This method will
look for the previous complete token and complete all registered
+ * futures in the {@code (prevToken, causalityToken]} range. If no {@code
complete} methods have been called before, all these futures
+ * will be complete with the configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ *
+ * @param causalityToken Causality token.
+ */
+ public void complete(long causalityToken) {
+ CompletableFuture<T> futureForToken;
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ CompletableFuture<T> previousCompleteFuture =
completePreviousFutures();
+
+ futureForToken = history.compute(causalityToken, (token, future)
-> {
+ if (future == null) {
+ // No registered future for this token exists, we can
reuse the previous complete future.
+ return previousCompleteFuture == null ?
completedFuture(getDefault()) : previousCompleteFuture;
+ } else {
+ if (previousCompleteFuture == null) {
+ future.complete(getDefault());
+ } else {
+ copyState(previousCompleteFuture, future);
+ }
+
+ return future;
+ }
+ });
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, futureForToken);
+ }
+
+ /**
+ * Completes the Versioned Value for the given token with the given {@code
future}. This method will look for the previous complete
+ * token and complete all registered futures in the {@code (prevToken,
causalityToken)} range. If no {@code complete} methods have
+ * been called before, all these futures will be complete with the
configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ */
+ protected void complete(long causalityToken, CompletableFuture<T> future) {
+ assert future.isDone();
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ completePreviousFutures();
+
+ CompletableFuture<T> existingFuture =
history.putIfAbsent(causalityToken, future);
+
+ if (existingFuture != null) {
+ copyState(future, existingFuture);
+ }
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, future);
+ }
+
+ /**
+ * Completes a future related with a specific causality token. It is
called only on storage revision update.
+ */
+ @Nullable
+ private CompletableFuture<T> completePreviousFutures() {
+ NavigableMap<Long, CompletableFuture<T>> headMap =
history.headMap(actualToken);
+
+ if (headMap.isEmpty()) {
+ return null;
+ }
+
+ List<CompletableFuture<T>> futuresToComplete = List.of();
+
+ CompletableFuture<T> previousCompleteFuture = null;
+
+ for (CompletableFuture<T> future : headMap.descendingMap().values()) {
+ if (future.isDone()) {
+ previousCompleteFuture = future;
+
+ break;
+ }
+
+ // Lazy initialization.
+ if (futuresToComplete.isEmpty()) {
+ futuresToComplete = new ArrayList<>();
+ }
+
+ futuresToComplete.add(future);
+ }
+
+ // We found the first complete future, but there are no incomplete
futures up to the actual token, so there's nothing to do.
+ if (futuresToComplete.isEmpty()) {
+ return previousCompleteFuture;
+ }
+
+ if (previousCompleteFuture == null) {
+ // "complete" method has never been called before, use the default
value as the original.
+ T defaultValue = getDefault();
+
+ futuresToComplete.forEach(f -> f.complete(defaultValue));
+ } else {
+ assert previousCompleteFuture.isDone();
+
+ // Create an effectively final variable.
+ List<CompletableFuture<T>> futuresToCompleteCopy =
futuresToComplete;
+
+ previousCompleteFuture.whenComplete((v, t) -> {
+ if (t != null) {
+ futuresToCompleteCopy.forEach(f ->
f.completeExceptionally(t));
+ } else {
+ futuresToCompleteCopy.forEach(f -> f.complete(v));
+ }
+ });
+ }
+
+ return previousCompleteFuture;
+ }
+
+ /**
+ * Updates the current token. Must be called under the {@link
#readWriteLock writeLock}.
+ */
+ private void setActualToken(long causalityToken) {
+ assert actualToken < causalityToken
+ : format("Token must be greater than actual [token={},
actual={}]", causalityToken, actualToken);
+
+ actualToken = causalityToken;
+ }
+
+ /**
+ * Trims the storage to history size. Must be called under the {@link
#readWriteLock writeLock}.
+ */
+ private void trimHistory() {
+ NavigableMap<Long, CompletableFuture<T>> oldTokensMap =
history.headMap(actualToken, true);
+
+ if (oldTokensMap.size() <= maxHistorySize) {
+ return;
+ }
+
+ Iterator<Map.Entry<Long, CompletableFuture<T>>> it =
oldTokensMap.entrySet().iterator();
+
+ for (int i = oldTokensMap.size(); i > maxHistorySize; i--) {
+ Map.Entry<Long, CompletableFuture<T>> next = it.next();
+
+ // All futures must be explicitly completed before history
trimming occurs.
+ assert next.getValue().isDone();
Review Comment:
How do we guarantee this?
##########
modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java:
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link IncrementalVersionedValue}.
+ */
+public class IncrementalVersionedValueTest {
+ /** Test value. */
+ private static final int TEST_VALUE = 1;
+
+ /** The test revision register is used to move the revision forward. */
+ private final TestRevisionRegister register = new TestRevisionRegister();
+
+ /** Test exception is used for exceptionally completion Versioned value
object. */
+ private static final Exception TEST_EXCEPTION = new Exception("Test
exception");
+
+ /**
+ * Checks that the update method work as expected when the previous value
is known.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdate() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<Integer>(register);
+
+ versionedValue.update(0, (integer, throwable) ->
completedFuture(TEST_VALUE));
+
+ register.moveRevision(0L).join();
+
+ CompletableFuture<Integer> fut = versionedValue.get(1);
+
+ assertFalse(fut.isDone());
+
+ int incrementCount = 10;
+
+ for (int i = 0; i < incrementCount; i++) {
+ versionedValue.update(1, (previous, e) ->
completedFuture(++previous));
+
+ assertFalse(fut.isDone());
+ }
+
+ register.moveRevision(1L).join();
+
+ assertTrue(fut.isDone());
+
+ assertEquals(TEST_VALUE + incrementCount, fut.get());
+
+ assertThrows(AssertionError.class, () -> versionedValue.update(1L, (i,
t) -> completedFuture(null)));
+ }
+
+ /**
+ * Checks that the update method work as expected when there is no history
to calculate previous value.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdatePredefined() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<Integer>(register);
+
+ CompletableFuture<Integer> fut = versionedValue.get(0);
+
+ assertFalse(fut.isDone());
+
+ versionedValue.update(0, (previous, e) -> {
+ assertNull(previous);
+
+ return completedFuture(TEST_VALUE);
+ });
+
+ assertFalse(fut.isDone());
+
+ register.moveRevision(0L).join();
+
+ assertTrue(fut.isDone());
+
+ assertEquals(TEST_VALUE, fut.get());
+ }
+
+ /**
+ * Test asynchronous update closure.
+ */
+ @Test
+ public void testAsyncUpdate() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register);
+
+ CompletableFuture<Integer> fut = new CompletableFuture<>();
+
+ vv.update(0L, (v, e) -> fut);
+
+ CompletableFuture<Integer> vvFut = vv.get(0L);
+
+ CompletableFuture<?> revFut = register.moveRevision(0L);
+
+ assertFalse(fut.isDone());
+ assertFalse(vvFut.isDone());
+ assertFalse(revFut.isDone());
+
+ fut.complete(1);
+
+ revFut.join();
+
+ assertTrue(vvFut.isDone());
+ }
+
+ /**
+ * Test the case when exception happens in updater.
+ */
+ @Test
+ public void testExceptionOnUpdate() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register, () -> 0);
+
+ final int count = 4;
+ final int successfulCompletionsCount = count / 2;
+
+ AtomicInteger actualSuccessfulCompletionsCount = new AtomicInteger();
+
+ final String exceptionMsg = "test msg";
+
+ for (int i = 0; i < count; i++) {
+ vv.update(0L, (v, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ if (v == successfulCompletionsCount) {
+ throw new IgniteInternalException(exceptionMsg);
+ }
+
+ actualSuccessfulCompletionsCount.incrementAndGet();
+
+ return completedFuture(++v);
+ });
+ }
+
+ AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+ vv.whenComplete((t, v, e) -> exceptionRef.set(e));
+
+ vv.complete(0L);
+
+ assertThrowsWithCause(() -> vv.get(0L).join(),
IgniteInternalException.class);
+
+ assertThat(exceptionRef.get().getMessage(),
containsString(exceptionMsg));
+ }
+
+ /**
+ * Test with multiple versioned values and asynchronous completion.
+ */
+ @Test
+ public void testAsyncMultiVv() {
+ final String registryName = "Registry";
+ final String assignmentName = "Assignment";
+ final String tableName = "T1_";
+
+ IncrementalVersionedValue<Map<UUID, String>> tablesVv = new
IncrementalVersionedValue<>(f -> {}, HashMap::new);
+ IncrementalVersionedValue<Map<UUID, String>> schemasVv = new
IncrementalVersionedValue<>(register, HashMap::new);
+ IncrementalVersionedValue<Map<UUID, String>> assignmentsVv = new
IncrementalVersionedValue<>(register, HashMap::new);
+
+ schemasVv.whenComplete((token, value, ex) -> tablesVv.complete(token));
+
+ BiFunction<Long, UUID, CompletableFuture<String>> schemaRegistry =
+ (token, uuid) -> schemasVv.get(token).thenApply(schemas ->
schemas.get(uuid));
+
+ // Adding table.
+ long token = 0L;
+ UUID tableId = UUID.randomUUID();
+
+ CompletableFuture<String> tableFut = schemaRegistry.apply(token,
tableId)
+ .thenCombine(assignmentsVv.get(token), (registry, assignments)
-> tableName + registry + assignments.get(tableId));
+
+ tablesVv.update(token, (old, e) -> tableFut.thenApply(table -> {
+ Map<UUID, String> val = new HashMap<>(old);
+
+ val.put(tableId, table);
+
+ return val;
+ }));
+
+ CompletableFuture<String> userFut = tablesVv.get(token).thenApply(map
-> map.get(tableId));
+
+ schemasVv.update(token, (old, e) -> {
+ old.put(tableId, registryName);
+
+ return completedFuture(old);
+ });
+
+ assignmentsVv.update(token, (old, e) -> {
+ old.put(tableId, assignmentName);
+
+ return completedFuture(old);
+ });
+
+ assertFalse(tableFut.isDone());
+ assertFalse(userFut.isDone());
+
+ register.moveRevision(token).join();
+
+ tableFut.join();
+
+ assertEquals(tableName + registryName + assignmentName,
userFut.join());
+ }
+
+ /**
+ * Tests a default value supplier.
+ */
+ @Test
+ public void testDefaultValueSupplier() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register, () -> TEST_VALUE);
+
+ checkDefaultValue(vv, TEST_VALUE);
+ }
+
+ /**
+ * Tests a case when there is no default value supplier.
+ */
+ @Test
+ public void testWithoutDefaultValue() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register);
+
+ checkDefaultValue(vv, null);
+ }
+
+ @RepeatedTest(100)
+ void testConcurrentGetAndComplete() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<>(register, () ->
1);
+
+ // Set initial value.
+ versionedValue.complete(1);
+
+ var barrier = new CyclicBarrier(2);
+
+ CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(()
-> {
+ try {
+ barrier.await(1, TimeUnit.SECONDS);
+ versionedValue.complete(3);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ barrier.await(1, TimeUnit.SECONDS);
+
+ CompletableFuture<Integer> readerFuture = versionedValue.get(2);
+
+ assertThat(writerFuture, willCompleteSuccessfully());
+ assertThat(readerFuture, willBe(1));
+ }
+
+ @RepeatedTest(100)
+ void testConcurrentGetAndCompleteWithHistoryTrimming() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<>(register, () ->
1);
+
+ // Set initial value (history size 1).
+ versionedValue.complete(2);
+ // Set history size to 2.
+ versionedValue.complete(3);
+
+ var barrier = new CyclicBarrier(2);
+
+ CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(()
-> {
+ try {
+ barrier.await(1, TimeUnit.SECONDS);
+
+ versionedValue.update(4, (i, t) -> completedFuture(i + 1));
+ // Trigger history trimming
+ versionedValue.complete(4);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ barrier.await(1, TimeUnit.SECONDS);
+
+ try {
+ CompletableFuture<Integer> readerFuture = versionedValue.get(2);
+
+ assertThat(readerFuture, willBe(1));
+ } catch (OutdatedTokenException ignored) {
+ // This is considered as a valid outcome.
+ }
+
+ assertThat(versionedValue.get(4), willBe(2));
+
+ assertThat(writerFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void testCompleteMultipleFutures() {
+ var versionedValue = new IncrementalVersionedValue<>(register, () ->
1);
+
+ // Set initial value.
+ versionedValue.complete(1);
+
+ CompletableFuture<Integer> future1 = versionedValue.get(2);
+ CompletableFuture<Integer> future2 = versionedValue.get(3);
+ CompletableFuture<Integer> future3 = versionedValue.get(4);
+
+ versionedValue.update(4, (i, t) -> completedFuture(i + 1));
+
+ versionedValue.complete(4);
+
+ assertThat(future1, willBe(1));
+ assertThat(future2, willBe(1));
+ assertThat(future3, willBe(2));
+ }
+
+ /**
+ * Test {@link IncrementalVersionedValue#whenComplete}.
+ */
+ @Test
+ public void testWhenComplete() {
+ var vv = new IncrementalVersionedValue<>(register, () -> 1);
+
+ CompletionListener<Integer> listener = mock(CompletionListener.class);
+
+ vv.whenComplete(listener);
+
+ // Test complete.
+ long token = 0;
+
+ vv.complete(token);
+
+ verify(listener).whenComplete(token, 1, null);
+
+ // Test update.
+ token = 1;
+
+ vv.update(token, (i, t) -> completedFuture(i + 1));
+ vv.update(token, (i, t) -> completedFuture(i + 1));
+
+ vv.complete(token);
+
+ verify(listener).whenComplete(token, 3, null);
+
+ // Test complete exceptionally.
+ token = 2;
+
+ vv.completeExceptionally(token, TEST_EXCEPTION);
+
+ verify(listener).whenComplete(token, null, TEST_EXCEPTION);
+
+ // Test remove listener.
+ token = 3;
+
+ vv.removeWhenComplete(listener);
+
+ clearInvocations(listener);
+
+ vv.complete(token);
+
+ verify(listener, never()).whenComplete(anyLong(), any(), any());
+ }
+
+ /**
+ * Tests a case when there is no default value supplier.
+ */
+ private void checkDefaultValue(IncrementalVersionedValue<Integer> vv,
@Nullable Integer expectedDefault) {
+ assertEquals(expectedDefault, vv.latest());
+
+ vv.update(0, (a, e) -> {
+ assertEquals(expectedDefault, vv.latest());
+
+ return completedFuture(a == null ? null : a + 1);
+ }
+ );
+
+ assertEquals(expectedDefault, vv.latest());
+
+ CompletableFuture<Integer> f = vv.get(0);
+
+ assertFalse(f.isDone());
+
+ vv.update(0, (a, e) -> completedFuture(a == null ? null : a + 1));
+
+ register.moveRevision(0L).join();
+
+ assertTrue(f.isDone());
+
+ assertEquals(expectedDefault == null ? null : TEST_VALUE + 2,
f.join());
Review Comment:
I believe that you should use `expectedDefault` instead of `TEST_VALUE` in
this assertion
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Versioned Value flavor that accumulates updates posted by the {@link
#update} method and "commits" them when {@link #complete} or
+ * {@link #completeExceptionally} are called or a storage revision is updated
(see constructors for details).
+ */
+public class IncrementalVersionedValue<T> extends AbstractVersionedValue<T> {
+ /** Update mutex. */
+ private final Object updateMutex = new Object();
+
+ /**
+ * Token that was used with the most recent {@link #update} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long expectedToken = -1;
+
+ /**
+ * Token that was used with the most recent {@link #completeInternal} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long lastCompleteToken = -1;
+
+ /**
+ * Future that will be completed after all updates over the value in
context of current causality token will be performed. This
+ * {@code updaterFuture} is {@code null} if no updates in context of
current causality token have been initiated. See
+ * {@link #update(long, BiFunction)}.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private CompletableFuture<T> updaterFuture = completedFuture(getDefault());
+
+ /**
+ * Constructor.
+ *
+ * @param observableRevisionUpdater A closure intended to connect this
VersionedValue with a revision updater, that this
+ * VersionedValue should be able to listen to, for receiving
storage revision updates. This closure is called once on a
+ * construction of this VersionedValue and accepts a {@code
Function<Long, CompletableFuture<?>>} that should be called on every
+ * update of storage revision as a listener.
+ * @param maxHistorySize Size of the history of changes to store,
including last applied token.
+ * @param defaultValueSupplier Supplier of the default value, that is used
on {@link #update(long, BiFunction)} to evaluate the
+ * default value if the value is not initialized yet. It is not
guaranteed to execute only once.
+ */
+ public IncrementalVersionedValue(
+ @Nullable Consumer<Function<Long, CompletableFuture<?>>>
observableRevisionUpdater,
Review Comment:
Are there benefits of using `Function<Long, ...>` instead of
`LongFunction<...>`?
Is this to save some allocations on boxed longs, or just an artifact of poor
choice, made by someone else?
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/AbstractVersionedValue.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.Lazy;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of a value.
+ *
+ * <p>The value can be available through the causality token, which is
represented by a {@code long}.
+ *
+ * @param <T> Type of real value.
+ */
+public class AbstractVersionedValue<T> {
+ private final IgniteLogger log =
Loggers.forClass(AbstractVersionedValue.class);
+
+ /** Token until the value is initialized. */
+ private static final long NOT_INITIALIZED = -1L;
+
+ /** Default history size. */
+ public static final int DEFAULT_MAX_HISTORY_SIZE = 10;
+
+ /** Size of the history of changes to store, including last applied token.
*/
+ private final int maxHistorySize;
+
+ /** List of completion listeners, see {@link #whenComplete}. */
+ private final List<CompletionListener<T>> completionListeners = new
CopyOnWriteArrayList<>();
+
+ /** Versioned value storage. */
+ private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history =
new ConcurrentSkipListMap<>();
+
+ /** Default value that is used when a Versioned Value is completed without
an explicit value. */
+ @Nullable
+ private final Lazy<T> defaultValue;
+
+ /**
+ * Last applied causality token.
+ *
+ * <p>Multi-threaded access is guarded by the {@link #readWriteLock}.
+ */
+ private long actualToken = NOT_INITIALIZED;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ protected AbstractVersionedValue(@Nullable Supplier<T>
defaultValueSupplier) {
+ this(DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier);
+ }
+
+ protected AbstractVersionedValue(int maxHistorySize, @Nullable Supplier<T>
defaultValueSupplier) {
+ this.maxHistorySize = maxHistorySize;
+ this.defaultValue = defaultValueSupplier == null ? null : new
Lazy<>(defaultValueSupplier);
+ }
+
+ /**
+ * Creates a future for this value and causality token, or returns it if
it already exists.
+ *
+ * <p>The returned future is associated with an update having the given
causality token and completes when this update is finished.
+ *
+ * @param causalityToken Causality token. Let's assume that the update
associated with token N is already applied to this value.
+ * Then, if token N is given as an argument, a completed future
will be returned. If token N - 1 is given, this method returns
+ * the result in the state that is actual for the given token. If
the token is strongly outdated, {@link OutdatedTokenException}
+ * is thrown. If token N + 1 is given, this method will return a
future that will be completed when the update associated with
+ * token N + 1 will have been applied. Tokens that greater than N
by more than 1 should never be passed.
+ * @return The future.
+ * @throws OutdatedTokenException If outdated token is passed as an
argument.
+ */
+ public CompletableFuture<T> get(long causalityToken) {
+ assert causalityToken > NOT_INITIALIZED;
+
+ readWriteLock.readLock().lock();
+
+ try {
+ if (causalityToken > actualToken) {
+ return history.computeIfAbsent(causalityToken, t -> new
CompletableFuture<>());
+ }
+
+ Entry<Long, CompletableFuture<T>> histEntry =
history.floorEntry(causalityToken);
+
+ if (histEntry == null) {
+ throw new OutdatedTokenException(causalityToken, actualToken,
maxHistorySize);
+ }
+
+ return histEntry.getValue();
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the latest value of completed future.
+ */
+ public @Nullable T latest() {
+ for (CompletableFuture<T> fut : history.descendingMap().values()) {
+ if (fut.isDone()) {
+ return fut.join();
+ }
+ }
+
+ return getDefault();
+ }
+
+ /**
+ * Returns the default value.
+ */
+ protected final @Nullable T getDefault() {
+ return defaultValue == null ? null : defaultValue.get();
+ }
+
+ /**
+ * Completes the Versioned Value for the given token. This method will
look for the previous complete token and complete all registered
+ * futures in the {@code (prevToken, causalityToken]} range. If no {@code
complete} methods have been called before, all these futures
+ * will be complete with the configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ *
+ * @param causalityToken Causality token.
+ */
+ public void complete(long causalityToken) {
+ CompletableFuture<T> futureForToken;
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ CompletableFuture<T> previousCompleteFuture =
completePreviousFutures();
+
+ futureForToken = history.compute(causalityToken, (token, future)
-> {
+ if (future == null) {
+ // No registered future for this token exists, we can
reuse the previous complete future.
+ return previousCompleteFuture == null ?
completedFuture(getDefault()) : previousCompleteFuture;
+ } else {
+ if (previousCompleteFuture == null) {
+ future.complete(getDefault());
+ } else {
+ copyState(previousCompleteFuture, future);
+ }
+
+ return future;
+ }
+ });
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, futureForToken);
+ }
+
+ /**
+ * Completes the Versioned Value for the given token with the given {@code
future}. This method will look for the previous complete
+ * token and complete all registered futures in the {@code (prevToken,
causalityToken)} range. If no {@code complete} methods have
+ * been called before, all these futures will be complete with the
configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ */
+ protected void complete(long causalityToken, CompletableFuture<T> future) {
+ assert future.isDone();
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ completePreviousFutures();
+
+ CompletableFuture<T> existingFuture =
history.putIfAbsent(causalityToken, future);
+
+ if (existingFuture != null) {
+ copyState(future, existingFuture);
+ }
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, future);
+ }
+
+ /**
+ * Completes a future related with a specific causality token. It is
called only on storage revision update.
+ */
+ @Nullable
+ private CompletableFuture<T> completePreviousFutures() {
+ NavigableMap<Long, CompletableFuture<T>> headMap =
history.headMap(actualToken);
+
+ if (headMap.isEmpty()) {
+ return null;
+ }
+
+ List<CompletableFuture<T>> futuresToComplete = List.of();
+
+ CompletableFuture<T> previousCompleteFuture = null;
+
+ for (CompletableFuture<T> future : headMap.descendingMap().values()) {
+ if (future.isDone()) {
+ previousCompleteFuture = future;
+
+ break;
+ }
+
+ // Lazy initialization.
+ if (futuresToComplete.isEmpty()) {
+ futuresToComplete = new ArrayList<>();
+ }
+
+ futuresToComplete.add(future);
+ }
+
+ // We found the first complete future, but there are no incomplete
futures up to the actual token, so there's nothing to do.
+ if (futuresToComplete.isEmpty()) {
Review Comment:
Is there a real case when this list is not empty? Seems counter-intuitive,
that the value is not updated, but the future is not completed at the same time.
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/AbstractVersionedValue.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.Lazy;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of a value.
+ *
+ * <p>The value can be available through the causality token, which is
represented by a {@code long}.
+ *
+ * @param <T> Type of real value.
+ */
+public class AbstractVersionedValue<T> {
+ private final IgniteLogger log =
Loggers.forClass(AbstractVersionedValue.class);
Review Comment:
Please make it static
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/AbstractVersionedValue.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.Lazy;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of a value.
+ *
+ * <p>The value can be available through the causality token, which is
represented by a {@code long}.
+ *
+ * @param <T> Type of real value.
+ */
+public class AbstractVersionedValue<T> {
+ private final IgniteLogger log =
Loggers.forClass(AbstractVersionedValue.class);
+
+ /** Token until the value is initialized. */
+ private static final long NOT_INITIALIZED = -1L;
+
+ /** Default history size. */
+ public static final int DEFAULT_MAX_HISTORY_SIZE = 10;
+
+ /** Size of the history of changes to store, including last applied token.
*/
+ private final int maxHistorySize;
+
+ /** List of completion listeners, see {@link #whenComplete}. */
+ private final List<CompletionListener<T>> completionListeners = new
CopyOnWriteArrayList<>();
+
+ /** Versioned value storage. */
+ private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history =
new ConcurrentSkipListMap<>();
+
+ /** Default value that is used when a Versioned Value is completed without
an explicit value. */
+ @Nullable
+ private final Lazy<T> defaultValue;
+
+ /**
+ * Last applied causality token.
+ *
+ * <p>Multi-threaded access is guarded by the {@link #readWriteLock}.
+ */
+ private long actualToken = NOT_INITIALIZED;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ protected AbstractVersionedValue(@Nullable Supplier<T>
defaultValueSupplier) {
+ this(DEFAULT_MAX_HISTORY_SIZE, defaultValueSupplier);
+ }
+
+ protected AbstractVersionedValue(int maxHistorySize, @Nullable Supplier<T>
defaultValueSupplier) {
+ this.maxHistorySize = maxHistorySize;
+ this.defaultValue = defaultValueSupplier == null ? null : new
Lazy<>(defaultValueSupplier);
+ }
+
+ /**
+ * Creates a future for this value and causality token, or returns it if
it already exists.
+ *
+ * <p>The returned future is associated with an update having the given
causality token and completes when this update is finished.
+ *
+ * @param causalityToken Causality token. Let's assume that the update
associated with token N is already applied to this value.
+ * Then, if token N is given as an argument, a completed future
will be returned. If token N - 1 is given, this method returns
+ * the result in the state that is actual for the given token. If
the token is strongly outdated, {@link OutdatedTokenException}
+ * is thrown. If token N + 1 is given, this method will return a
future that will be completed when the update associated with
+ * token N + 1 will have been applied. Tokens that greater than N
by more than 1 should never be passed.
+ * @return The future.
+ * @throws OutdatedTokenException If outdated token is passed as an
argument.
+ */
+ public CompletableFuture<T> get(long causalityToken) {
+ assert causalityToken > NOT_INITIALIZED;
+
+ readWriteLock.readLock().lock();
+
+ try {
+ if (causalityToken > actualToken) {
+ return history.computeIfAbsent(causalityToken, t -> new
CompletableFuture<>());
+ }
+
+ Entry<Long, CompletableFuture<T>> histEntry =
history.floorEntry(causalityToken);
+
+ if (histEntry == null) {
+ throw new OutdatedTokenException(causalityToken, actualToken,
maxHistorySize);
+ }
+
+ return histEntry.getValue();
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the latest value of completed future.
+ */
+ public @Nullable T latest() {
+ for (CompletableFuture<T> fut : history.descendingMap().values()) {
+ if (fut.isDone()) {
+ return fut.join();
+ }
+ }
+
+ return getDefault();
+ }
+
+ /**
+ * Returns the default value.
+ */
+ protected final @Nullable T getDefault() {
+ return defaultValue == null ? null : defaultValue.get();
+ }
+
+ /**
+ * Completes the Versioned Value for the given token. This method will
look for the previous complete token and complete all registered
+ * futures in the {@code (prevToken, causalityToken]} range. If no {@code
complete} methods have been called before, all these futures
+ * will be complete with the configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ *
+ * @param causalityToken Causality token.
+ */
+ public void complete(long causalityToken) {
+ CompletableFuture<T> futureForToken;
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ CompletableFuture<T> previousCompleteFuture =
completePreviousFutures();
+
+ futureForToken = history.compute(causalityToken, (token, future)
-> {
+ if (future == null) {
+ // No registered future for this token exists, we can
reuse the previous complete future.
+ return previousCompleteFuture == null ?
completedFuture(getDefault()) : previousCompleteFuture;
+ } else {
+ if (previousCompleteFuture == null) {
+ future.complete(getDefault());
+ } else {
+ copyState(previousCompleteFuture, future);
+ }
+
+ return future;
+ }
+ });
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, futureForToken);
+ }
+
+ /**
+ * Completes the Versioned Value for the given token with the given {@code
future}. This method will look for the previous complete
+ * token and complete all registered futures in the {@code (prevToken,
causalityToken)} range. If no {@code complete} methods have
+ * been called before, all these futures will be complete with the
configured default value.
+ *
+ * <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
+ */
+ protected void complete(long causalityToken, CompletableFuture<T> future) {
+ assert future.isDone();
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ setActualToken(causalityToken);
+
+ completePreviousFutures();
+
+ CompletableFuture<T> existingFuture =
history.putIfAbsent(causalityToken, future);
+
+ if (existingFuture != null) {
+ copyState(future, existingFuture);
+ }
+
+ // Delete older tokens if their amount exceeds the configured
history size.
+ trimHistory();
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ notifyCompletionListeners(causalityToken, future);
+ }
+
+ /**
+ * Completes a future related with a specific causality token. It is
called only on storage revision update.
+ */
+ @Nullable
+ private CompletableFuture<T> completePreviousFutures() {
+ NavigableMap<Long, CompletableFuture<T>> headMap =
history.headMap(actualToken);
+
+ if (headMap.isEmpty()) {
+ return null;
+ }
+
+ List<CompletableFuture<T>> futuresToComplete = List.of();
+
+ CompletableFuture<T> previousCompleteFuture = null;
+
+ for (CompletableFuture<T> future : headMap.descendingMap().values()) {
+ if (future.isDone()) {
+ previousCompleteFuture = future;
+
+ break;
+ }
+
+ // Lazy initialization.
+ if (futuresToComplete.isEmpty()) {
+ futuresToComplete = new ArrayList<>();
+ }
+
+ futuresToComplete.add(future);
+ }
+
+ // We found the first complete future, but there are no incomplete
futures up to the actual token, so there's nothing to do.
+ if (futuresToComplete.isEmpty()) {
+ return previousCompleteFuture;
+ }
+
+ if (previousCompleteFuture == null) {
+ // "complete" method has never been called before, use the default
value as the original.
+ T defaultValue = getDefault();
+
+ futuresToComplete.forEach(f -> f.complete(defaultValue));
+ } else {
+ assert previousCompleteFuture.isDone();
+
+ // Create an effectively final variable.
+ List<CompletableFuture<T>> futuresToCompleteCopy =
futuresToComplete;
+
+ previousCompleteFuture.whenComplete((v, t) -> {
+ if (t != null) {
+ futuresToCompleteCopy.forEach(f ->
f.completeExceptionally(t));
+ } else {
+ futuresToCompleteCopy.forEach(f -> f.complete(v));
+ }
+ });
+ }
+
+ return previousCompleteFuture;
+ }
+
+ /**
+ * Updates the current token. Must be called under the {@link
#readWriteLock writeLock}.
+ */
+ private void setActualToken(long causalityToken) {
+ assert actualToken < causalityToken
+ : format("Token must be greater than actual [token={},
actual={}]", causalityToken, actualToken);
+
+ actualToken = causalityToken;
+ }
+
+ /**
+ * Trims the storage to history size. Must be called under the {@link
#readWriteLock writeLock}.
+ */
+ private void trimHistory() {
+ NavigableMap<Long, CompletableFuture<T>> oldTokensMap =
history.headMap(actualToken, true);
+
+ if (oldTokensMap.size() <= maxHistorySize) {
+ return;
+ }
+
+ Iterator<Map.Entry<Long, CompletableFuture<T>>> it =
oldTokensMap.entrySet().iterator();
+
+ for (int i = oldTokensMap.size(); i > maxHistorySize; i--) {
+ Map.Entry<Long, CompletableFuture<T>> next = it.next();
+
+ // All futures must be explicitly completed before history
trimming occurs.
+ assert next.getValue().isDone();
+
+ it.remove();
+ }
+ }
+
+ /**
+ * Copies the state of the {@code from} future to the incomplete {@code
to} future.
+ */
+ private void copyState(CompletableFuture<T> from, CompletableFuture<T> to)
{
Review Comment:
I'd recommend moving this method to some common utils class, we have the
same piece of code multiple times in the project already, just without
assertions.
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Versioned Value flavor that accumulates updates posted by the {@link
#update} method and "commits" them when {@link #complete} or
+ * {@link #completeExceptionally} are called or a storage revision is updated
(see constructors for details).
+ */
+public class IncrementalVersionedValue<T> extends AbstractVersionedValue<T> {
+ /** Update mutex. */
+ private final Object updateMutex = new Object();
+
+ /**
+ * Token that was used with the most recent {@link #update} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long expectedToken = -1;
+
+ /**
+ * Token that was used with the most recent {@link #completeInternal} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long lastCompleteToken = -1;
+
+ /**
+ * Future that will be completed after all updates over the value in
context of current causality token will be performed. This
+ * {@code updaterFuture} is {@code null} if no updates in context of
current causality token have been initiated. See
+ * {@link #update(long, BiFunction)}.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private CompletableFuture<T> updaterFuture = completedFuture(getDefault());
+
+ /**
+ * Constructor.
+ *
+ * @param observableRevisionUpdater A closure intended to connect this
VersionedValue with a revision updater, that this
+ * VersionedValue should be able to listen to, for receiving
storage revision updates. This closure is called once on a
+ * construction of this VersionedValue and accepts a {@code
Function<Long, CompletableFuture<?>>} that should be called on every
+ * update of storage revision as a listener.
+ * @param maxHistorySize Size of the history of changes to store,
including last applied token.
+ * @param defaultValueSupplier Supplier of the default value, that is used
on {@link #update(long, BiFunction)} to evaluate the
+ * default value if the value is not initialized yet. It is not
guaranteed to execute only once.
+ */
+ public IncrementalVersionedValue(
+ @Nullable Consumer<Function<Long, CompletableFuture<?>>>
observableRevisionUpdater,
+ int maxHistorySize,
+ @Nullable Supplier<T> defaultValueSupplier
+ ) {
+ super(maxHistorySize, defaultValueSupplier);
+
+ if (observableRevisionUpdater != null) {
+ observableRevisionUpdater.accept(this::completeInternal);
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param observableRevisionUpdater A closure intended to connect this
VersionedValue with a revision updater, that this
+ * VersionedValue should be able to listen to, for receiving
storage revision updates. This closure is called once on a
+ * construction of this VersionedValue and accepts a {@code
Function<Long, CompletableFuture<?>>} that should be called on every
+ * update of storage revision as a listener.
+ * @param defaultValueSupplier Supplier of the default value, that is used
on {@link #update(long, BiFunction)} to evaluate the
+ * default value if the value is not initialized yet. It is not
guaranteed to execute only once.
+ */
+ public IncrementalVersionedValue(
+ @Nullable Consumer<Function<Long, CompletableFuture<?>>>
observableRevisionUpdater,
+ Supplier<T> defaultValueSupplier
+ ) {
+ this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE,
defaultValueSupplier);
+ }
+
+ /**
+ * Constructor with default history size.
+ *
+ * @param observableRevisionUpdater A closure intended to connect this
VersionedValue with a revision updater, that this
+ * VersionedValue should be able to listen to, for receiving
storage revision updates. This closure is called once on a
+ * construction of this VersionedValue and accepts a {@code
Function<Long, CompletableFuture<?>>} that should be called on every
+ * update of storage revision as a listener.
+ */
+ public IncrementalVersionedValue(Consumer<Function<Long,
CompletableFuture<?>>> observableRevisionUpdater) {
+ this(observableRevisionUpdater, DEFAULT_MAX_HISTORY_SIZE, null);
+ }
+
+ /**
+ * Updates the value using the given updater. The updater receives the
value on previous token, or default value (see constructor) if
+ * the value isn't initialized, or current intermediate value, if this
method has been already called for the same token; and returns a
+ * new value.<br> The updater will be called after updaters that had been
passed to previous calls of this method complete. If an
+ * exception ({@link CancellationException} or {@link
CompletionException}) was thrown when calculating the value for previous token,
+ * then updater is used to process the exception and calculate a new
value.<br> This method can be called multiple times for the same
+ * token, and doesn't complete the future created for this token. The
future is supposed to be completed by storage revision update or a
+ * call of {@link #complete(long)} in this case. If this method has been
called at least once on the given token, the updater will
+ * receive a value that was evaluated by updater on previous call, as
intermediate result.<br> As the order of multiple calls of this
+ * method on the same token is unknown, operations done by the updater
must be commutative. For example:
+ * <ul>
+ * <li>this method was called for token N-1 and updater evaluated the
value V1;</li>
+ * <li>a storage revision update happened;</li>
+ * <li>this method is called for token N, updater receives V1 and
evaluates V2;</li>
+ * <li>this method is called once again for token N, then the updater
receives V2 as intermediate result and evaluates V3;</li>
+ * <li>storage revision update happens and the future for token N
completes with value V3.</li>
+ * </ul>
+ * Regardless of order in which this method's calls are made, V3 should be
the final result.
+ * <br>
+ * The method should return a future that will be completed when {@code
updater} completes.
+ *
+ * @param causalityToken Causality token. Used mainly for sanity checks.
+ * @param updater The binary function that accepts previous value and
exception, if present, and update it to compute the new
+ * value.
+ * @return Future for updated value.
+ */
+ public CompletableFuture<T> update(long causalityToken, BiFunction<T,
Throwable, CompletableFuture<T>> updater) {
+ synchronized (updateMutex) {
+ if (expectedToken == -1) {
+ assert causalityToken > lastCompleteToken
+ : String.format("Causality token is outdated, previous
token %d, got %d", lastCompleteToken, causalityToken);
+
+ expectedToken = causalityToken;
+ } else {
+ assert expectedToken == causalityToken
+ : String.format("Causality token mismatch, expected
%d, got %d", expectedToken, causalityToken);
+ }
+
+ updaterFuture = updaterFuture
+ .handle(updater)
+ .thenCompose(Function.identity());
Review Comment:
Is this necessary?
##########
modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java:
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link IncrementalVersionedValue}.
+ */
+public class IncrementalVersionedValueTest {
+ /** Test value. */
+ private static final int TEST_VALUE = 1;
+
+ /** The test revision register is used to move the revision forward. */
+ private final TestRevisionRegister register = new TestRevisionRegister();
+
+ /** Test exception is used for exceptionally completion Versioned value
object. */
+ private static final Exception TEST_EXCEPTION = new Exception("Test
exception");
+
+ /**
+ * Checks that the update method work as expected when the previous value
is known.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdate() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<Integer>(register);
+
+ versionedValue.update(0, (integer, throwable) ->
completedFuture(TEST_VALUE));
+
+ register.moveRevision(0L).join();
+
+ CompletableFuture<Integer> fut = versionedValue.get(1);
+
+ assertFalse(fut.isDone());
+
+ int incrementCount = 10;
+
+ for (int i = 0; i < incrementCount; i++) {
+ versionedValue.update(1, (previous, e) ->
completedFuture(++previous));
+
+ assertFalse(fut.isDone());
+ }
+
+ register.moveRevision(1L).join();
+
+ assertTrue(fut.isDone());
+
+ assertEquals(TEST_VALUE + incrementCount, fut.get());
+
+ assertThrows(AssertionError.class, () -> versionedValue.update(1L, (i,
t) -> completedFuture(null)));
+ }
+
+ /**
+ * Checks that the update method work as expected when there is no history
to calculate previous value.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdatePredefined() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<Integer>(register);
+
+ CompletableFuture<Integer> fut = versionedValue.get(0);
+
+ assertFalse(fut.isDone());
+
+ versionedValue.update(0, (previous, e) -> {
+ assertNull(previous);
+
+ return completedFuture(TEST_VALUE);
+ });
+
+ assertFalse(fut.isDone());
+
+ register.moveRevision(0L).join();
+
+ assertTrue(fut.isDone());
+
+ assertEquals(TEST_VALUE, fut.get());
+ }
+
+ /**
+ * Test asynchronous update closure.
+ */
+ @Test
+ public void testAsyncUpdate() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register);
+
+ CompletableFuture<Integer> fut = new CompletableFuture<>();
+
+ vv.update(0L, (v, e) -> fut);
+
+ CompletableFuture<Integer> vvFut = vv.get(0L);
+
+ CompletableFuture<?> revFut = register.moveRevision(0L);
+
+ assertFalse(fut.isDone());
+ assertFalse(vvFut.isDone());
+ assertFalse(revFut.isDone());
+
+ fut.complete(1);
+
+ revFut.join();
+
+ assertTrue(vvFut.isDone());
+ }
+
+ /**
+ * Test the case when exception happens in updater.
+ */
+ @Test
+ public void testExceptionOnUpdate() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register, () -> 0);
+
+ final int count = 4;
+ final int successfulCompletionsCount = count / 2;
+
+ AtomicInteger actualSuccessfulCompletionsCount = new AtomicInteger();
+
+ final String exceptionMsg = "test msg";
+
+ for (int i = 0; i < count; i++) {
+ vv.update(0L, (v, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ if (v == successfulCompletionsCount) {
+ throw new IgniteInternalException(exceptionMsg);
+ }
+
+ actualSuccessfulCompletionsCount.incrementAndGet();
+
+ return completedFuture(++v);
+ });
+ }
+
+ AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+ vv.whenComplete((t, v, e) -> exceptionRef.set(e));
+
+ vv.complete(0L);
+
+ assertThrowsWithCause(() -> vv.get(0L).join(),
IgniteInternalException.class);
+
+ assertThat(exceptionRef.get().getMessage(),
containsString(exceptionMsg));
+ }
+
+ /**
+ * Test with multiple versioned values and asynchronous completion.
+ */
+ @Test
+ public void testAsyncMultiVv() {
+ final String registryName = "Registry";
+ final String assignmentName = "Assignment";
+ final String tableName = "T1_";
+
+ IncrementalVersionedValue<Map<UUID, String>> tablesVv = new
IncrementalVersionedValue<>(f -> {}, HashMap::new);
+ IncrementalVersionedValue<Map<UUID, String>> schemasVv = new
IncrementalVersionedValue<>(register, HashMap::new);
+ IncrementalVersionedValue<Map<UUID, String>> assignmentsVv = new
IncrementalVersionedValue<>(register, HashMap::new);
+
+ schemasVv.whenComplete((token, value, ex) -> tablesVv.complete(token));
+
+ BiFunction<Long, UUID, CompletableFuture<String>> schemaRegistry =
+ (token, uuid) -> schemasVv.get(token).thenApply(schemas ->
schemas.get(uuid));
+
+ // Adding table.
+ long token = 0L;
+ UUID tableId = UUID.randomUUID();
+
+ CompletableFuture<String> tableFut = schemaRegistry.apply(token,
tableId)
+ .thenCombine(assignmentsVv.get(token), (registry, assignments)
-> tableName + registry + assignments.get(tableId));
+
+ tablesVv.update(token, (old, e) -> tableFut.thenApply(table -> {
+ Map<UUID, String> val = new HashMap<>(old);
+
+ val.put(tableId, table);
+
+ return val;
+ }));
+
+ CompletableFuture<String> userFut = tablesVv.get(token).thenApply(map
-> map.get(tableId));
+
+ schemasVv.update(token, (old, e) -> {
+ old.put(tableId, registryName);
+
+ return completedFuture(old);
+ });
+
+ assignmentsVv.update(token, (old, e) -> {
+ old.put(tableId, assignmentName);
+
+ return completedFuture(old);
+ });
+
+ assertFalse(tableFut.isDone());
+ assertFalse(userFut.isDone());
+
+ register.moveRevision(token).join();
+
+ tableFut.join();
+
+ assertEquals(tableName + registryName + assignmentName,
userFut.join());
+ }
+
+ /**
+ * Tests a default value supplier.
+ */
+ @Test
+ public void testDefaultValueSupplier() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register, () -> TEST_VALUE);
+
+ checkDefaultValue(vv, TEST_VALUE);
+ }
+
+ /**
+ * Tests a case when there is no default value supplier.
+ */
+ @Test
+ public void testWithoutDefaultValue() {
+ IncrementalVersionedValue<Integer> vv = new
IncrementalVersionedValue<>(register);
+
+ checkDefaultValue(vv, null);
+ }
+
+ @RepeatedTest(100)
+ void testConcurrentGetAndComplete() throws Exception {
+ var versionedValue = new IncrementalVersionedValue<>(register, () ->
1);
+
+ // Set initial value.
+ versionedValue.complete(1);
+
+ var barrier = new CyclicBarrier(2);
Review Comment:
Maybe `IgniteTestUtils#runRace()` would make this code simpler
##########
modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java:
##########
@@ -260,189 +198,47 @@ public void testAutocompleteFuture() throws
OutdatedTokenException {
}
/**
- * Checks that the update method work as expected when the previous value
is known.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testUpdate() throws Exception {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
-
- longVersionedValue.update(0, (integer, throwable) ->
CompletableFuture.completedFuture(TEST_VALUE));
-
- REGISTER.moveRevision(0L).join();
-
- CompletableFuture<Integer> fut = longVersionedValue.get(1);
-
- assertFalse(fut.isDone());
-
- int incrementCount = 10;
-
- for (int i = 0; i < incrementCount; i++) {
- longVersionedValue.update(1, (previous, e) ->
completedFuture(++previous));
-
- assertFalse(fut.isDone());
- }
-
- REGISTER.moveRevision(1L).join();
-
- assertTrue(fut.isDone());
-
- assertEquals(TEST_VALUE + incrementCount, fut.get());
-
- assertThrows(AssertionError.class, () -> longVersionedValue.update(1L,
(i, t) -> completedFuture(null)));
- }
-
- /**
- * Checks that the update method work as expected when there is no history
to calculate previous value.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testUpdatePredefined() throws Exception {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
-
- CompletableFuture<Integer> fut = longVersionedValue.get(0);
-
- assertFalse(fut.isDone());
-
- longVersionedValue.update(0, (previous, e) -> {
- assertNull(previous);
-
- return completedFuture(TEST_VALUE);
- });
-
- assertFalse(fut.isDone());
-
- REGISTER.moveRevision(0L).join();
-
- assertTrue(fut.isDone());
-
- assertEquals(TEST_VALUE, fut.get());
- }
-
- /**
- * Test asynchronous update closure.
- */
- @Test
- public void testAsyncUpdate() {
- VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
-
- CompletableFuture<Integer> fut = new CompletableFuture<>();
-
- vv.update(0L, (v, e) -> fut);
-
- CompletableFuture<Integer> vvFut = vv.get(0L);
-
- CompletableFuture<?> revFut = REGISTER.moveRevision(0L);
-
- assertFalse(fut.isDone());
- assertFalse(vvFut.isDone());
- assertFalse(revFut.isDone());
-
- fut.complete(1);
-
- revFut.join();
-
- assertTrue(vvFut.isDone());
- }
-
- /**
- * Test the case when exception happens in updater.
- */
- @Test
- public void testExceptionOnUpdate() {
- VersionedValue<Integer> vv = new VersionedValue<>(REGISTER, () -> 0);
-
- final int count = 4;
- final int successfulCompletionsCount = count / 2;
-
- AtomicInteger actualSuccessfulCompletionsCount = new AtomicInteger();
-
- final String exceptionMsg = "test msg";
-
- for (int i = 0; i < count; i++) {
- vv.update(0L, (v, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- if (v == successfulCompletionsCount) {
- throw new IgniteInternalException(exceptionMsg);
- }
-
- actualSuccessfulCompletionsCount.incrementAndGet();
-
- return completedFuture(++v);
- });
- }
-
- AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-
- vv.whenComplete((t, v, e) -> exceptionRef.set(e));
-
- vv.complete(0L);
-
- assertThrowsWithCause(() -> vv.get(0L).join(),
IgniteInternalException.class);
-
- assertThat(exceptionRef.get().getMessage(),
containsString(exceptionMsg));
- }
-
- /**
- * Test with multiple versioned values and asynchronous completion.
+ * Test {@link VersionedValue#whenComplete}.
*/
@Test
- public void testAsyncMultiVv() {
- final String registryName = "Registry";
- final String assignmentName = "Assignment";
- final String tableName = "T1_";
-
- VersionedValue<Map<UUID, String>> tablesVv = new VersionedValue<>(f ->
{}, HashMap::new);
- VersionedValue<Map<UUID, String>> schemasVv = new
VersionedValue<>(REGISTER, HashMap::new);
- VersionedValue<Map<UUID, String>> assignmentsVv = new
VersionedValue<>(REGISTER, HashMap::new);
+ public void testWhenComplete() {
Review Comment:
This one seems like a duplicate of the test for incremental versioned value.
Is it?
##########
modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java:
##########
@@ -260,189 +198,47 @@ public void testAutocompleteFuture() throws
OutdatedTokenException {
}
/**
- * Checks that the update method work as expected when the previous value
is known.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testUpdate() throws Exception {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
-
- longVersionedValue.update(0, (integer, throwable) ->
CompletableFuture.completedFuture(TEST_VALUE));
-
- REGISTER.moveRevision(0L).join();
-
- CompletableFuture<Integer> fut = longVersionedValue.get(1);
-
- assertFalse(fut.isDone());
-
- int incrementCount = 10;
-
- for (int i = 0; i < incrementCount; i++) {
- longVersionedValue.update(1, (previous, e) ->
completedFuture(++previous));
-
- assertFalse(fut.isDone());
- }
-
- REGISTER.moveRevision(1L).join();
-
- assertTrue(fut.isDone());
-
- assertEquals(TEST_VALUE + incrementCount, fut.get());
-
- assertThrows(AssertionError.class, () -> longVersionedValue.update(1L,
(i, t) -> completedFuture(null)));
- }
-
- /**
- * Checks that the update method work as expected when there is no history
to calculate previous value.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testUpdatePredefined() throws Exception {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
-
- CompletableFuture<Integer> fut = longVersionedValue.get(0);
-
- assertFalse(fut.isDone());
-
- longVersionedValue.update(0, (previous, e) -> {
- assertNull(previous);
-
- return completedFuture(TEST_VALUE);
- });
-
- assertFalse(fut.isDone());
-
- REGISTER.moveRevision(0L).join();
-
- assertTrue(fut.isDone());
-
- assertEquals(TEST_VALUE, fut.get());
- }
-
- /**
- * Test asynchronous update closure.
- */
- @Test
- public void testAsyncUpdate() {
- VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
-
- CompletableFuture<Integer> fut = new CompletableFuture<>();
-
- vv.update(0L, (v, e) -> fut);
-
- CompletableFuture<Integer> vvFut = vv.get(0L);
-
- CompletableFuture<?> revFut = REGISTER.moveRevision(0L);
-
- assertFalse(fut.isDone());
- assertFalse(vvFut.isDone());
- assertFalse(revFut.isDone());
-
- fut.complete(1);
-
- revFut.join();
-
- assertTrue(vvFut.isDone());
- }
-
- /**
- * Test the case when exception happens in updater.
- */
- @Test
- public void testExceptionOnUpdate() {
- VersionedValue<Integer> vv = new VersionedValue<>(REGISTER, () -> 0);
-
- final int count = 4;
- final int successfulCompletionsCount = count / 2;
-
- AtomicInteger actualSuccessfulCompletionsCount = new AtomicInteger();
-
- final String exceptionMsg = "test msg";
-
- for (int i = 0; i < count; i++) {
- vv.update(0L, (v, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- if (v == successfulCompletionsCount) {
- throw new IgniteInternalException(exceptionMsg);
- }
-
- actualSuccessfulCompletionsCount.incrementAndGet();
-
- return completedFuture(++v);
- });
- }
-
- AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-
- vv.whenComplete((t, v, e) -> exceptionRef.set(e));
-
- vv.complete(0L);
-
- assertThrowsWithCause(() -> vv.get(0L).join(),
IgniteInternalException.class);
-
- assertThat(exceptionRef.get().getMessage(),
containsString(exceptionMsg));
- }
-
- /**
- * Test with multiple versioned values and asynchronous completion.
+ * Test {@link VersionedValue#whenComplete}.
*/
@Test
- public void testAsyncMultiVv() {
- final String registryName = "Registry";
- final String assignmentName = "Assignment";
- final String tableName = "T1_";
-
- VersionedValue<Map<UUID, String>> tablesVv = new VersionedValue<>(f ->
{}, HashMap::new);
- VersionedValue<Map<UUID, String>> schemasVv = new
VersionedValue<>(REGISTER, HashMap::new);
- VersionedValue<Map<UUID, String>> assignmentsVv = new
VersionedValue<>(REGISTER, HashMap::new);
+ public void testWhenComplete() {
Review Comment:
If there are many repeated scenarios, I'd recommend extracting them into a
base class
##########
modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Versioned Value flavor that accumulates updates posted by the {@link
#update} method and "commits" them when {@link #complete} or
+ * {@link #completeExceptionally} are called or a storage revision is updated
(see constructors for details).
+ */
+public class IncrementalVersionedValue<T> extends AbstractVersionedValue<T> {
+ /** Update mutex. */
+ private final Object updateMutex = new Object();
+
+ /**
+ * Token that was used with the most recent {@link #update} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long expectedToken = -1;
+
+ /**
+ * Token that was used with the most recent {@link #completeInternal} call.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private long lastCompleteToken = -1;
+
+ /**
+ * Future that will be completed after all updates over the value in
context of current causality token will be performed. This
+ * {@code updaterFuture} is {@code null} if no updates in context of
current causality token have been initiated. See
+ * {@link #update(long, BiFunction)}.
+ *
+ * <p>Multi-threaded access is guarded by {@link #updateMutex}.
+ */
+ private CompletableFuture<T> updaterFuture = completedFuture(getDefault());
+
+ /**
+ * Constructor.
+ *
+ * @param observableRevisionUpdater A closure intended to connect this
VersionedValue with a revision updater, that this
+ * VersionedValue should be able to listen to, for receiving
storage revision updates. This closure is called once on a
+ * construction of this VersionedValue and accepts a {@code
Function<Long, CompletableFuture<?>>} that should be called on every
+ * update of storage revision as a listener.
+ * @param maxHistorySize Size of the history of changes to store,
including last applied token.
+ * @param defaultValueSupplier Supplier of the default value, that is used
on {@link #update(long, BiFunction)} to evaluate the
+ * default value if the value is not initialized yet. It is not
guaranteed to execute only once.
+ */
+ public IncrementalVersionedValue(
+ @Nullable Consumer<Function<Long, CompletableFuture<?>>>
observableRevisionUpdater,
Review Comment:
Anyway, having this value in the constructor looks completely backwards to
me.
Can't we have this subscription somewhere outside?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]