This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 8a73d68  [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
8a73d68 is described below

commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

    [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++++++++++++++--------
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +++++--------
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;

 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;

 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;

 import java.io.Serializable;

+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
 	private static final Object lock = new Object();

 	@GuardedBy("lock")
-	static long freeze() {
+	static <T, E extends Throwable> T doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
 		synchronized (lock) {
-			if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
-				timeIsFrozen = true;
-				return getCurrentTimestamp();
-			} else {
-				return lastReturnedProcessingTime;
-			}
+			final long timestampBeforeUpdate = freeze();
+			T result = action.apply(timestampBeforeUpdate);
+			final long timestampAfterUpdate = unfreezeTime();
+
+			checkState(timestampAfterUpdate == timestampBeforeUpdate,
+				"Timestamps before and after the update do not match.");
+			return result;
+		}
+	}
+
+	private static long freeze() {
+		if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
+			timeIsFrozen = true;
+			return getCurrentTimestamp();
+		} else {
+			return lastReturnedProcessingTime;
 		}
 	}

@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
 		return lastReturnedProcessingTime;
 	}

-	@GuardedBy("lock")
-	static long unfreezeTime() {
-		synchronized (lock) {
-			timeIsFrozen = false;
-			return lastReturnedProcessingTime;
-		}
+	private static long unfreezeTime() {
+		timeIsFrozen = false;
+		return lastReturnedProcessingTime;
 	}
 }
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;

 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;

 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String
 		TtlStateVerifier<?, ?> verifier,
 		Object update) throws Exception {

-		final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze();
-		State state = states.get(verifier.getId());
-		Object valueBeforeUpdate = verifier.get(state);
-		verifier.update(state, update);
-		Object updatedValue = verifier.get(state);
-		final long timestampAfterUpdate = MonotonicTTLTimeProvider.unfreezeTime();
-
-		checkState(
-			timestampAfterUpdate == timestampBeforeUpdate,
-			"Timestamps before and after the update do not match."
-		);
-
-		return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, timestampAfterUpdate);
+		return MonotonicTTLTimeProvider.doWithFrozenTime(frozenTimestamp -> {
+			State state = states.get(verifier.getId());
+			Object valueBeforeUpdate = verifier.get(state);
+			verifier.update(state, update);
+			Object updatedValue = verifier.get(state);
+			return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, frozenTimestamp);
+		});
 	}

 	@Override