[hotfix] Improve handling of Window Trigger results

This enhances the TriggerResult enum with methods isFire() and isPurge()
that simplify the logic in WindowOperator.processTriggerResult().

Also, the operator now keeps track of the current watermark and fires
immediately if a trigger registers an event-time callback for a
timestamp that lies in the past. For this, TriggerResult now has a
method merge() that allows merging two TriggerResults.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6969377
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6969377
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6969377

Branch: refs/heads/master
Commit: e696937788c0fcba78dcdf820a5ebb70f8086710
Parents: e18cdd0
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Oct 23 11:31:13 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Oct 23 15:49:50 2015 +0200

----------------------------------------------------------------------
 .../api/windowing/triggers/Trigger.java         |  45 ++++++-
 .../windowing/NonKeyedWindowOperator.java       | 107 +++++++++++-----
 .../operators/windowing/WindowOperator.java     | 127 +++++++++++++------
 .../flink/streaming/util/TestHarnessUtil.java   |  33 ++++-
 4 files changed, 236 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 15ccb33..ee6a279 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -77,10 +77,51 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * <p>
         * On {@code FIRE} the pane is evaluated and results are emitted. The 
contents of the window
         * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the 
contents of the pane
-        * are purged. On {@code CONTINUE} nothing happens, processing 
continues.
+        * are purged. On {@code CONTINUE} nothing happens, processing 
continues. On {@code PURGE}
+        * the contents of the window are discarded and no result is emitted 
for the window.
         */
        enum TriggerResult {
-               CONTINUE, FIRE_AND_PURGE, FIRE
+               CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, 
false), PURGE(false, true);
+
+               private final boolean fire;
+               private final boolean purge;
+
+               TriggerResult(boolean fire, boolean purge) {
+                       this.purge = purge;
+                       this.fire = fire;
+               }
+
+               public boolean isFire() {
+                       return fire;
+               }
+
+               public boolean isPurge() {
+                       return purge;
+               }
+
+               /**
+                * Merges two {@code TriggerResults}. This specifies what 
should happen if we have
+                * two results from a Trigger, for example as a result from
+                * {@link #onElement(Object, long, Window, TriggerContext)} and
+                * {@link #onEventTime(long, Window, TriggerContext)}.
+                *
+                * <p>
+                * For example, if one result says {@code CONTINUE} while the 
other says {@code FIRE}
+                * then {@code FIRE} is the combined result;
+                */
+               public static TriggerResult merge(TriggerResult a, 
TriggerResult b) {
+                       if (a.purge || b.purge) {
+                               if (a.fire || b.fire) {
+                                       return FIRE_AND_PURGE;
+                               } else {
+                                       return PURGE;
+                               }
+                       } else if (a.fire || b.fire) {
+                               return FIRE;
+                       } else {
+                               return CONTINUE;
+                       }
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 2209d5e..d12a930 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -122,6 +123,12 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
+       /**
+        * To keep track of the current watermark so that we can immediately 
fire if a trigger
+        * registers an event time callback for a timestamp that lies in the 
past.
+        */
+       protected transient long currentWatermark = -1L;
+
        // 
------------------------------------------------------------------------
        // State that needs to be checkpointed
        // 
------------------------------------------------------------------------
@@ -152,6 +159,11 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+               currentWatermark = -1;
+       }
+
        @Override
        @SuppressWarnings("unchecked")
        public final void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
@@ -232,7 +244,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                windows.put(window, context);
                        }
                        context.windowBuffer.storeElement(element);
-                       Trigger.TriggerResult triggerResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
                        processTriggerResult(triggerResult, window);
                }
        }
@@ -249,53 +261,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        private void processTriggerResult(Trigger.TriggerResult triggerResult, 
W window) throws Exception {
-               switch (triggerResult) {
-                       case FIRE: {
-                               Context context = windows.get(window);
-                               if (context == null) {
-                                       LOG.debug("Window {} already gone.", 
window);
-                                       return;
-                               }
-
-
-                               emitWindow(context);
-                               break;
-                       }
-
-                       case FIRE_AND_PURGE: {
-                               Context context = windows.remove(window);
-                               if (context == null) {
-                                       LOG.debug("Window {} already gone.", 
window);
-                                       return;
-                               }
+               if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+                       // do nothing
+                       return;
+               }
+               Context context;
 
-                               emitWindow(context);
-                               break;
-                       }
+               if (triggerResult.isPurge()) {
+                       context = windows.remove(window);
+               } else {
+                       context = windows.get(window);
+               }
+               if (context == null) {
+                       LOG.debug("Window {} already gone.", window);
+                       return;
+               }
 
-                       case CONTINUE:
-                               // ingore
+               if (triggerResult.isFire()) {
+                       emitWindow(context);
                }
        }
 
        @Override
        public final void processWatermark(Watermark mark) throws Exception {
                Set<Long> toRemove = new HashSet<>();
+               Set<Context> toTrigger = new HashSet<>();
 
+               // we cannot call the Trigger in here because trigger methods 
might register new triggers.
+               // that would lead to concurrent modification errors.
                for (Map.Entry<Long, Set<Context>> triggers: 
watermarkTimers.entrySet()) {
                        if (triggers.getKey() <= mark.getTimestamp()) {
                                for (Context context: triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
context.onEventTime(triggers.getKey());
-                                       processTriggerResult(triggerResult, 
context.window);
+                                       toTrigger.add(context);
                                }
                                toRemove.add(triggers.getKey());
                        }
                }
 
+               for (Context context: toTrigger) {
+                       // double check the time. it can happen that the 
trigger registers a new timer,
+                       // in that case the entry is left in the 
watermarkTimers set for performance reasons.
+                       // We have to check here whether the entry in the set 
still reflects the
+                       // currently set timer in the Context.
+                       if (context.watermarkTimer <= mark.getTimestamp()) {
+                               Trigger.TriggerResult triggerResult = 
context.onEventTime(context.watermarkTimer);
+                               processTriggerResult(triggerResult, 
context.window);
+                       }
+               }
+
                for (Long l: toRemove) {
                        watermarkTimers.remove(l);
                }
                output.emitWatermark(mark);
+
+               this.currentWatermark = mark.getTimestamp();
        }
 
        @Override
@@ -318,8 +337,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        /**
-        * A context object that is given to {@code Trigger} functions to allow 
them to register
-        * timer/watermark callbacks.
+        * The {@code Context} is responsible for keeping track of the state of 
one pane.
+        *
+        * <p>
+        * A pane is the bucket of elements that have the same key (assigned by 
the
+        * {@link org.apache.flink.api.java.functions.KeySelector}) and same 
{@link Window}. An element can
+        * be in multiple panes if it was assigned to multiple windows by the
+        * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes 
all
+        * have their own instance of the {@code Trigger}.
         */
        protected class Context implements Trigger.TriggerContext {
                protected W window;
@@ -435,8 +460,20 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        triggers.add(this);
                }
 
+               public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
+                       Trigger.TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+                       if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
+                               // fire now and don't wait for the next 
watermark update
+                               Trigger.TriggerResult onEventTimeResult = 
onEventTime(watermarkTimer);
+                               return 
Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+                       } else {
+                               return onElementResult;
+                       }
+               }
+
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
+                               processingTimeTimer = -1;
                                return trigger.onProcessingTime(time, window, 
this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
@@ -445,7 +482,17 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onEventTime(time, window, this);
+                               watermarkTimer = -1;
+                               Trigger.TriggerResult firstTriggerResult = 
trigger.onEventTime(time, window, this);
+
+                               if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
+                                       // fire now and don't wait for the next 
watermark update
+                                       Trigger.TriggerResult 
secondTriggerResult = onEventTime(watermarkTimer);
+                                       return 
Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+                               } else {
+                                       return firstTriggerResult;
+                               }
+
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index e8e001d..c39679f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -147,6 +148,12 @@ public class WindowOperator<K, IN, OUT, W extends Window>
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
+       /**
+        * To keep track of the current watermark so that we can immediately 
fire if a trigger
+        * registers an event time callback for a timestamp that lies in the 
past.
+        */
+       protected transient long currentWatermark = -1L;
+
        // 
------------------------------------------------------------------------
        // State that needs to be checkpointed
        // 
------------------------------------------------------------------------
@@ -181,6 +188,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+               currentWatermark = -1;
+       }
+
        @Override
        @SuppressWarnings("unchecked")
        public final void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
@@ -276,8 +288,9 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                                context = new Context(key, window, 
windowBuffer);
                                keyWindows.put(window, context);
                        }
+
                        context.windowBuffer.storeElement(element);
-                       Trigger.TriggerResult triggerResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
                        processTriggerResult(triggerResult, key, window);
                }
        }
@@ -297,66 +310,68 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        private void processTriggerResult(Trigger.TriggerResult triggerResult, 
K key, W window) throws Exception {
-               switch (triggerResult) {
-                       case FIRE: {
-                               Map<W, Context> keyWindows = windows.get(key);
-                               if (keyWindows == null) {
-                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
-                                       return;
-                               }
-                               Context context = keyWindows.get(window);
-                               if (context == null) {
-                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
-                                       return;
-                               }
-
-
-                               emitWindow(context);
-                               break;
-                       }
-
-                       case FIRE_AND_PURGE: {
-                               Map<W, Context> keyWindows = windows.get(key);
-                               if (keyWindows == null) {
-                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
-                                       return;
-                               }
-                               Context context = keyWindows.remove(window);
-                               if (context == null) {
-                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
-                                       return;
-                               }
-                               if (keyWindows.isEmpty()) {
-                                       windows.remove(key);
-                               }
+               if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+                       // do nothing
+                       return;
+               }
+               Context context;
+               Map<W, Context> keyWindows = windows.get(key);
+               if (keyWindows == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, 
key);
+                       return;
+               }
 
-                               emitWindow(context);
-                               break;
+               if (triggerResult.isPurge()) {
+                       context = keyWindows.remove(window);
+                       if (keyWindows.isEmpty()) {
+                               windows.remove(key);
                        }
+               } else {
+                       context = keyWindows.get(window);
+               }
+               if (context == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, 
key);
+                       return;
+               }
 
-                       case CONTINUE:
-                               // ingore
+               if (triggerResult.isFire()) {
+                       emitWindow(context);
                }
        }
 
        @Override
        public final void processWatermark(Watermark mark) throws Exception {
                Set<Long> toRemove = new HashSet<>();
+               Set<Context> toTrigger = new HashSet<>();
 
+               // we cannot call the Trigger in here because trigger methods 
might register new triggers.
+               // that would lead to concurrent modification errors.
                for (Map.Entry<Long, Set<Context>> triggers: 
watermarkTimers.entrySet()) {
                        if (triggers.getKey() <= mark.getTimestamp()) {
                                for (Context context: triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
context.onEventTime(triggers.getKey());
-                                       processTriggerResult(triggerResult, 
context.key, context.window);
+                                       toTrigger.add(context);
                                }
                                toRemove.add(triggers.getKey());
                        }
                }
 
+               for (Context context: toTrigger) {
+                       // double check the time. it can happen that the 
trigger registers a new timer,
+                       // in that case the entry is left in the 
watermarkTimers set for performance reasons.
+                       // We have to check here whether the entry in the set 
still reflects the
+                       // currently set timer in the Context.
+                       if (context.watermarkTimer <= mark.getTimestamp()) {
+                               Trigger.TriggerResult triggerResult = 
context.onEventTime(context.watermarkTimer);
+                               processTriggerResult(triggerResult, 
context.key, context.window);
+                       }
+               }
+
                for (Long l: toRemove) {
                        watermarkTimers.remove(l);
                }
                output.emitWatermark(mark);
+
+               this.currentWatermark = mark.getTimestamp();
        }
 
        @Override
@@ -380,8 +395,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        /**
-        * A context object that is given to {@code Trigger} functions to allow 
them to register
-        * timer/watermark callbacks.
+        * The {@code Context} is responsible for keeping track of the state of 
one pane.
+        *
+        * <p>
+        * A pane is the bucket of elements that have the same key (assigned by 
the
+        * {@link org.apache.flink.api.java.functions.KeySelector}) and same 
{@link Window}. An element can
+        * be in multiple panes if it was assigned to multiple windows by the
+        * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes 
all
+        * have their own instance of the {@code Trigger}.
         */
        protected class Context implements Trigger.TriggerContext {
                protected K key;
@@ -508,8 +529,20 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                        triggers.add(this);
                }
 
+               public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
+                       Trigger.TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+                       if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
+                               // fire now and don't wait for the next 
watermark update
+                               Trigger.TriggerResult onEventTimeResult = 
onEventTime(watermarkTimer);
+                               return 
Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+                       } else {
+                               return onElementResult;
+                       }
+               }
+
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
+                               processingTimeTimer = -1;
                                return trigger.onProcessingTime(time, window, 
this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
@@ -518,7 +551,17 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onEventTime(time, window, this);
+                               watermarkTimer = -1;
+                               Trigger.TriggerResult firstTriggerResult = 
trigger.onEventTime(time, window, this);
+
+                               if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
+                                       // fire now and don't wait for the next 
watermark update
+                                       Trigger.TriggerResult 
secondTriggerResult = onEventTime(watermarkTimer);
+                                       return 
Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+                               } else {
+                                       return firstTriggerResult;
+                               }
+
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 889ae37..9d1e674 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -72,8 +75,34 @@ public class TestHarnessUtil {
         * Compare the two queues containing operator/task output by converting 
them to an array first.
         */
        public static void assertOutputEqualsSorted(String message, 
Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
-               Object[] sortedExpected = expected.toArray();
-               Object[] sortedActual = actual.toArray();
+               // first, compare only watermarks, their position should be 
deterministic
+               Iterator<Object> exIt = expected.iterator();
+               Iterator<Object> actIt = actual.iterator();
+               while (exIt.hasNext()) {
+                       Object nextEx = exIt.next();
+                       Object nextAct = actIt.next();
+                       if (nextEx instanceof Watermark) {
+                               Assert.assertEquals(nextEx, nextAct);
+                       }
+               }
+
+               List<Object> expectedRecords = new ArrayList<>();
+               List<Object> actualRecords = new ArrayList<>();
+
+               for (Object ex: expected) {
+                       if (ex instanceof StreamRecord) {
+                               expectedRecords.add(ex);
+                       }
+               }
+
+               for (Object act: actual) {
+                       if (act instanceof StreamRecord) {
+                               actualRecords.add(act);
+                       }
+               }
+
+               Object[] sortedExpected = expectedRecords.toArray();
+               Object[] sortedActual = actualRecords.toArray();
 
                Arrays.sort(sortedExpected, comparator);
                Arrays.sort(sortedActual, comparator);

Reply via email to