[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/159f51b2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/159f51b2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/159f51b2 Branch: refs/heads/release-1.2 Commit: 159f51b23dfdf2b6ccba728dbbe4c517ca532dbc Parents: 4697b97 Author: Stephan Ewen <se...@apache.org> Authored: Mon Jan 23 14:55:48 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Jan 23 19:16:03 2017 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/AllWindowedStream.java | 12 ++++++------ .../flink/streaming/api/datastream/WindowedStream.java | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 0f0e947..6c57391 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -53,6 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A {@code AllWindowedStream} represents a data stream where the stream of * elements is split into windows based on a @@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> { */ @PublicEvolving public AllWindowedStream<T, W> allowedLateness(Time lateness) { - long millis = lateness.toMilliseconds(); - if (millis < 0) { - throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (windowAssigner.isEventTime()) { - this.allowedLateness = millis; - } + final long millis = lateness.toMilliseconds(); + checkArgument(millis >= 0, "The allowed lateness cannot be negative."); + + this.allowedLateness = millis; return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 2df3621..b20e67a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -63,6 +63,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A {@code WindowedStream} represents a data stream where elements are grouped by * key, and for each key, the stream of elements is split into windows based on a @@ -140,12 +142,10 @@ public class WindowedStream<T, K, W extends Window> { */ @PublicEvolving public WindowedStream<T, K, W> allowedLateness(Time lateness) { - long millis = lateness.toMilliseconds(); - if (millis < 0) { - throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (windowAssigner.isEventTime()) { - this.allowedLateness = millis; - } + final long millis = lateness.toMilliseconds(); + checkArgument(millis >= 0, "The allowed lateness cannot be negative."); + + this.allowedLateness = millis; return this; }