This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 8a73d68  [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
8a73d68 is described below

commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

    [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++++++++++++++--------
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +++++--------
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
        private static final Object lock = new Object();
 
        @GuardedBy("lock")
-       static long freeze() {
+       static <T, E extends Throwable> T doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
                synchronized (lock) {
-                       if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
-                               timeIsFrozen = true;
-                               return getCurrentTimestamp();
-                       } else {
-                               return lastReturnedProcessingTime;
-                       }
+                       final long timestampBeforeUpdate = freeze();
+                       T result = action.apply(timestampBeforeUpdate);
+                       final long timestampAfterUpdate = unfreezeTime();
+
+                       checkState(timestampAfterUpdate == timestampBeforeUpdate,
+                               "Timestamps before and after the update do not match.");
+                       return result;
+               }
+       }
+
+       private static long freeze() {
+               if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
+                       timeIsFrozen = true;
+                       return getCurrentTimestamp();
+               } else {
+                       return lastReturnedProcessingTime;
                }
        }
 
@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
                return lastReturnedProcessingTime;
        }
 
-       @GuardedBy("lock")
-       static long unfreezeTime() {
-               synchronized (lock) {
-                       timeIsFrozen = false;
-                       return lastReturnedProcessingTime;
-               }
+       private static long unfreezeTime() {
+               timeIsFrozen = false;
+               return lastReturnedProcessingTime;
        }
 }
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String
                        TtlStateVerifier<?, ?> verifier,
                        Object update) throws Exception {
 
-               final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze();
-               State state = states.get(verifier.getId());
-               Object valueBeforeUpdate = verifier.get(state);
-               verifier.update(state, update);
-               Object updatedValue = verifier.get(state);
-               final long timestampAfterUpdate = MonotonicTTLTimeProvider.unfreezeTime();
-
-               checkState(
-                               timestampAfterUpdate == timestampBeforeUpdate,
-                               "Timestamps before and after the update do not match."
-               );
-
-               return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, timestampAfterUpdate);
+               return MonotonicTTLTimeProvider.doWithFrozenTime(frozenTimestamp -> {
+                       State state = states.get(verifier.getId());
+                       Object valueBeforeUpdate = verifier.get(state);
+                       verifier.update(state, update);
+                       Object updatedValue = verifier.get(state);
+                       return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, frozenTimestamp);
+               });
        }
 
        @Override

Reply via email to