[FLINK-4000] [RabbitMQ] Fix checkpoint state restore in MessageAcknowledgingSourceBase
As the documentation for MessageAcknowledgingSourceBase.restoreState() says: "This method is invoked when a function is executed as part of a recovery run. Note that restoreState() is called before open()." As a result, the current implementation: 1. Fails in restoreState() with a NullPointerException, so jobs fail to restart. 2. Does not restore anything, because the subsequent open() call immediately erases all checkpoint data. 3. As a consequence, violates the exactly-once guarantee, because the set of processed-but-not-acknowledged IDs is erased. The proposed change fixes that. This closes #2062 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae679bb2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae679bb2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae679bb2 Branch: refs/heads/master Commit: ae679bb2aa1e0e239770605e049709fbc6b9962c Parents: 65ee28c Author: Alexey Savartsov <asavart...@gmail.com> Authored: Thu Jun 2 02:23:53 2016 +0300 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Jun 8 15:17:10 2016 +0200 ---------------------------------------------------------------------- .../api/functions/source/MessageAcknowledgingSourceBase.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ae679bb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index d3cbfb6..9b2c4ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -127,8 +127,10 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> @Override public void open(Configuration parameters) throws Exception { idsForCurrentCheckpoint = new ArrayList<>(64); - pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep); - idsProcessedButNotAcknowledged = new HashSet<>(); + if (pendingCheckpoints == null) + pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep); + if (idsProcessedButNotAcknowledged == null) + idsProcessedButNotAcknowledged = new HashSet<>(); } @Override @@ -177,6 +179,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> @Override public void restoreState(SerializedCheckpointData[] state) throws Exception { + idsProcessedButNotAcknowledged = new HashSet<>(); pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); // build a set which contains all processed ids. It may be used to check if we have // already processed an incoming message.