This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a0515de [BEAM-7166] Add more checks on join condition. new 12b0493 Merge pull request #8421 from amaliujia/rw-more_checks_on_join_condition a0515de is described below commit a0515de0f593af234767f83f161699456b682cf6 Author: amaliujia <amaliu...@163.com> AuthorDate: Fri Apr 26 21:41:58 2019 -0700 [BEAM-7166] Add more checks on join condition. --- .../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 15 ++++- .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java | 65 ++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 1696329..fe066e2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -566,7 +566,12 @@ public class BeamJoinRel extends Join implements BeamRelNode { private Pair<RexNode, RexNode> extractJoinPairOfRexNodes(RexCall rexCall) { if (!rexCall.getOperator().getName().equals("=")) { - throw new UnsupportedOperationException("Non equi-join is not supported!"); + throw new UnsupportedOperationException("Non equi-join is not supported"); + } + + if (isIllegalJoinConjunctionClause(rexCall)) { + throw new UnsupportedOperationException( + "Only support column reference or struct field access in conjunction clause"); } int leftIndex = getColumnIndex(rexCall.getOperands().get(0)); @@ -578,6 +583,14 @@ public class BeamJoinRel extends Join implements BeamRelNode { } } + // Only support {RexInputRef | RexFieldAccess} = {RexInputRef | RexFieldAccess} + private boolean isIllegalJoinConjunctionClause(RexCall rexCall) { + return (!(rexCall.getOperands().get(0) instanceof RexInputRef) + && !(rexCall.getOperands().get(0) instanceof RexFieldAccess)) + || (!(rexCall.getOperands().get(1) instanceof RexInputRef) + && !(rexCall.getOperands().get(1) instanceof RexFieldAccess)); + } + private int getColumnIndex(RexNode rexNode) { if (rexNode instanceof RexInputRef) { return ((RexInputRef) rexNode).getIndex(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java index 162b0ef..d8e8a61 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -24,13 +24,16 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.hamcrest.core.StringContains; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** Bounded + Bounded Test for {@code BeamJoinRel}. */ public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); public static final TestBoundedTable ORDER_DETAILS1 = TestBoundedTable.of( @@ -227,6 +230,68 @@ public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest { pipeline.run(); } + @Test + public void testException_join_condition1() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id = o2.site_id OR o1.price = o2.site_id"; + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("Operator OR")); + compilePipeline(sql, pipeline); + pipeline.run(); + } + + @Test + public void testException_join_condition2() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id = o2.site_id AND o1.price > o2.site_id"; + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("Non equi-join")); + compilePipeline(sql, pipeline); + pipeline.run(); + } + + @Test + public void testException_join_condition3() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id + o2.site_id = 2"; + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("column reference")); + thrown.expectMessage(StringContains.containsString("struct field access")); + compilePipeline(sql, pipeline); + pipeline.run(); + } + + @Test + public void testException_join_condition4() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id + o2.site_id = 2 AND o1.price > o2.site_id"; + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("column reference")); + thrown.expectMessage(StringContains.containsString("struct field access")); + compilePipeline(sql, pipeline); + pipeline.run(); + } + @Test(expected = UnsupportedOperationException.class) public void testException_crossJoin() throws Exception { String sql = "SELECT * " + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";