[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/159f51b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/159f51b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/159f51b2

Branch: refs/heads/release-1.2
Commit: 159f51b23dfdf2b6ccba728dbbe4c517ca532dbc
Parents: 4697b97
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 14:55:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 19:16:03 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/AllWindowedStream.java     | 12 ++++++------
 .../flink/streaming/api/datastream/WindowedStream.java  | 12 ++++++------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0f0e947..6c57391 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -53,6 +53,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
  * elements is split into windows based on a
@@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> {
         */
        @PublicEvolving
        public AllWindowedStream<T, W> allowedLateness(Time lateness) {
-               long millis = lateness.toMilliseconds();
-               if (millis < 0) {
-                       throw new IllegalArgumentException("The allowed 
lateness cannot be negative.");
-               } else if (windowAssigner.isEventTime()) {
-                       this.allowedLateness = millis;
-               }
+               final long millis = lateness.toMilliseconds();
+               checkArgument(millis >= 0, "The allowed lateness cannot be 
negative.");
+
+               this.allowedLateness = millis;
                return this;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2df3621..b20e67a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -63,6 +63,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code WindowedStream} represents a data stream where elements are 
grouped by
  * key, and for each key, the stream of elements is split into windows based 
on a
@@ -140,12 +142,10 @@ public class WindowedStream<T, K, W extends Window> {
         */
        @PublicEvolving
        public WindowedStream<T, K, W> allowedLateness(Time lateness) {
-               long millis = lateness.toMilliseconds();
-               if (millis < 0) {
-                       throw new IllegalArgumentException("The allowed 
lateness cannot be negative.");
-               } else if (windowAssigner.isEventTime()) {
-                       this.allowedLateness = millis;
-               }
+               final long millis = lateness.toMilliseconds();
+               checkArgument(millis >= 0, "The allowed lateness cannot be 
negative.");
+
+               this.allowedLateness = millis;
                return this;
        }
 

Reply via email to