[FLINK-5762] [runtime] Protect initializeState() and open() by the same lock
This closes #3291

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a91b6ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a91b6ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a91b6ff0

Branch: refs/heads/master
Commit: a91b6ff05d8af870ad076f9bf0fc17886787bc46
Parents: 663c1e3
Author: kl0u <kklou...@gmail.com>
Authored: Thu Feb 9 16:02:27 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a91b6ff0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2676b64..3781cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -244,12 +244,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// -------- Invoke --------
 			LOG.debug("Invoking {}", getName());
 
-			// first order of business is to give operators their state
-			initializeState();
-
 			// we need to make sure that any triggers scheduled in open() cannot be
 			// executed before all operators are opened
 			synchronized (lock) {
+
+				// both the following operations are protected by the lock
+				// so that we avoid race conditions in the case that initializeState()
+				// registers a timer, that fires before the open() is called.
+
+				initializeState();
 				openAllOperators();
 			}
 