[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2018-09-20 Thread Mayer Crystal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622869#comment-16622869
 ] 

Mayer Crystal commented on FLINK-7942:
--

I know that this is marked as resolved, but I'm wondering if there was a 
regression or if there are other code paths that may still exhibit this issue. I 
am running Flink 1.6.1 and am seeing the same stack trace as above (included 
below). I don't have a standalone test yet (the code is a bit involved, since the 
actual joins and projections are driven by external configuration in this case), 
but the sequence of events is basically as follows (a rough, hypothetical sketch 
of the pipeline follows the list):

1. A table, T1, is registered with the table environment using a CsvTableSource.
2. A second table, T2, is registered with the environment using a CsvTableSource.
3. The columns of T2 are aliased (using the .as(String) method) to ensure that 
there are no name clashes during the join.
4. T2 is joined to T1 using a single equality join condition and then the 
desired columns are selected.
5. The columns of the result of step 4, J1, are renamed back to the 'expected' 
names using the .as(String) method.
6. The data in J1 is filtered.
7. The filtered table is then joined to another table, T3, using the same 
process as in steps 3-5.
8. Another table, T4, is also joined using the same process as in steps 3-5.
9. The resulting table is then grouped/aggregated.
10. The resulting table is then filtered again.
11. The final table is sent to a sink for writing to disk.
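
For reference, below is a minimal sketch of that pipeline in the Flink 1.6 batch 
Table API. It is an illustration under stated assumptions, not the actual job: 
the file paths, schemas, column names (id/key/level/descr), aggregation and sink 
are invented, and the T3/T4 joins are omitted.

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object FilterJoinRepro {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Steps 1-2: register T1 and T2 from CSV sources (hypothetical schemas/paths).
    tEnv.registerTableSource("T1", CsvTableSource.builder()
      .path("/tmp/t1.csv")
      .field("id", Types.STRING)
      .field("key", Types.STRING)
      .field("level", Types.STRING)
      .build())
    tEnv.registerTableSource("T2", CsvTableSource.builder()
      .path("/tmp/t2.csv")
      .field("key", Types.STRING)
      .field("descr", Types.STRING)
      .build())

    // Step 3: alias T2's columns to avoid name clashes during the join.
    val t1 = tEnv.scan("T1")
    val t2 = tEnv.scan("T2").as("r_key, r_descr")

    // Steps 4-5: join on a single equality condition, select, rename back.
    val j1 = t1
      .leftOuterJoin(t2, 'key === 'r_key)
      .select('id, 'key, 'level, 'r_descr)
      .as("id, key, level, descr")

    // Step 6: filter J1 -- the failing plan above shows such a condition on an
    // aliased column being simplified: =(AS($5, 'level'), 'Level 1').
    val filtered = j1.where('level === "Level 1")

    // Steps 7-8 (joins against T3 and T4) are omitted for brevity.

    // Steps 9-10: group/aggregate, then filter again (hypothetical logic).
    val aggregated = filtered
      .groupBy('key)
      .select('key, 'id.count as 'cnt)
      .where('cnt > 1)

    // Step 11: write to a sink; the optimizer runs (and formerly hit the NPE)
    // when the plan is translated here.
    aggregated.writeToSink(new CsvTableSink("/tmp/out.csv", "|"))
    env.execute()
  }
}
{code}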

Also, as a sanity check, I applied the hack mentioned above (modifying the 
Calcite class to add map.put(SqlKind.AS, Policy.AS_IS);) and the code then 
worked as expected.


java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#771:LogicalFilter.NONE(input=rel#468:Subset#41.NONE,condition==(AS($5, _UTF-16LE'level'), _UTF-16LE'Level 1')), rel#467:LogicalJoin.NONE(left=rel#465:Subset#39.NONE,right=rel#466:Subset#40.NONE,condition==($1, $3),joinType=left)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:271)
    at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:478)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:511)
    at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
    ...
Caused by: java.lang.NullPointerException: null
    at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
    at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
    at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
    at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
    at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
    at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2353)
    at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
    at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    ... 21 common frames omitted

> NPE when apply FilterJoinRule
> -
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.4.0, 1.5.0
>
>

[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253840#comment-16253840
 ] 

ASF GitHub Bot commented on FLINK-7942:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5019



[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253814#comment-16253814
 ] 

ASF GitHub Bot commented on FLINK-7942:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5019
  
Looks good. +1 to merge



[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253654#comment-16253654
 ] 

ASF GitHub Bot commented on FLINK-7942:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5019
  
Thanks @fhueske. I simplified the changes a bit more. Now we use explicit 
aliases only for windows and the alias operator. 



[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253379#comment-16253379
 ] 

ASF GitHub Bot commented on FLINK-7942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5019#discussion_r151110174
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
 ---
@@ -203,19 +203,17 @@ class OverWindowTest extends TableTestBase {
 "DataStreamCalc",
 unaryNode(
   "DataStreamOverAggregate",
-  unaryNode(
-"DataStreamCalc",
-streamTableNode(0),
-term("select", "a", "c", "proctime")
-  ),
+  streamTableNode(0),
--- End diff --

The new plan is less efficient, because a projection isn't pushed down 
anymore and a wider row needs to be materialized in the over window.



[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253293#comment-16253293
 ] 

ASF GitHub Bot commented on FLINK-7942:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5019

[FLINK-7942] [table] Reduce aliasing in RexNodes

## What is the purpose of the change

This PR reduces the number of `AS` expressions in projections. This fixes the 
bug described in FLINK-7942 with the `FilterJoinRule` and improves the plans: 
many Calc operations are no longer needed. For multi-window queries, the change 
caused some problems that are not trivial to fix, so they still use aliasing 
for resolving time attributes.
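
For illustration, using the (abbreviated) expressions already shown in this issue: a filter condition that previously reached `FilterJoinRule` as

    >=(AS(Merger$...($1, AS(Merger$...($1, $3), 'c0')), 'c1'), 0)

is now generated without the nested `AS` wrappers, conceptually

    >=(Merger$(c, Merger$(c, f)), 0)

so Calcite's `Strong`/`RelOptUtil.simplifyJoin` no longer encounters an `AS` call for which it has no null policy.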

## Brief change log

- Remove explicit alias expression from RexNodes


## Verifying this change

- Added testFilterJoinRule in JoinTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-7942

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5019


commit 82356f22154c563fcd8bd018dd954fe274e7fe5f
Author: twalthr 
Date:   2017-11-15T11:07:16Z

[FLINK-7942] [table] Reduce aliasing in RexNodes





[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread lincoln.lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253111#comment-16253111
 ] 

lincoln.lee commented on FLINK-7942:


[~twalthr] It would be great if you have time to fix this; I'm on vacation now.

> NPE when apply FilterJoinRule
> -
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>
> Test case *testFilterRule1* fails due to a NPE 
> {code}
> java.lang.RuntimeException: Error while applying rule 
> FilterJoinRule:FilterJoinRule:filter, args 
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  
> AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  $3), 'c0')), 'c1'), 0)), 
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
>  $2),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>   at 
> org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>   testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>   at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>   at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2* works which has the same query written in SQL.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
> val util = batchTestUtil()
> val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
> val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
> val results = t1
>   .leftOuterJoin(t2, 'b === 'e)
>   .select('c, Merger('c, 'f) as 'c0)
>   .select(Merger('c, 'c0) as 'c1)
>   .where('c1 >= 0)
> val expected = unaryNode(
>   "DataSetCalc",
>   binaryNode(
> "DataSetJoin",
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(0),
>   term("select", "b", "c")
> ),
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(1),
>   term("select", "e", "f")
> ),
> term("where", "=(b, e)"),
> term("join", "b", "c", "e", "f"),
> term("joinType", "LeftOuterJoin")
>   ),
>   term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>   term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
> )
> util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
> val util = batchTestUtil()
> util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
> util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
> util.tableEnv.registerFunction("udf_test", Merger)
> val sql =
>   s"""
>  |select c1
>  |from (
>  |  select udf_test(c, c0) as c1
>  |  from (
>  |select c, udf_test(b, c) as c0
>  |  from
>  |  (select a, b, c
>  |from T1
>  |left outer join T2
>  |on T1.b = T2.e
>  |  ) tmp
>  |  ) tmp1
>  |) tmp2
>  |where c1 >= 0
>""".stripMargin
> val results = util.tableEnv.sqlQuery(sql)
> val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) 
> \n" +
>   "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
> joinType=[LeftOuterJoin])\n" +
>   "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
> 0)])\n" +
>   "DataSetScan(table=[[_DataSetTable_0]])\n" +
>   "DataSetCalc(select=[e])\n" +
>   "DataSetScan(table=[[_DataSetTable_1]])"
> util.verifyTable(results, expected)
>   }
> }
> {code}

[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251353#comment-16251353
 ] 

Timo Walther commented on FLINK-7942:
-

[~lincoln.86xy] If it is OK with you, I would try to fix this issue. Maybe we can 
still get it into the next release.


[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235700#comment-16235700
 ] 

Fabian Hueske commented on FLINK-7942:
--

Julian's response on CALCITE-2023 suggests generating {{RexNode}} expressions 
without {{AS}} nodes. 
Maybe we can fix this in the Table API {{Expression}} -> {{RexNode}} 
translation.
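
A hedged, self-contained sketch of that idea follows. The {{Expr}}/{{Alias}} types 
below are hypothetical stand-ins, not the actual flink-table classes: the alias is 
translated by returning the child's {{RexNode}} unchanged, keeping the name only 
for the enclosing project's output fields, instead of wrapping the child in an 
{{AS}} {{RexCall}} that rules such as {{FilterJoinRule}} (via Calcite's {{Strong}}) 
cannot reason about.

{code}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder

// Hypothetical expression tree, only to illustrate the translation idea.
sealed trait Expr {
  def toRexNode(b: RelBuilder): RexNode
}

final case class FieldRef(index: Int) extends Expr {
  override def toRexNode(b: RelBuilder): RexNode = b.field(index)
}

final case class Alias(child: Expr, name: String) extends Expr {
  // Old behaviour (simplified): wrap the child in an AS call, e.g.
  //   b.alias(child.toRexNode(b), name)
  // New behaviour: no AS RexCall at all; `name` is used only when the
  // surrounding project names its output fields.
  override def toRexNode(b: RelBuilder): RexNode = child.toRexNode(b)
}
{code}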
