This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9cd1f9  [BEAM-3345][SQL] Reject unsupported inputs into JOIN (#4642)
e9cd1f9 is described below

commit e9cd1f9c92636e968cb0b550f2ee3a6fe5d103a5
Author: Anton Kedin <33067037+ake...@users.noreply.github.com>
AuthorDate: Wed Feb 14 11:16:19 2018 -0800

    [BEAM-3345][SQL] Reject unsupported inputs into JOIN (#4642)
    
    * [SQL] Reject unsupported inputs into JOIN
    
    Allow only non-global windows with default trigger or end of window trigger.
    
    * fixup! [SQL] Reject unsupported inputs into JOIN
---
 sdks/java/extensions/sql/pom.xml                   |   5 +
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   |  39 ++++-
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java     | 195 +++++++++++++++++++--
 .../apache/beam/sdk/extensions/sql/TestUtils.java  |  19 ++
 4 files changed, 244 insertions(+), 14 deletions(-)

diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index 16cfe77..46de596 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -409,6 +409,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <version>${mockito.version}</version>
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index bebfca3..89196ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -18,7 +18,9 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
+import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
 import static org.apache.beam.sdk.values.RowType.toRowType;
+import static org.joda.time.Duration.ZERO;
 
 import com.google.common.base.Joiner;
 import java.util.ArrayList;
@@ -35,7 +37,10 @@ import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -43,6 +48,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.RowType;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -112,6 +118,9 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     PCollection<Row> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
     PCollection<Row> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
+    verifySupportedTrigger(leftRows);
+    verifySupportedTrigger(rightRows);
+
     String stageName = BeamSqlRelUtils.getStageName(this);
     WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
     WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
@@ -151,8 +160,8 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     // a regular join
     if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
             && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-           || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
-                && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
+           || (leftRows.isBounded() == UNBOUNDED
+                && rightRows.isBounded() == UNBOUNDED)) {
       try {
         leftWinFn.verifyCompatibility(rightWinFn);
       } catch (IncompatibleWindowException e) {
@@ -164,8 +173,8 @@ public class BeamJoinRel extends Join implements BeamRelNode {
           leftNullRow, rightNullRow, stageName);
     } else if (
         (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-        && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
-        || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
+        && rightRows.isBounded() == UNBOUNDED)
+        || (leftRows.isBounded() == UNBOUNDED
             && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
         ) {
       // if one of the sides is Bounded & the other is Unbounded
@@ -194,6 +203,28 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     }
   }
 
+  private void verifySupportedTrigger(PCollection<Row> pCollection) {
+    WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
+
+    if (UNBOUNDED.equals(pCollection.isBounded())
+        && !triggersOncePerWindow(windowingStrategy)) {
+      throw new UnsupportedOperationException(
+          "Joining unbounded PCollections is currently only supported for "
+              + "non-global windows with triggers that are known to produce output once per window,"
+              + "such as the default trigger with zero allowed lateness. "
+              + "In these cases Beam can guarantee it joins all input elements once per window. "
+              + windowingStrategy + " is not supported");
+    }
+  }
+
+  private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
+    Trigger trigger = windowingStrategy.getTrigger();
+
+    return !(windowingStrategy.getWindowFn() instanceof GlobalWindows)
+        && trigger instanceof DefaultTrigger
+        && ZERO.equals(windowingStrategy.getAllowedLateness());
+  }
+
   private PCollection<Row> standardJoin(
       PCollection<KV<Row, Row>> extractedLeftRows,
       PCollection<KV<Row, Row>> extractedRightRows,
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 979c669..aef8e55 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -18,28 +18,43 @@
 
 package org.apache.beam.sdk.extensions.sql;
 
+import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
 import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
     .ORDER_DETAILS1;
 import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
     .ORDER_DETAILS2;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.stringContainsInOrder;
 
+import java.util.Arrays;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.RowType;
-import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matcher;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * Tests for joins in queries.
  */
 public class BeamSqlDslJoinTest {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
+
+  @Rule public final ExpectedException thrown = ExpectedException.none();
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
 
   private static final RowType SOURCE_ROW_TYPE =
       RowSqlType.builder()
@@ -156,8 +171,10 @@ public class BeamSqlDslJoinTest {
     pipeline.run();
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void testException_crossJoin() throws Exception {
+    thrown.expect(IllegalStateException.class);
+
     String sql =
         "SELECT *  "
             + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
@@ -167,14 +184,172 @@ public class BeamSqlDslJoinTest {
     pipeline.run();
   }
 
+  @Test
+  public void testJoinsUnboundedWithinWindowsWithDefaultTrigger() throws Exception {
+
+    String sql =
+        "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> orders = ordersUnbounded()
+        .apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(50))));
+    PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
+
+    PAssert
+        .that(
+            inputs.apply("sql", BeamSql.queryMulti(sql)))
+        .containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(
+            RESULT_ROW_TYPE
+        ).addRows(
+            1, 2, 2, 2, 2, 1,
+            1, 4, 3, 3, 3, 1
+        ).getRows());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exception {
+
+    String sql =
+        "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> orders = ordersUnbounded()
+        .apply("window",
+               Window
+                   .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
+                   .triggering(AfterWatermark.pastEndOfWindow())
+                   .withAllowedLateness(Duration.ZERO)
+                   .accumulatingFiredPanes());
+    PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
+
+    thrown.expectCause(expectedSingleFireTrigger());
+
+    inputs.apply("sql", BeamSql.queryMulti(sql));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testRejectsGlobalWindowsWithDefaultTriggerInUnboundedInput() throws Exception {
+
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> orders = ordersUnbounded();
+    PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
+
+    thrown.expectCause(expectedSingleFireTrigger());
+
+    inputs.apply("sql", BeamSql.queryMulti(sql));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception {
+
+    String sql =
+        "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> orders = ordersUnbounded()
+        .apply("window",
+               Window
+                   .<Row>into(new GlobalWindows())
+                   .triggering(AfterWatermark.pastEndOfWindow())
+                   .withAllowedLateness(Duration.ZERO)
+                   .accumulatingFiredPanes());
+    PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
+
+    thrown.expectCause(expectedSingleFireTrigger());
+
+    inputs.apply("sql", BeamSql.queryMulti(sql));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
+
+    String sql =
+        "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> orders = ordersUnbounded()
+        .apply(
+            "window",
+            Window
+                .<Row>into(FixedWindows.of(Duration.standardSeconds(203)))
+                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+                .withAllowedLateness(Duration.standardMinutes(2))
+                .accumulatingFiredPanes());
+    PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
+
+    thrown.expectCause(expectedSingleFireTrigger());
+
+    inputs.apply("sql", BeamSql.queryMulti(sql));
+
+    pipeline.run();
+  }
+
+  private PCollection<Row> ordersUnbounded() {
+    DateTime ts = new DateTime(2017, 1, 1, 1, 0, 0);
+
+    return
+        TestUtils
+            .rowsBuilderOf(
+                RowSqlType
+                    .builder()
+                    .withIntegerField("order_id")
+                    .withIntegerField("price")
+                    .withIntegerField("site_id")
+                    .withTimestampField("timestamp")
+                    .build())
+            .addRows(
+                1, 2, 2, ts.plusSeconds(0).toDate(),
+                2, 2, 1, ts.plusSeconds(40).toDate(),
+                1, 4, 3, ts.plusSeconds(60).toDate(),
+                3, 2, 1, ts.plusSeconds(65).toDate(),
+                3, 3, 1, ts.plusSeconds(70).toDate())
+            .getPCollectionBuilder()
+            .withTimestampField("timestamp")
+            .inPipeline(pipeline)
+            .buildUnbounded();
+  }
+
   private PCollection<Row> queryFromOrderTables(String sql) {
-    return PCollectionTuple.of(
-        new TupleTag<>("ORDER_DETAILS1"),
-        ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER))
-        .and(
-            new TupleTag<>("ORDER_DETAILS2"),
-            ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER))
+    return tuple(
+        "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER),
+        "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER))
         .apply("join", BeamSql.queryMulti(sql))
         .setCoder(RESULT_CODER);
   }
+
+  private Matcher<UnsupportedOperationException> expectedSingleFireTrigger() {
+    return allOf(
+        isA(UnsupportedOperationException.class),
+        hasProperty("message",
+                    stringContainsInOrder(
+                        Arrays.asList("once per window", "default trigger"))));
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 677a848..fff1f3e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -34,8 +34,10 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.RowType;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
@@ -277,4 +279,21 @@ public class TestUtils {
             .map(values -> values.stream().collect(toRow(type)))
             .collect(toList());
   }
+
+  public static PCollectionTuple tuple(String tag, PCollection<Row> pCollection) {
+    return PCollectionTuple.of(new TupleTag<>(tag), pCollection);
+  }
+
+  public static PCollectionTuple tuple(String tag1, PCollection<Row> pCollection1,
+                                       String tag2, PCollection<Row> pCollection2) {
+    return tuple(tag1, pCollection1).and(new TupleTag<>(tag2), pCollection2);
+  }
+
+  public static PCollectionTuple tuple(String tag1, PCollection<Row> pCollection1,
+                                       String tag2, PCollection<Row> pCollection2,
+                                       String tag3, PCollection<Row> pCollection3) {
+    return tuple(
+        tag1, pCollection1,
+        tag2, pCollection2).and(new TupleTag<>(tag3), pCollection3);
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to