[FLINK-3192] [TableAPI] Add explain support to print the sql-execution plan.
This closes #1477 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbbab0a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbbab0a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbbab0a9 Branch: refs/heads/master Commit: dbbab0a90b82d18ac14e4791d8f155c2e039b3ee Parents: b474da6 Author: gallenvara <gallenv...@126.com> Authored: Fri Dec 11 13:57:51 2015 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jan 12 22:27:07 2016 +0100 ---------------------------------------------------------------------- flink-staging/flink-table/pom.xml | 6 + .../org/apache/flink/api/table/Table.scala | 23 +++ .../apache/flink/api/table/explain/Node.java | 145 +++++++++++++ .../flink/api/table/explain/PlanJsonParser.java | 144 +++++++++++++ .../flink/api/table/plan/operations.scala | 1 + .../api/java/table/test/SqlExplainITCase.java | 206 +++++++++++++++++++ .../api/scala/table/test/SqlExplainITCase.scala | 96 +++++++++ .../src/test/scala/resources/testFilter0.out | 28 +++ .../src/test/scala/resources/testFilter1.out | 96 +++++++++ .../src/test/scala/resources/testJoin0.out | 39 ++++ .../src/test/scala/resources/testJoin1.out | 141 +++++++++++++ .../src/test/scala/resources/testUnion0.out | 38 ++++ .../src/test/scala/resources/testUnion1.out | 140 +++++++++++++ pom.xml | 1 + 14 files changed, 1104 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml index 45ea785..bdd1b58 100644 --- a/flink-staging/flink-table/pom.xml +++ b/flink-staging/flink-table/pom.xml @@ -94,6 +94,12 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala index 6ece212..641f2fa 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala @@ -17,10 +17,14 @@ */ package org.apache.flink.api.table +import org.apache.flink.api.java.io.DiscardingOutputFormat +import org.apache.flink.api.table.explain.PlanJsonParser import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer} import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} import org.apache.flink.api.table.parser.ExpressionParser import org.apache.flink.api.table.plan._ +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ /** * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs @@ -267,5 +271,24 @@ case class Table(private[flink] val operation: PlanNode) { this.copy(operation = UnionAll(operation, right.operation)) } + /** + * Get the process of the sql parsing, print AST and physical execution plan.The AST + * show the structure of the supplied statement. The execution plan shows how the table + * referenced by the statement will be scanned. + */ + def explain(extended: Boolean): String = { + val ast = operation + val dataSet = this.toDataSet[Row] + val env = dataSet.getExecutionEnvironment + dataSet.output(new DiscardingOutputFormat[Row]) + val jasonSqlPlan = env.getExecutionPlan() + val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) + val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" + + "\n" + sqlPlan + return result + } + + def explain(): String = explain(false) + override def toString: String = s"Expression($operation)" } http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java new file mode 100644 index 0000000..9152260 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.explain; + +import java.util.List; + +public class Node { + private int id; + private String type; + private String pact; + private String contents; + private int parallelism; + private String driver_strategy; + private List<Predecessors> predecessors; + private List<Global_properties> global_properties; + private List<LocalProperty> local_properties; + private List<Estimates> estimates; + private List<Costs> costs; + private List<Compiler_hints> compiler_hints; + + public int getId() { + return id; + } + public String getType() { + return type; + } + public String getPact() { + return pact; + } + public String getContents() { + return contents; + } + public int getParallelism() { + return parallelism; + } + public String getDriver_strategy() { + return driver_strategy; + } + public List<Predecessors> getPredecessors() { + return predecessors; + } + public List<Global_properties> getGlobal_properties() { + return global_properties; + } + public List<LocalProperty> getLocal_properties() { + return local_properties; + } + public List<Estimates> getEstimates() { + return estimates; + } + public List<Costs> getCosts() { + return costs; + } + public List<Compiler_hints> getCompiler_hints() { + return compiler_hints; + } +} + +class Predecessors { + private String ship_strategy; + private String exchange_mode; + + public String getShip_strategy() { + return ship_strategy; + } + public String getExchange_mode() { + return exchange_mode; + } +} + +class Global_properties { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class LocalProperty { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class Estimates { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class Costs { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class Compiler_hints { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java new file mode 100644 index 0000000..31a7cd68 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.explain; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.DeserializationFeature; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.LinkedHashMap; +import java.util.List; + +public class PlanJsonParser { + + public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + //not every node is same, ignore the unknown field + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + PlanTree tree = objectMapper.readValue(t, PlanTree.class); + LinkedHashMap<String, Integer> map = new LinkedHashMap<>(); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + int tabCount = 0; + + for (int index = 0; index < tree.getNodes().size(); index++) { + Node tempNode = tree.getNodes().get(index); + + //input with operation such as join or union is coordinate, keep the same indent + if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) { + tabCount = map.get(tempNode.getPact()); + } + else { + map.put(tempNode.getPact(), tabCount); + } + + printTab(tabCount, pw); + pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n"); + + printTab(tabCount + 1, pw); + String content = tempNode.getContents(); + + //drop the hashcode of object instance + int dele = tempNode.getContents().indexOf("@"); + if (dele > -1) content = tempNode.getContents().substring(0, dele); + + //replace with certain content if node is dataSource to pass + //unit tests, because java and scala use different api to + //get input element + if (tempNode.getPact().equals("Data Source")) + content = "collect elements with CollectionInputFormat"; + pw.print("content : " + content + "\n"); + + List<Predecessors> predecessors = tempNode.getPredecessors(); + if (predecessors != null) { + printTab(tabCount + 1, pw); + pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n"); + + printTab(tabCount + 1, pw); + pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n"); + } + + if (tempNode.getDriver_strategy() != null) { + printTab(tabCount + 1, pw); + pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n"); + } + + printTab(tabCount + 1, pw); + pw.print(tempNode.getGlobal_properties().get(0).getName() + " : " + + tempNode.getGlobal_properties().get(0).getValue() + "\n"); + + if (extended) { + List<Global_properties> globalProperties = tempNode.getGlobal_properties(); + for (int i = 1; i < globalProperties.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(globalProperties.get(i).getName() + " : " + + globalProperties.get(i).getValue() + "\n"); + } + + List<LocalProperty> localProperties = tempNode.getLocal_properties(); + for (int i = 0; i < localProperties.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(localProperties.get(i).getName() + " : " + + localProperties.get(i).getValue() + "\n"); + } + + List<Estimates> estimates = tempNode.getEstimates(); + for (int i = 0; i < estimates.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(estimates.get(i).getName() + " : " + + estimates.get(i).getValue() + "\n"); + } + + List<Costs> costs = tempNode.getCosts(); + for (int i = 0; i < costs.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(costs.get(i).getName() + " : " + + costs.get(i).getValue() + "\n"); + } + + List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints(); + for (int i = 0; i < compilerHintses.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(compilerHintses.get(i).getName() + " : " + + compilerHintses.get(i).getValue() + "\n"); + } + } + tabCount++; + pw.print("\n"); + } + pw.close(); + return sw.toString(); + } + + private static void printTab(int tabCount, PrintWriter pw) { + for (int i = 0; i < tabCount; i++) + pw.print("\t"); + } +} + +class PlanTree { + private List<Node> nodes; + + public List<Node> getNodes() { + return nodes; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala index ca7874b..7ec34d7 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala @@ -35,6 +35,7 @@ sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product => */ case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode { val children = Nil + override def toString = s"Root($outputFields)" } /** http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java new file mode 100644 index 0000000..e73b5a2 --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.table.Table; +import org.junit.Test; + +import java.io.File; +import java.util.Scanner; + +import static org.junit.Assert.assertEquals; + +public class SqlExplainITCase { + + private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile(); + + public static class WC { + public String word; + public int count; + + // Public constructor to make it a Flink POJO + public WC() {} + + public WC(int count, String word) { + this.word = word; + this.count = count; + } + } + + @Test + public void testGroupByWithoutExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input = env.fromElements( + new WC(1,"d"), + new WC(2,"d"), + new WC(3,"d")); + + Table table = tableEnv.fromDataSet(input).as("a, b"); + + String result = table + .filter("a % 2 = 0") + .explain(); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testFilter0.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } + + @Test + public void testGroupByWithExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input = env.fromElements( + new WC(1, "d"), + new WC(2, "d"), + new WC(3, "d")); + + Table table = tableEnv.fromDataSet(input).as("a, b"); + + String result = table + .filter("a % 2 = 0") + .explain(true); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testFilter1.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } + + @Test + public void testJoinWithoutExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input1 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table1 = tableEnv.fromDataSet(input1).as("a, b"); + + DataSet<WC> input2 = env.fromElements( + new WC(1,"d"), + new WC(1,"d"), + new WC(1,"d")); + + Table table2 = tableEnv.fromDataSet(input2).as("c, d"); + + String result = table1 + .join(table2) + .where("b = d") + .select("a, c") + .explain(); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testJoin0.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } + + @Test + public void testJoinWithExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input1 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table1 = tableEnv.fromDataSet(input1).as("a, b"); + + DataSet<WC> input2 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table2 = tableEnv.fromDataSet(input2).as("c, d"); + + String result = table1 + .join(table2) + .where("b = d") + .select("a, c") + .explain(true); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testJoin1.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } + + @Test + public void testUnionWithoutExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input1 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table1 = tableEnv.fromDataSet(input1); + + DataSet<WC> input2 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table2 = tableEnv.fromDataSet(input2); + + String result = table1 + .unionAll(table2) + .explain(); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testUnion0.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } + + @Test + public void testUnionWithExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input1 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table1 = tableEnv.fromDataSet(input1); + + DataSet<WC> input2 = env.fromElements( + new WC(1, "d"), + new WC(1, "d"), + new WC(1, "d")); + + Table table2 = tableEnv.fromDataSet(input2); + + String result = table1 + .unionAll(table2) + .explain(true); + String source = new Scanner(new File(testFilePath + + "../../src/test/scala/resources/testUnion1.out")) + .useDelimiter("\\A").next(); + assertEquals(result, source); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala new file mode 100644 index 0000000..bead02f --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ + +import org.junit._ +import org.junit.Assert.assertEquals + +case class WC(count: Int, word: String) + +class SqlExplainITCase { + + val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile + + @Test + def testGroupByWithoutExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) + val result = expr.filter("a % 2 = 0").explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testFilter0.out").mkString + assertEquals(result, source) + } + + @Test + def testGroupByWithExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) + val result = expr.filter("a % 2 = 0").explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testFilter1.out").mkString + assertEquals(result, source) + } + + @Test + def testJoinWithoutExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) + val result = expr1.join(expr2).where("b = d").select("a, c").explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testJoin0.out").mkString + assertEquals(result, source) + } + + @Test + def testJoinWithExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) + val result = expr1.join(expr2).where("b = d").select("a, c").explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testJoin1.out").mkString + assertEquals(result, source) + } + + @Test + def testUnionWithoutExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable + val result = expr1.unionAll(expr2).explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testUnion0.out").mkString + assertEquals(result, source) + } + + @Test + def testUnionWithExtended() : Unit = { + val env = ExecutionEnvironment.createLocalEnvironment() + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable + val result = expr1.unionAll(expr2).explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testUnion1.out").mkString + assertEquals(result, source) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out new file mode 100644 index 0000000..062fc90 --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out @@ -0,0 +1,28 @@ +== Abstract Syntax Tree == +Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 1 : Filter + content : ('a * 2) === 0 + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out new file mode 100644 index 0000000..83378e6 --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out @@ -0,0 +1,96 @@ +== Abstract Syntax Tree == +Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Filter + content : ('a * 2) === 0 + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : 0.0 + Est. Cardinality : 0.0 + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : 0.0 + Est. Cardinality : 0.0 + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out new file mode 100644 index 0000000..e6e30be --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out @@ -0,0 +1,39 @@ +== Abstract Syntax Tree == +Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 1 : Join + content : Join at 'b === 'd + ship_strategy : Hash Partition on [1] + exchange_mode : PIPELINED + driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out new file mode 100644 index 0000000..a8f05dd --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out @@ -0,0 +1,141 @@ +== Abstract Syntax Tree == +Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Join + content : Join at 'b === 'd + ship_strategy : Hash Partition on [1] + exchange_mode : PIPELINED + driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : (unknown) + Disk I/O : (unknown) + CPU : (unknown) + Cumulative Network : (unknown) + Cumulative Disk I/O : (unknown) + Cumulative CPU : (unknown) + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : (unknown) + Cumulative CPU : (unknown) + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out new file mode 100644 index 0000000..db9d2f9 --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out @@ -0,0 +1,38 @@ +== Abstract Syntax Tree == +Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 1 : Union + content : + ship_strategy : Redistribute + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out new file mode 100644 index 0000000..8dc1e53 --- /dev/null +++ b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out @@ -0,0 +1,140 @@ +== Abstract Syntax Tree == +Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Union + content : + ship_strategy : Redistribute + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 91e4ca1..091552d 100644 --- a/pom.xml +++ b/pom.xml @@ -797,6 +797,7 @@ under the License. <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> <exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> + <exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude> <!-- TweetInputFormat Test Data--> <exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude> <exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>