Repository: incubator-beam
Updated Branches:
  refs/heads/master 164676bc7 -> fba1259b4


Drop elements in closed windows before mapping window

Previously, the sequence was:

1. Map a window to a representative of its equivalence class
   according to merging.
2. Drop the element if that window was closed.

But this fails the precondition check (the original window has no representative) if the original window was already closed.

The new sequence is reversed: first drop the element if its window is
closed, then map the window to its representative. This is safe because
a closed window can never be chosen as a representative: once closed, it
is no longer a candidate for merging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27c6c795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27c6c795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27c6c795

Branch: refs/heads/master
Commit: 27c6c795271e7a927ed0d07679ce9d6de300c38f
Parents: 96e286f
Author: Mark Shields <markshie...@google.com>
Authored: Thu Mar 31 10:57:55 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Mar 31 19:17:42 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 29 ++++++++-------
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27c6c795/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index d62bcc9..2415dab 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -438,8 +438,22 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     for (BoundedWindow untypedWindow : value.getWindows()) {
       @SuppressWarnings("unchecked")
       W window = (W) untypedWindow;
+
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(window, StateStyle.DIRECT);
+      if (triggerRunner.isClosed(directContext.state())) {
+        // This window has already been closed.
+        droppedDueToClosedWindow.addValue(1L);
+        WindowTracing.debug(
+            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; 
window:{} "
+            + "since window is no longer active at inputWatermark:{}; 
outputWatermark:{}",
+            value.getTimestamp(), key, window, 
timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        continue;
+      }
+
       W active = activeWindows.representative(window);
-      Preconditions.checkState(active != null, "Window %s should have been 
added", window);
+      Preconditions.checkState(active != null, "Window %s has no 
representative", window);
       windows.add(active);
     }
 
@@ -450,24 +464,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       triggerRunner.prefetchForValue(window, directContext.state());
     }
 
-    // Process the element for each (representative) window it belongs to.
+    // Process the element for each (representative, not closed) window it 
belongs to.
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
 
-      // Check to see if the triggerRunner thinks the window is closed. If so, 
drop that window.
-      if (triggerRunner.isClosed(directContext.state())) {
-        droppedDueToClosedWindow.addValue(1L);
-        WindowTracing.debug(
-            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; 
window:{} "
-            + "since window is no longer active at inputWatermark:{}; 
outputWatermark:{}",
-            value.getTimestamp(), key, window, 
timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-        continue;
-      }
-
       nonEmptyPanes.recordContent(renamedContext.state());
 
       // Make sure we've scheduled the end-of-window or garbage collection 
timer for this window.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27c6c795/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 10b886b..b58e360 100644
--- 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -725,6 +725,44 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * If a later event tries to reuse an earlier session window which has been 
closed, we
+   * should reject that element and not fail due to the window no longer 
having a representative.
+   */
+  @Test
+  public void testMergingWithReusedWindow() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        
ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), 
mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, 
Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // One elements in one session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 
11), gc at 21.
+
+    // Close the trigger, but the gargbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(15));
+
+    // Another element in the same session window.
+    // Should be discarded with 'window closed'.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 
11), gc at 21.
+
+    // Now the garbage collection timer will fire, finding the trigger already 
closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+               isSingleWindowedValue(containsInAnyOrder(1),
+                                     1, // timestamp
+                                     1, // window start
+                                     11)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
    * Tests that when data is assigned to multiple windows but some of those 
windows have
    * had their triggers finish, then the data is dropped and counted 
accurately.
    */

Reply via email to