[FLINK-4000] [RabbitMQ] Style cleanups in MessageAcknowledgingSourceBase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6afb2b00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6afb2b00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6afb2b00

Branch: refs/heads/master
Commit: 6afb2b00f5438c176fe0579f632757722014e696
Parents: ae679bb
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 7 19:20:03 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../functions/source/MessageAcknowledgingSourceBase.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6afb2b00/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 9b2c4ac..5c1b94e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -102,8 +102,6 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
         */
        private transient Set<UId> idsProcessedButNotAcknowledged;
 
-       protected int numCheckpointsToKeep = 10;
-
        // ------------------------------------------------------------------------
 
        /**
@@ -127,10 +125,12 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
        @Override
        public void open(Configuration parameters) throws Exception {
                idsForCurrentCheckpoint = new ArrayList<>(64);
-               if (pendingCheckpoints == null)
-                       pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
-               if (idsProcessedButNotAcknowledged == null)
+               if (pendingCheckpoints == null) {
+                       pendingCheckpoints = new ArrayDeque<>();
+               }
+               if (idsProcessedButNotAcknowledged == null) {
                        idsProcessedButNotAcknowledged = new HashSet<>();
+               }
        }
 
        @Override

Reply via email to