[FLINK-5762] [runtime] Protect initializeState() and open() by the same lock

This closes #3291


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a91b6ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a91b6ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a91b6ff0

Branch: refs/heads/master
Commit: a91b6ff05d8af870ad076f9bf0fc17886787bc46
Parents: 663c1e3
Author: kl0u <kklou...@gmail.com>
Authored: Thu Feb 9 16:02:27 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a91b6ff0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2676b64..3781cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -244,12 +244,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        // -------- Invoke --------
                        LOG.debug("Invoking {}", getName());
 
-                       // first order of business is to give operators their state
-                       initializeState();
-
                        // we need to make sure that any triggers scheduled in open() cannot be
                        // executed before all operators are opened
                        synchronized (lock) {
+
+                               // both the following operations are protected by the lock
+                               // so that we avoid race conditions in the case that initializeState()
+                               // registers a timer, that fires before the open() is called.
+
+                               initializeState();
                                openAllOperators();
                        }
 

Reply via email to