vldpyatkov commented on code in PR #2029:
URL: https://github.com/apache/ignite-3/pull/2029#discussion_r1187499121
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -102,6 +109,17 @@ public void update(T newValue) {
}
}
+ /**
+ * Updates the internal state, if it is lower than {@code newValue} and completes all futures waiting for {@code newValue}
+ * that had been created for corresponding values that are lower than the given one.
+ *
+ * @param newValue New value.
+ * @throws TrackerClosedException if the tracker is closed.
+ */
+ public void update(T newValue) {
Review Comment:
This method looks risky, because the exact type of the value is not known
at this point.
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -161,8 +171,38 @@ public void close() {
TrackerClosedException trackerClosedException = new TrackerClosedException();
- valueFutures.values().forEach(future -> future.completeExceptionally(trackerClosedException));
+ completeWaitersOnClose(trackerClosedException);
valueFutures.clear();
}
+
+ protected void completeWaitersOnUpdate(T newValue, @Nullable R futureResult) {
+ ConcurrentNavigableMap<T, CompletableFuture<R>> smallerFutures = valueFutures.headMap(newValue, true);
+
+ smallerFutures.forEach((k, f) -> f.complete(futureResult));
+
+ smallerFutures.clear();
Review Comment:
Why is it necessary to clear the map here?
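For context on this question: `ConcurrentSkipListMap.headMap` returns a live view backed by the original map, so clearing the view is what removes the completed futures from the backing map. The following is a minimal standalone sketch of that JDK behavior, not the PR's code; the class name `HeadMapClearSketch`, the method `completeUpTo`, and the `Integer`/`String` type choices are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical demo class; only the JDK collection calls are real API. */
public class HeadMapClearSketch {
    /** Completes every waiter with key <= newValue, removes it, and returns how many waiters remain. */
    static int completeUpTo(
            ConcurrentSkipListMap<Integer, CompletableFuture<String>> waiters,
            int newValue,
            String result) {
        // headMap returns a view of the backing map, not a copy.
        ConcurrentNavigableMap<Integer, CompletableFuture<String>> smaller =
                waiters.headMap(newValue, true);

        smaller.forEach((k, f) -> f.complete(result));

        // Clearing the view removes the same entries from the backing map.
        // Without this call the completed futures would stay in 'waiters'.
        smaller.clear();

        return waiters.size();
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, CompletableFuture<String>> waiters =
                new ConcurrentSkipListMap<>();
        waiters.put(1, new CompletableFuture<>());
        waiters.put(5, new CompletableFuture<>());
        waiters.put(9, new CompletableFuture<>());

        int remaining = completeUpTo(waiters, 5, "done");
        System.out.println("remaining waiters: " + remaining);
        System.out.println("keys left: " + waiters.keySet());
    }
}
```

If the `clear()` call is dropped in this sketch, all three entries stay in the map even though two of the futures are already completed.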
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -22,34 +22,39 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.util.Comparator;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
/**
* Tracker that stores comparable value internally, this value can grow when {@link #update(Comparable)} method is called. The tracker gives
* ability to wait for certain value, see {@link #waitFor(Comparable)}.
*/
-public class PendingComparableValuesTracker<T extends Comparable<T>> implements ManuallyCloseable {
+public class PendingComparableValuesTracker<T extends Comparable<T>, R> implements ManuallyCloseable {
private static final VarHandle CURRENT;
private static final VarHandle CLOSE_GUARD;
static {
try {
- CURRENT = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "current", Comparable.class);
+ CURRENT = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "current", Map.Entry.class);
CLOSE_GUARD = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "closeGuard", boolean.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
/** Map of comparable values to corresponding futures. */
- private final ConcurrentSkipListMap<T, CompletableFuture<Void>> valueFutures = new ConcurrentSkipListMap<>();
+ private final ConcurrentSkipListMap<T, CompletableFuture<R>> valueFutures = new ConcurrentSkipListMap<>();
- /** Current value. */
- private volatile T current;
+ /** Current value along with associated result. */
+ @SuppressWarnings("FieldMayBeFinal") // Changed through CURRENT VarHandle.
+ private volatile Map.Entry<T, @Nullable R> current;
Review Comment:
As I understand it, only the current value is ever used, because if you
request a future for a previous T, you receive the value of this variable.
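To make the concern concrete, here is a simplified sketch of one reading of this behavior. It is not the PR's implementation: the class `CurrentEntrySketch` is hypothetical, it uses `synchronized` instead of the `VarHandle` approach, and it omits closing. It assumes that a wait for an already-passed value returns a completed future carrying the result stored with the current value.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical, simplified tracker used only to illustrate the review comment. */
public class CurrentEntrySketch<T extends Comparable<T>, R> {
    private final ConcurrentSkipListMap<T, CompletableFuture<R>> valueFutures =
            new ConcurrentSkipListMap<>();

    /** Current value together with its associated result. */
    private Map.Entry<T, R> current;

    public CurrentEntrySketch(T initialValue, R initialResult) {
        current = new SimpleImmutableEntry<>(initialValue, initialResult);
    }

    /** Moves the tracker forward and completes waiters for values <= newValue. */
    public synchronized void update(T newValue, R result) {
        if (current.getKey().compareTo(newValue) < 0) {
            current = new SimpleImmutableEntry<>(newValue, result);

            ConcurrentNavigableMap<T, CompletableFuture<R>> smaller =
                    valueFutures.headMap(newValue, true);
            smaller.forEach((k, f) -> f.complete(result));
            smaller.clear();
        }
    }

    /** Returns a future for the given value. */
    public synchronized CompletableFuture<R> waitFor(T value) {
        if (current.getKey().compareTo(value) >= 0) {
            // An already-passed value gets the result of the CURRENT entry,
            // not a result recorded specifically for 'value'.
            return CompletableFuture.completedFuture(current.getValue());
        }
        return valueFutures.computeIfAbsent(value, k -> new CompletableFuture<>());
    }
}
```

Under these assumptions, after `update(10, "r10")` a call to `waitFor(3)` yields `"r10"`, which is the situation the comment describes: results associated with earlier values are never observable.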