[FLINK-3192] [TableAPI] Add explain() support to print the SQL execution plan.

This closes #1477


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbbab0a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbbab0a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbbab0a9

Branch: refs/heads/master
Commit: dbbab0a90b82d18ac14e4791d8f155c2e039b3ee
Parents: b474da6
Author: gallenvara <gallenv...@126.com>
Authored: Fri Dec 11 13:57:51 2015 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jan 12 22:27:07 2016 +0100

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |   6 +
 .../org/apache/flink/api/table/Table.scala      |  23 +++
 .../apache/flink/api/table/explain/Node.java    | 145 +++++++++++++
 .../flink/api/table/explain/PlanJsonParser.java | 144 +++++++++++++
 .../flink/api/table/plan/operations.scala       |   1 +
 .../api/java/table/test/SqlExplainITCase.java   | 206 +++++++++++++++++++
 .../api/scala/table/test/SqlExplainITCase.scala |  96 +++++++++
 .../src/test/scala/resources/testFilter0.out    |  28 +++
 .../src/test/scala/resources/testFilter1.out    |  96 +++++++++
 .../src/test/scala/resources/testJoin0.out      |  39 ++++
 .../src/test/scala/resources/testJoin1.out      | 141 +++++++++++++
 .../src/test/scala/resources/testUnion0.out     |  38 ++++
 .../src/test/scala/resources/testUnion1.out     | 140 +++++++++++++
 pom.xml                                         |   1 +
 14 files changed, 1104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index 45ea785..bdd1b58 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,6 +94,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+                       <version>${jackson.version}</version>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 6ece212..641f2fa 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -17,10 +17,14 @@
  */
 package org.apache.flink.api.table
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer}
 import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
 import org.apache.flink.api.table.parser.ExpressionParser
 import org.apache.flink.api.table.plan._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
 
 /**
  * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
@@ -267,5 +271,24 @@ case class Table(private[flink] val operation: PlanNode) {
     this.copy(operation = UnionAll(operation, right.operation))
   }
 
+  /**
+   * Returns the abstract syntax tree (AST) and the physical execution plan of the
+   * Table program. The AST shows the structure of the supplied statement; the
+   * execution plan shows how the tables referenced by the statement will be
+   * scanned and processed.
+   */
+  def explain(extended: Boolean): String = {
+    val ast = operation
+    val dataSet = this.toDataSet[Row]
+    val env = dataSet.getExecutionEnvironment
+    dataSet.output(new DiscardingOutputFormat[Row])
+    val jsonSqlPlan = env.getExecutionPlan()
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, extended)
+    "== Abstract Syntax Tree ==\n" + ast + "\n\n" +
+      "== Physical Execution Plan ==\n" + sqlPlan
+  }
+
+  def explain(): String = explain(false)
+
   override def toString: String = s"Expression($operation)"
 }

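As an illustration, here is a minimal sketch of how the new method can be
invoked from the Scala Table API (mirroring the test cases below; the exact
plan text depends on the program and the local environment):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    case class WC(count: Int, word: String)

    val env = ExecutionEnvironment.createLocalEnvironment()
    val table = env.fromElements(WC(1, "hello"), WC(2, "ciao")).toTable.as('a, 'b)

    // explain() prints the AST and the physical execution plan;
    // explain(true) additionally includes properties, estimates, and costs.
    println(table.filter("a % 2 = 0").explain())
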
http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
new file mode 100644
index 0000000..9152260
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import java.util.List;
+
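+/**
+ * Simple POJOs mirroring the JSON execution plan emitted by
+ * ExecutionEnvironment#getExecutionPlan(). The field names intentionally
+ * match the JSON keys so that Jackson can bind them without annotations.
+ */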
+public class Node {
+       private int id;
+       private String type;
+       private String pact;
+       private String contents;
+       private int parallelism;
+       private String driver_strategy;
+       private List<Predecessors> predecessors;
+       private List<Global_properties> global_properties;
+       private List<LocalProperty> local_properties;
+       private List<Estimates> estimates;
+       private List<Costs> costs;
+       private List<Compiler_hints> compiler_hints;
+
+       public int getId() {
+               return id;
+       }
+       public String getType() {
+               return type;
+       }
+       public String getPact() {
+               return pact;
+       }
+       public String getContents() {
+               return contents;
+       }
+       public int getParallelism() {
+               return parallelism;
+       }
+       public String getDriver_strategy() {
+               return driver_strategy;
+       }
+       public List<Predecessors> getPredecessors() {
+               return predecessors;
+       }
+       public List<Global_properties> getGlobal_properties() {
+               return global_properties;
+       }
+       public List<LocalProperty> getLocal_properties() {
+               return local_properties;
+       }
+       public List<Estimates> getEstimates() {
+               return estimates;
+       }
+       public List<Costs> getCosts() {
+               return costs;
+       }
+       public List<Compiler_hints> getCompiler_hints() {
+               return compiler_hints;
+       }
+}
+
+class Predecessors {
+       private String ship_strategy;
+       private String exchange_mode;
+
+       public String getShip_strategy() {
+               return ship_strategy;
+       }
+       public String getExchange_mode() {
+               return exchange_mode;
+       }
+}
+
+class Global_properties {
+       private String name;
+       private String value;
+
+       public String getValue() {
+               return value;
+       }
+       public String getName() {
+               return name;
+       }
+}
+
+class LocalProperty {
+       private String name;
+       private String value;
+
+       public String getValue() {
+               return value;
+       }
+       public String getName() {
+               return name;
+       }
+}
+
+class Estimates {
+       private String name;
+       private String value;
+
+       public String getValue() {
+               return value;
+       }
+       public String getName() {
+               return name;
+       }
+}
+
+class Costs {
+       private String name;
+       private String value;
+
+       public String getValue() {
+               return value;
+       }
+       public String getName() {
+               return name;
+       }
+}
+
+class Compiler_hints {
+       private String name;
+       private String value;
+
+       public String getValue() {
+               return value;
+       }
+       public String getName() {
+               return name;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
new file mode 100644
index 0000000..31a7cd68
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class PlanJsonParser {
+
+       public static String getSqlExecutionPlan(String jsonPlan, Boolean extended) throws Exception {
+               ObjectMapper objectMapper = new ObjectMapper();
+
+               // not every node carries the same fields, so ignore unknown properties
+               objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+               PlanTree tree = objectMapper.readValue(jsonPlan, PlanTree.class);
+               LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               int tabCount = 0;
+
+               for (int index = 0; index < tree.getNodes().size(); index++) {
+                       Node tempNode = tree.getNodes().get(index);
+
+                       // inputs of a binary operation such as join or union are peers: keep the same indent
+                       if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
+                               tabCount = map.get(tempNode.getPact());
+                       }
+                       else {
+                               map.put(tempNode.getPact(), tabCount);
+                       }
+
+                       printTab(tabCount, pw);
+                       pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
+
+                       printTab(tabCount + 1, pw);
+                       String content = tempNode.getContents();
+
+                       // drop the hash code of the object instance
+                       int hashCodePos = tempNode.getContents().indexOf("@");
+                       if (hashCodePos > -1) content = tempNode.getContents().substring(0, hashCodePos);
+
+                       // replace the content if the node is a data source, so that the
+                       // unit tests pass: the Java and Scala APIs use different methods
+                       // to provide the input elements
+                       if (tempNode.getPact().equals("Data Source"))
+                               content = "collect elements with CollectionInputFormat";
+                       pw.print("content : " + content + "\n");
+
+                       List<Predecessors> predecessors = tempNode.getPredecessors();
+                       if (predecessors != null) {
+                               printTab(tabCount + 1, pw);
+                               pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+
+                               printTab(tabCount + 1, pw);
+                               pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+                       }
+
+                       if (tempNode.getDriver_strategy() != null) {
+                               printTab(tabCount + 1, pw);
+                               pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+                       }
+
+                       printTab(tabCount + 1, pw);
+                       pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+                                       + tempNode.getGlobal_properties().get(0).getValue() + "\n");
+
+                       if (extended) {
+                               List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+                               for (int i = 1; i < globalProperties.size(); i++) {
+                                       printTab(tabCount + 1, pw);
+                                       pw.print(globalProperties.get(i).getName() + " : "
+                                                       + globalProperties.get(i).getValue() + "\n");
+                               }
+
+                               List<LocalProperty> localProperties = tempNode.getLocal_properties();
+                               for (int i = 0; i < localProperties.size(); i++) {
+                                       printTab(tabCount + 1, pw);
+                                       pw.print(localProperties.get(i).getName() + " : "
+                                                       + localProperties.get(i).getValue() + "\n");
+                               }
+
+                               List<Estimates> estimates = tempNode.getEstimates();
+                               for (int i = 0; i < estimates.size(); i++) {
+                                       printTab(tabCount + 1, pw);
+                                       pw.print(estimates.get(i).getName() + " : "
+                                                       + estimates.get(i).getValue() + "\n");
+                               }
+
+                               List<Costs> costs = tempNode.getCosts();
+                               for (int i = 0; i < costs.size(); i++) {
+                                       printTab(tabCount + 1, pw);
+                                       pw.print(costs.get(i).getName() + " : "
+                                                       + costs.get(i).getValue() + "\n");
+                               }
+
+                               List<Compiler_hints> compilerHints = tempNode.getCompiler_hints();
+                               for (int i = 0; i < compilerHints.size(); i++) {
+                                       printTab(tabCount + 1, pw);
+                                       pw.print(compilerHints.get(i).getName() + " : "
+                                                       + compilerHints.get(i).getValue() + "\n");
+                               }
+                       }
+                       tabCount++;
+                       pw.print("\n");
+               }
+               pw.close();
+               return sw.toString();
+       }
+
+       private static void printTab(int tabCount, PrintWriter pw) {
+               for (int i = 0; i < tabCount; i++)
+                       pw.print("\t");
+       }
+}
+
+class PlanTree {
+       private List<Node> nodes;
+
+       public List<Node> getNodes() {
+               return nodes;
+       }
+}

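For reference, PlanJsonParser consumes the JSON plan string produced by
ExecutionEnvironment#getExecutionPlan(). A minimal sketch of calling it
directly from Scala (this mirrors what Table.explain() does internally):

    import org.apache.flink.api.java.io.DiscardingOutputFormat
    import org.apache.flink.api.scala._
    import org.apache.flink.api.table.explain.PlanJsonParser

    val env = ExecutionEnvironment.createLocalEnvironment()
    val words = env.fromElements((1, "hello"), (2, "ciao"))
    // a sink is required so that the plan generator sees a complete program
    words.output(new DiscardingOutputFormat[(Int, String)])

    // pass extended = true to include properties, estimates, and costs
    val jsonPlan = env.getExecutionPlan()
    println(PlanJsonParser.getSqlExecutionPlan(jsonPlan, false))
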
http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
index ca7874b..7ec34d7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
@@ -35,6 +35,7 @@ sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
  */
 case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
   val children = Nil
+  override def toString = s"Root($outputFields)"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
new file mode 100644
index 0000000..e73b5a2
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.table.Table;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+
+public class SqlExplainITCase {
+
+       private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();
+
+       public static class WC {
+               public String word;
+               public int count;
+
+               // public default constructor required to make this a Flink POJO
+               public WC() {}
+
+               public WC(int count, String word) {
+                       this.word = word;
+                       this.count = count;
+               }
+       }
+
+       @Test
+       public void testFilterWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(2, "d"),
+                               new WC(3, "d"));
+
+               Table table = tableEnv.fromDataSet(input).as("a, b");
+
+               String result = table
+                               .filter("a % 2 = 0")
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testFilter0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+
+       @Test
+       public void testFilterWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(2, "d"),
+                               new WC(3, "d"));
+
+               Table table = tableEnv.fromDataSet(input).as("a, b");
+
+               String result = table
+                               .filter("a % 2 = 0")
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testFilter1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+
+       @Test
+       public void testJoinWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+               String result = table1
+                               .join(table2)
+                               .where("b = d")
+                               .select("a, c")
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testJoin0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+
+       @Test
+       public void testJoinWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+               String result = table1
+                               .join(table2)
+                               .where("b = d")
+                               .select("a, c")
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testJoin1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+
+       @Test
+       public void testUnionWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1);
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2);
+
+               String result = table1
+                               .unionAll(table2)
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testUnion0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+
+       @Test
+       public void testUnionWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1);
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2);
+
+               String result = table1
+                               .unionAll(table2)
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testUnion1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(source, result);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
new file mode 100644
index 0000000..bead02f
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+import org.junit._
+import org.junit.Assert.assertEquals
+
+case class WC(count: Int, word: String)
+
+class SqlExplainITCase {
+
+  val testFilePath = getClass.getResource("/").getFile
+
+  @Test
+  def testFilterWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
+    val result = expr.filter("a % 2 = 0").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testFilterWithExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
+    val result = expr.filter("a % 2 = 0").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithExtended(): Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString
+    assertEquals(source, result)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
new file mode 100644
index 0000000..062fc90
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
@@ -0,0 +1,28 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Filter
+                       content : ('a * 2) === 0
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       driver_strategy : FlatMap
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
new file mode 100644
index 0000000..83378e6
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
@@ -0,0 +1,96 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Filter
+                       content : ('a * 2) === 0
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       driver_strategy : FlatMap
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : 0.0
+                       Est. Cardinality : 0.0
+                       Network : 0.0
+                       Disk I/O : 0.0
+                       CPU : 0.0
+                       Cumulative Network : 0.0
+                       Cumulative Disk I/O : 0.0
+                       Cumulative CPU : 0.0
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : 0.0
+                               Est. Cardinality : 0.0
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : 0.0
+                               Cumulative Disk I/O : 0.0
+                               Cumulative CPU : 0.0
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
new file mode 100644
index 0000000..e6e30be
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
@@ -0,0 +1,39 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Join
+                       content : Join at 'b === 'd
+                       ship_strategy : Hash Partition on [1]
+                       exchange_mode : PIPELINED
+                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
new file mode 100644
index 0000000..a8f05dd
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
@@ -0,0 +1,141 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Join
+                       content : Join at 'b === 'd
+                       ship_strategy : Hash Partition on [1]
+                       exchange_mode : PIPELINED
+                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : (unknown)
+                       Est. Cardinality : (unknown)
+                       Network : (unknown)
+                       Disk I/O : (unknown)
+                       CPU : (unknown)
+                       Cumulative Network : (unknown)
+                       Cumulative Disk I/O : (unknown)
+                       Cumulative CPU : (unknown)
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : (unknown)
+                               Est. Cardinality : (unknown)
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : (unknown)
+                               Cumulative Disk I/O : (unknown)
+                               Cumulative CPU : (unknown)
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
new file mode 100644
index 0000000..db9d2f9
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
@@ -0,0 +1,38 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Union
+                       content : 
+                       ship_strategy : Redistribute
+                       exchange_mode : PIPELINED
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
new file mode 100644
index 0000000..8dc1e53
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
@@ -0,0 +1,140 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Union
+                       content : 
+                       ship_strategy : Redistribute
+                       exchange_mode : PIPELINED
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : (unknown)
+                       Est. Cardinality : (unknown)
+                       Network : 0.0
+                       Disk I/O : 0.0
+                       CPU : 0.0
+                       Cumulative Network : (unknown)
+                       Cumulative Disk I/O : 0.0
+                       Cumulative CPU : 0.0
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : (unknown)
+                               Est. Cardinality : (unknown)
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : (unknown)
+                               Cumulative Disk I/O : 0.0
+                               Cumulative CPU : 0.0
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 91e4ca1..091552d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -797,6 +797,7 @@ under the License.
                                                <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
                                                <exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
                                                <exclude>out/test/flink-avro/avro/user.avsc</exclude>
+                                               <exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude>
                                                <!-- TweetInputFormat Test Data-->
                                                <exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
                                                <exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>
