This is an automated email from the ASF dual-hosted git repository. mingmxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e9cd1f9 [BEAM-3345][SQL] Reject unsupported inputs into JOIN (#4642) e9cd1f9 is described below commit e9cd1f9c92636e968cb0b550f2ee3a6fe5d103a5 Author: Anton Kedin <33067037+ake...@users.noreply.github.com> AuthorDate: Wed Feb 14 11:16:19 2018 -0800 [BEAM-3345][SQL] Reject unsupported inputs into JOIN (#4642) * [SQL] Reject unsupported inputs into JOIN For unbounded inputs, allow only non-global windows with the default trigger and zero allowed lateness; other triggers, including the end-of-window trigger, are rejected. * fixup! [SQL] Reject unsupported inputs into JOIN --- sdks/java/extensions/sql/pom.xml | 5 + .../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 39 ++++- .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 195 +++++++++++++++++++-- .../apache/beam/sdk/extensions/sql/TestUtils.java | 19 ++ 4 files changed, 244 insertions(+), 14 deletions(-) diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml index 16cfe77..46de596 100644 --- a/sdks/java/extensions/sql/pom.xml +++ b/sdks/java/extensions/sql/pom.xml @@ -409,6 +409,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>${mockito.version}</version> diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index bebfca3..89196ef 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; +import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED; import static org.apache.beam.sdk.values.RowType.toRowType; 
+import static org.joda.time.Duration.ZERO; import com.google.common.base.Joiner; import java.util.ArrayList; @@ -35,7 +37,10 @@ import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -43,6 +48,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.RowType; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -112,6 +118,9 @@ public class BeamJoinRel extends Join implements BeamRelNode { PCollection<Row> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); PCollection<Row> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + verifySupportedTrigger(leftRows); + verifySupportedTrigger(rightRows); + String stageName = BeamSqlRelUtils.getStageName(this); WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); @@ -151,8 +160,8 @@ public class BeamJoinRel extends Join implements BeamRelNode { // a regular join if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) - || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED - && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) 
{ + || (leftRows.isBounded() == UNBOUNDED + && rightRows.isBounded() == UNBOUNDED)) { try { leftWinFn.verifyCompatibility(rightWinFn); } catch (IncompatibleWindowException e) { @@ -164,8 +173,8 @@ public class BeamJoinRel extends Join implements BeamRelNode { leftNullRow, rightNullRow, stageName); } else if ( (leftRows.isBounded() == PCollection.IsBounded.BOUNDED - && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) - || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == UNBOUNDED) + || (leftRows.isBounded() == UNBOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) ) { // if one of the sides is Bounded & the other is Unbounded @@ -194,6 +203,28 @@ public class BeamJoinRel extends Join implements BeamRelNode { } } + private void verifySupportedTrigger(PCollection<Row> pCollection) { + WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy(); + + if (UNBOUNDED.equals(pCollection.isBounded()) + && !triggersOncePerWindow(windowingStrategy)) { + throw new UnsupportedOperationException( + "Joining unbounded PCollections is currently only supported for " + + "non-global windows with triggers that are known to produce output once per window," + + "such as the default trigger with zero allowed lateness. " + + "In these cases Beam can guarantee it joins all input elements once per window. 
" + + windowingStrategy + " is not supported"); + } + } + + private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) { + Trigger trigger = windowingStrategy.getTrigger(); + + return !(windowingStrategy.getWindowFn() instanceof GlobalWindows) + && trigger instanceof DefaultTrigger + && ZERO.equals(windowingStrategy.getAllowedLateness()); + } + private PCollection<Row> standardJoin( PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index 979c669..aef8e55 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -18,28 +18,43 @@ package org.apache.beam.sdk.extensions.sql; +import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple; import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest .ORDER_DETAILS1; import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest .ORDER_DETAILS2; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.stringContainsInOrder; +import java.util.Arrays; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.RowType; -import org.apache.beam.sdk.values.TupleTag; +import org.hamcrest.Matcher; +import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * Tests for joins in queries. */ public class BeamSqlDslJoinTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); + + @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static final RowType SOURCE_ROW_TYPE = RowSqlType.builder() @@ -156,8 +171,10 @@ public class BeamSqlDslJoinTest { pipeline.run(); } - @Test(expected = IllegalStateException.class) + @Test public void testException_crossJoin() throws Exception { + thrown.expect(IllegalStateException.class); + String sql = "SELECT * " + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; @@ -167,14 +184,172 @@ public class BeamSqlDslJoinTest { pipeline.run(); } + @Test + public void testJoinsUnboundedWithinWindowsWithDefaultTrigger() throws Exception { + + String sql = + "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> orders = ordersUnbounded() + .apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(50)))); + PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders); + + PAssert + .that( + inputs.apply("sql", BeamSql.queryMulti(sql))) + .containsInAnyOrder( + TestUtils.RowsBuilder + .of( + RESULT_ROW_TYPE + ).addRows( + 1, 2, 2, 2, 2, 1, + 1, 4, 3, 3, 3, 1 + ).getRows()); + + pipeline.run(); + } + + @Test + public void testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exception 
{ + + String sql = + "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> orders = ordersUnbounded() + .apply("window", + Window + .<Row>into(FixedWindows.of(Duration.standardSeconds(50))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders); + + thrown.expectCause(expectedSingleFireTrigger()); + + inputs.apply("sql", BeamSql.queryMulti(sql)); + + pipeline.run(); + } + + @Test + public void testRejectsGlobalWindowsWithDefaultTriggerInUnboundedInput() throws Exception { + + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> orders = ordersUnbounded(); + PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders); + + thrown.expectCause(expectedSingleFireTrigger()); + + inputs.apply("sql", BeamSql.queryMulti(sql)); + + pipeline.run(); + } + + @Test + public void testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception { + + String sql = + "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> orders = ordersUnbounded() + .apply("window", + Window + .<Row>into(new GlobalWindows()) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders); + + thrown.expectCause(expectedSingleFireTrigger()); + + inputs.apply("sql", BeamSql.queryMulti(sql)); + + pipeline.run(); + } + + @Test + public void 
testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception { + + String sql = + "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> orders = ordersUnbounded() + .apply( + "window", + Window + .<Row>into(FixedWindows.of(Duration.standardSeconds(203))) + .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + .withAllowedLateness(Duration.standardMinutes(2)) + .accumulatingFiredPanes()); + PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders); + + thrown.expectCause(expectedSingleFireTrigger()); + + inputs.apply("sql", BeamSql.queryMulti(sql)); + + pipeline.run(); + } + + private PCollection<Row> ordersUnbounded() { + DateTime ts = new DateTime(2017, 1, 1, 1, 0, 0); + + return + TestUtils + .rowsBuilderOf( + RowSqlType + .builder() + .withIntegerField("order_id") + .withIntegerField("price") + .withIntegerField("site_id") + .withTimestampField("timestamp") + .build()) + .addRows( + 1, 2, 2, ts.plusSeconds(0).toDate(), + 2, 2, 1, ts.plusSeconds(40).toDate(), + 1, 4, 3, ts.plusSeconds(60).toDate(), + 3, 2, 1, ts.plusSeconds(65).toDate(), + 3, 3, 1, ts.plusSeconds(70).toDate()) + .getPCollectionBuilder() + .withTimestampField("timestamp") + .inPipeline(pipeline) + .buildUnbounded(); + } + private PCollection<Row> queryFromOrderTables(String sql) { - return PCollectionTuple.of( - new TupleTag<>("ORDER_DETAILS1"), - ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)) - .and( - new TupleTag<>("ORDER_DETAILS2"), - ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)) + return tuple( + "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER), + "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)) .apply("join", BeamSql.queryMulti(sql)) .setCoder(RESULT_CODER); } + + private 
Matcher<UnsupportedOperationException> expectedSingleFireTrigger() { + return allOf( + isA(UnsupportedOperationException.class), + hasProperty("message", + stringContainsInOrder( + Arrays.asList("once per window", "default trigger")))); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 677a848..fff1f3e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -34,8 +34,10 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.RowType; +import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** @@ -277,4 +279,21 @@ public class TestUtils { .map(values -> values.stream().collect(toRow(type))) .collect(toList()); } + + public static PCollectionTuple tuple(String tag, PCollection<Row> pCollection) { + return PCollectionTuple.of(new TupleTag<>(tag), pCollection); + } + + public static PCollectionTuple tuple(String tag1, PCollection<Row> pCollection1, + String tag2, PCollection<Row> pCollection2) { + return tuple(tag1, pCollection1).and(new TupleTag<>(tag2), pCollection2); + } + + public static PCollectionTuple tuple(String tag1, PCollection<Row> pCollection1, + String tag2, PCollection<Row> pCollection2, + String tag3, PCollection<Row> pCollection3) { + return tuple( + tag1, pCollection1, + tag2, pCollection2).and(new TupleTag<>(tag3), pCollection3); + } } -- To stop receiving notification emails like this one, please contact ming...@apache.org.