This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a0515de  [BEAM-7166] Add more checks on join condition.
     new 12b0493  Merge pull request #8421 from amaliujia/rw-more_checks_on_join_condition
a0515de is described below

commit a0515de0f593af234767f83f161699456b682cf6
Author: amaliujia <amaliu...@163.com>
AuthorDate: Fri Apr 26 21:41:58 2019 -0700

    [BEAM-7166] Add more checks on join condition.
---
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   | 15 ++++-
 .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java  | 65 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 1696329..fe066e2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -566,7 +566,12 @@ public class BeamJoinRel extends Join implements BeamRelNode {
 
   private Pair<RexNode, RexNode> extractJoinPairOfRexNodes(RexCall rexCall) {
     if (!rexCall.getOperator().getName().equals("=")) {
-      throw new UnsupportedOperationException("Non equi-join is not supported!");
+      throw new UnsupportedOperationException("Non equi-join is not supported");
+    }
+
+    if (isIllegalJoinConjunctionClause(rexCall)) {
+      throw new UnsupportedOperationException(
+          "Only support column reference or struct field access in conjunction clause");
     }
 
     int leftIndex = getColumnIndex(rexCall.getOperands().get(0));
@@ -578,6 +583,14 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     }
   }
 
+  // Only support {RexInputRef | RexFieldAccess} = {RexInputRef | RexFieldAccess}
+  private boolean isIllegalJoinConjunctionClause(RexCall rexCall) {
+    return (!(rexCall.getOperands().get(0) instanceof RexInputRef)
+            && !(rexCall.getOperands().get(0) instanceof RexFieldAccess))
+        || (!(rexCall.getOperands().get(1) instanceof RexInputRef)
+            && !(rexCall.getOperands().get(1) instanceof RexFieldAccess));
+  }
+
   private int getColumnIndex(RexNode rexNode) {
     if (rexNode instanceof RexInputRef) {
       return ((RexInputRef) rexNode).getIndex();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index 162b0ef..d8e8a61 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -24,13 +24,16 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.hamcrest.core.StringContains;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Bounded + Bounded Test for {@code BeamJoinRel}. */
 public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   public static final TestBoundedTable ORDER_DETAILS1 =
       TestBoundedTable.of(
@@ -227,6 +230,68 @@ public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
     pipeline.run();
   }
 
+  @Test
+  public void testException_join_condition1() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id = o2.site_id OR o1.price = o2.site_id";
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("Operator OR"));
+    compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
+  @Test
+  public void testException_join_condition2() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id = o2.site_id AND o1.price > o2.site_id";
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("Non equi-join"));
+    compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
+  @Test
+  public void testException_join_condition3() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id + o2.site_id = 2";
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("column reference"));
+    thrown.expectMessage(StringContains.containsString("struct field access"));
+    compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
+  @Test
+  public void testException_join_condition4() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id + o2.site_id = 2 AND o1.price > o2.site_id";
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("column reference"));
+    thrown.expectMessage(StringContains.containsString("struct field access"));
+    compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
   @Test(expected = UnsupportedOperationException.class)
   public void testException_crossJoin() throws Exception {
     String sql = "SELECT *  " + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";

Reply via email to