Repository: beam
Updated Branches:
  refs/heads/master f54072a1b -> 0064fb37a


[BEAM-2314] Add ValidatesRunner test for merging custom windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfa983ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfa983ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfa983ce

Branch: refs/heads/master
Commit: dfa983ce4adb85d211497460254b6a95944ce869
Parents: f54072a
Author: Etienne Chauchot <echauc...@gmail.com>
Authored: Mon May 29 12:05:51 2017 +0200
Committer: Aviem Zur <aviem...@gmail.com>
Committed: Mon Jul 24 14:33:00 2017 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   3 +-
 .../sdk/testing/UsesCustomWindowMerging.java    |  23 +++
 .../sdk/transforms/windowing/WindowTest.java    | 184 +++++++++++++++++++
 3 files changed, 209 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f70204..35e933b 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,8 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream
+                    org.apache.beam.sdk.testing.UsesTestStream,
+                    org.apache.beam.sdk.testing.UsesCustomWindowMerging
                   </excludedGroups>
                   <parallel>none</parallel>
                   <forkCount>1</forkCount>

http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
new file mode 100644
index 0000000..fc40e02
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize custom window merging.
+ */
+public interface UsesCustomWindowMerging {}

http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 65af7a1..5b6d046 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -31,19 +31,30 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
 import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -570,4 +581,177 @@ public class WindowTest implements Serializable {
     assertThat(data, not(hasDisplayItem("trigger")));
     assertThat(data, not(hasDisplayItem("allowedLateness")));
   }
+  @Test
+  @Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
+  public void testMergingCustomWindows() {
+    Instant startInstant = new Instant(0L);
+    List<TimestampedValue<String>> input = new ArrayList<>();
+    input.add(TimestampedValue.of("big", 
startInstant.plus(Duration.standardSeconds(10))));
+    input.add(TimestampedValue.of("small1", 
startInstant.plus(Duration.standardSeconds(20))));
+    //    This one's window ends after the end of bigWindow, thus it is not merged
+    input.add(TimestampedValue.of("small2", 
startInstant.plus(Duration.standardSeconds(39))));
+    PCollection<String> inputCollection = 
pipeline.apply(Create.timestamped(input));
+    PCollection<String> windowedCollection = inputCollection
+        .apply(Window.into(new CustomWindowFn<String>()));
+    PCollection<Long> count = windowedCollection
+        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
+    // "small1" and "big" elements are merged into bigWindow; "small2" is not merged
+    // because its window is not fully contained in bigWindow
+    PAssert.that("Wrong number of elements in output collection", 
count).containsInAnyOrder(2L, 1L);
+    pipeline.run();
+  }
+
+  // This test is useful because some runners have a special merge 
implementation
+  // for keyed collections
+  @Test
+  @Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
+  public void testMergingCustomWindowsKeyedCollection() {
+    Instant startInstant = new Instant(0L);
+    List<TimestampedValue<KV<Integer, String>>> input = new ArrayList<>();
+    input
+        .add(TimestampedValue.of(KV.of(0, "big"), 
startInstant.plus(Duration.standardSeconds(10))));
+    input.add(
+        TimestampedValue.of(KV.of(1, "small1"), 
startInstant.plus(Duration.standardSeconds(20))));
+    //    This one's window ends after the end of bigWindow, thus it is not merged
+    input.add(
+        TimestampedValue.of(KV.of(2, "small2"), 
startInstant.plus(Duration.standardSeconds(39))));
+    PCollection<KV<Integer, String>> inputCollection = 
pipeline.apply(Create.timestamped(input));
+    PCollection<KV<Integer, String>> windowedCollection = inputCollection
+        .apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
+    PCollection<Long> count = windowedCollection
+        .apply(Combine.globally(Count.<KV<Integer, 
String>>combineFn()).withoutDefaults());
+    // "small1" and "big" elements are merged into bigWindow; "small2" is not merged
+    // because its window is not fully contained in bigWindow
+    PAssert.that("Wrong number of elements in output collection", 
count).containsInAnyOrder(2L, 1L);
+    pipeline.run();
+  }
+
+  private static class CustomWindow extends IntervalWindow {
+
+    private boolean isBig;
+
+
+    CustomWindow(Instant start, Instant end) {
+      super(start, end);
+      this.isBig = false;
+    }
+
+    CustomWindow(Instant start, Instant end, boolean isBig) {
+      super(start, end);
+      this.isBig = isBig;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      CustomWindow that = (CustomWindow) o;
+      return isBig == that.isBig;
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(super.hashCode(), isBig);
+    }
+  }
+
+  private static class CustomWindowCoder extends
+      CustomCoder<CustomWindow> {
+
+    private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
+    private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = 
IntervalWindow.getCoder();
+    private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
+
+    public static CustomWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(CustomWindow window, OutputStream outStream)
+        throws IOException {
+      INTERVAL_WINDOW_CODER.encode(window, outStream);
+      VAR_INT_CODER.encode(window.isBig ? 1 : 0, outStream);
+    }
+
+    @Override
+    public CustomWindow decode(InputStream inStream) throws IOException {
+      IntervalWindow superWindow = INTERVAL_WINDOW_CODER.decode(inStream);
+      boolean isBig = VAR_INT_CODER.decode(inStream) != 0;
+      return new CustomWindow(superWindow.start(), superWindow.end(), isBig);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      INTERVAL_WINDOW_CODER.verifyDeterministic();
+      VAR_INT_CODER.verifyDeterministic();
+    }
+  }
+
+  private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
+
+    @Override public Collection<CustomWindow> assignWindows(AssignContext c) 
throws Exception {
+      String element;
+      // This gives up the genericity of type T, which is acceptable for a test,
+      // and it avoids duplicating CustomWindowFn to support 
PCollection<KV>
+      if (c.element() instanceof KV){
+        element = ((KV<Integer, String>) c.element()).getValue();
+      } else {
+        element = (String) c.element();
+      }
+      // put big elements in windows of 30s and small ones in windows of 5s
+      if ("big".equals(element)) {
+        return Collections.singletonList(
+            new CustomWindow(c.timestamp(), 
c.timestamp().plus(Duration.standardSeconds(30)),
+                true));
+      } else {
+        return Collections.singletonList(
+            new CustomWindow(c.timestamp(), 
c.timestamp().plus(Duration.standardSeconds(5)),
+                false));
+      }
+    }
+
+    @Override
+    public void mergeWindows(MergeContext c) throws Exception {
+      List<CustomWindow> toBeMerged = new ArrayList<>();
+      CustomWindow bigWindow = null;
+      for (CustomWindow customWindow : c.windows()) {
+        if (customWindow.isBig) {
+          bigWindow = customWindow;
+          toBeMerged.add(customWindow);
+        } else if (bigWindow != null
+            && customWindow.start().isAfter(bigWindow.start())
+            && customWindow.end().isBefore(bigWindow.end())) {
+          toBeMerged.add(customWindow);
+        }
+      }
+      // bigWindow is null when no big window was seen; nothing to merge then
+      if (bigWindow != null) {
+        // merge small windows into big windows
+        c.merge(toBeMerged, bigWindow);
+      }
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof CustomWindowFn;
+    }
+
+    @Override
+    public Coder<CustomWindow> windowCoder() {
+      return CustomWindowCoder.of();
+    }
+
+    @Override
+    public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
+      throw new UnsupportedOperationException("side inputs not supported");
+    }
+
+
+  }
+
 }

Reply via email to