flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule
Repository: flink Updated Branches: refs/heads/release-1.4 e7f7d0c93 -> 42e24413b [FLINK-7986] [table] Introduce FilterSetOpTransposeRule This closes #4956. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e24413 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e24413 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e24413 Branch: refs/heads/release-1.4 Commit: 42e24413b5a47928e06f2a61086f7559370c65d8 Parents: e7f7d0c Author: XprayAuthored: Mon Nov 6 23:47:33 2017 +0800 Committer: twalthr Committed: Thu Nov 16 16:16:22 2017 +0100 -- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 80 .../api/stream/table/SetOperatorsTest.scala | 68 + 3 files changed, 150 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index dcc735d..a20d14f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -52,6 +52,8 @@ object FlinkRuleSets { FilterJoinRule.JOIN, // push filter through an aggregation FilterAggregateTransposeRule.INSTANCE, +// push filter through set operation +FilterSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 2d4e205..35f4429 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase { util.verifyJavaTable(in, expected) } + + @Test + def testFilterUnionTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetUnion", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") +) + +util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.minusAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetMinus", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") +), +term("groupBy", "b"), +
flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule
Repository: flink Updated Branches: refs/heads/master cd1fbc078 -> 81dc260dc [FLINK-7986] [table] Introduce FilterSetOpTransposeRule This closes #4956. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d Branch: refs/heads/master Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e Parents: cd1fbc0 Author: XprayAuthored: Mon Nov 6 23:47:33 2017 +0800 Committer: twalthr Committed: Thu Nov 16 14:43:50 2017 +0100 -- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 80 .../api/stream/table/SetOperatorsTest.scala | 68 + 3 files changed, 150 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index dcc735d..a20d14f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -52,6 +52,8 @@ object FlinkRuleSets { FilterJoinRule.JOIN, // push filter through an aggregation FilterAggregateTransposeRule.INSTANCE, +// push filter through set operation +FilterSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 2d4e205..35f4429 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase { util.verifyJavaTable(in, expected) } + + @Test + def testFilterUnionTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetUnion", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") +) + +util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.minusAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetMinus", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") +), +term("groupBy", "b"), +