[FLINK-4000] [RabbitMQ] Style cleanups in MessageAcknowledgingSourceBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6afb2b00 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6afb2b00 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6afb2b00 Branch: refs/heads/master Commit: 6afb2b00f5438c176fe0579f632757722014e696 Parents: ae679bb Author: Stephan Ewen <se...@apache.org> Authored: Tue Jun 7 19:20:03 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Jun 8 15:17:10 2016 +0200 ---------------------------------------------------------------------- .../functions/source/MessageAcknowledgingSourceBase.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6afb2b00/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 9b2c4ac..5c1b94e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -102,8 +102,6 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> */ private transient Set<UId> idsProcessedButNotAcknowledged; - protected int numCheckpointsToKeep = 10; - // ------------------------------------------------------------------------ /** @@ -127,10 +125,12 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> @Override public void open(Configuration parameters) throws Exception { idsForCurrentCheckpoint = new ArrayList<>(64); - if (pendingCheckpoints == null) - pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep); - if (idsProcessedButNotAcknowledged == null) + if (pendingCheckpoints == null) { + pendingCheckpoints = new ArrayDeque<>(); + } + if (idsProcessedButNotAcknowledged == null) { idsProcessedButNotAcknowledged = new HashSet<>(); + } } @Override