[FLINK-4000] [RabbitMQ] Fix for checkpoint state restore at MessageAcknowledgingSourceBase

As the documentation for MessageAcknowledgingSourceBase.restoreState() says:

This method is invoked when a function is executed as part of a recovery run. 
Note that restoreState() is called before open().

So the current implementation:

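1. Fails in restoreState() with a NullPointerException, because 
idsProcessedButNotAcknowledged is only created in open(). As a result, jobs 
fail to restart.
2. Does not restore anything, because the subsequent open() call immediately 
erases all checkpoint data.
3. As a consequence, violates the exactly-once guarantee, because the set of 
processed but not yet acknowledged IDs (idsProcessedButNotAcknowledged) is 
erased.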
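The failure modes above can be reproduced with a small, self-contained model of the pre-fix lifecycle. This is a hypothetical simplification, not Flink code: the class names `PreFixSource` and `PreFixDemo`, the `Long` message IDs, and the `List<Long>` per checkpoint are stand-ins for the real `UId` type and `SerializedCheckpointData`. It also assumes, as the diff suggests, that `restoreState()` adds the restored IDs to `idsProcessedButNotAcknowledged`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the pre-fix MessageAcknowledgingSourceBase
// lifecycle. Not Flink code: IDs are Longs and each checkpoint is a List<Long>.
class PreFixSource {
    Deque<List<Long>> pendingCheckpoints;
    Set<Long> idsProcessedButNotAcknowledged;

    // Mirrors the old open(): unconditionally (re)creates both collections.
    void open() {
        pendingCheckpoints = new ArrayDeque<>();
        idsProcessedButNotAcknowledged = new HashSet<>();
    }

    // Mirrors the old restoreState(): assumes the set already exists, but on
    // recovery it runs BEFORE open(), so the field is still null.
    void restoreState(List<List<Long>> state) {
        pendingCheckpoints = new ArrayDeque<>(state);
        for (List<Long> ids : pendingCheckpoints) {
            // Throws NullPointerException for any non-empty restored state.
            idsProcessedButNotAcknowledged.addAll(ids);
        }
    }
}

class PreFixDemo {
    public static void main(String[] args) {
        List<List<Long>> restored = List.of(List.of(1L, 2L));

        // Failure 1: restoreState() before open() throws.
        PreFixSource a = new PreFixSource();
        try {
            a.restoreState(restored);
        } catch (NullPointerException e) {
            System.out.println("restoreState failed with NPE");
        }

        // Failure 2: even if the restore succeeded, open() wipes it.
        PreFixSource b = new PreFixSource();
        b.idsProcessedButNotAcknowledged = new HashSet<>(); // pretend restore could run
        b.restoreState(restored);
        b.open();
        System.out.println(b.pendingCheckpoints.isEmpty());
    }
}
```

Running `PreFixDemo` prints the NPE message and then `true`, i.e. the restored checkpoints are gone once `open()` has run.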

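The proposed change fixes this: open() now creates pendingCheckpoints and 
idsProcessedButNotAcknowledged only if restoreState() has not already set them, 
and restoreState() creates idsProcessedButNotAcknowledged itself before 
filling it.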
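A minimal sketch of the patched initialization order, again as a hypothetical model rather than Flink code: `PatchedSource`, `PatchedDemo`, the `Long` IDs, the `List<Long>` checkpoint representation, and the `isDuplicate()` helper are illustrative stand-ins that only mirror the two methods changed in the diff. The key property is that `open()` no longer overwrites collections that `restoreState()` has already populated.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the patched lifecycle (not Flink code).
class PatchedSource {
    List<Long> idsForCurrentCheckpoint;
    Deque<List<Long>> pendingCheckpoints;
    Set<Long> idsProcessedButNotAcknowledged;

    // Like the patched open(): keep anything restoreState() already built.
    void open() {
        idsForCurrentCheckpoint = new ArrayList<>(64); // always reset, as before
        if (pendingCheckpoints == null) {
            pendingCheckpoints = new ArrayDeque<>();
        }
        if (idsProcessedButNotAcknowledged == null) {
            idsProcessedButNotAcknowledged = new HashSet<>();
        }
    }

    // Like the patched restoreState(): create the set before filling it.
    void restoreState(List<List<Long>> state) {
        idsProcessedButNotAcknowledged = new HashSet<>();
        pendingCheckpoints = new ArrayDeque<>(state);
        for (List<Long> ids : pendingCheckpoints) {
            idsProcessedButNotAcknowledged.addAll(ids);
        }
    }

    // Hypothetical helper: was this message processed but not yet acknowledged?
    boolean isDuplicate(long id) {
        return idsProcessedButNotAcknowledged.contains(id);
    }
}

class PatchedDemo {
    public static void main(String[] args) {
        PatchedSource s = new PatchedSource();
        // Recovery order: restoreState() first, then open().
        s.restoreState(List.of(List.of(1L, 2L), List.of(3L)));
        s.open();
        System.out.println(s.pendingCheckpoints.size()); // 2
        System.out.println(s.isDuplicate(3L));           // true
    }
}
```

On a fresh start without recovery, restoreState() is never called, so the null checks in open() still create empty collections.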

This closes #2062


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae679bb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae679bb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae679bb2

Branch: refs/heads/master
Commit: ae679bb2aa1e0e239770605e049709fbc6b9962c
Parents: 65ee28c
Author: Alexey Savartsov <asavart...@gmail.com>
Authored: Thu Jun 2 02:23:53 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../api/functions/source/MessageAcknowledgingSourceBase.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae679bb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index d3cbfb6..9b2c4ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -127,8 +127,10 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
        @Override
        public void open(Configuration parameters) throws Exception {
                idsForCurrentCheckpoint = new ArrayList<>(64);
-               pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
-               idsProcessedButNotAcknowledged = new HashSet<>();
+               if (pendingCheckpoints == null)
+                       pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
+               if (idsProcessedButNotAcknowledged == null)
+                       idsProcessedButNotAcknowledged = new HashSet<>();
        }
 
        @Override
@@ -177,6 +179,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 
        @Override
        public void restoreState(SerializedCheckpointData[] state) throws Exception {
+               idsProcessedButNotAcknowledged = new HashSet<>();
                pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
                // build a set which contains all processed ids. It may be used to check if we have
                // already processed an incoming message.
