http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
new file mode 100644
index 0000000..428aec5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase {
+
+
+       public JoinITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testJoin() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1.join(in2).where("b === e").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testJoinWithFilter() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testJoinWithMultipleKeys() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1.join(in2).where("a === d && b === h").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testJoinNonExistingKey() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1.join(in2).where("foo === e").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testJoinWithNonMatchingKeyTypes() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1
+                               .join(in2).where("a === g").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testJoinWithAmbiguousFields() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
+
+               Table result = in1
+                               .join(in2).where("a === d").select("c, g");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testJoinWithAggregation() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table result = in1
+                               .join(in2).where("a === d").select("g.count");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "6";
+               compareResultAsText(results, expected);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
new file mode 100644
index 0000000..d61912b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+       public PojoGroupingITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testPojoGrouping() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+                       new Tuple3<>("A", 23.0, "Z"),
+                       new Tuple3<>("A", 24.0, "Y"),
+                       new Tuple3<>("B", 1.0, "Z"));
+
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table = tableEnv
+                       .fromDataSet(data, "groupMe, value, name")
+                       .select("groupMe, value, name")
+                       .where("groupMe != 'B'");
+
+               DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+               DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+                       .sortGroup("value", Order.DESCENDING)
+                       .first(1);
+               List<MyPojo> resultList = result.collect();
+
+               compareResultAsText(resultList, "A,24.0,Y");
+       }
+
+       public static class MyPojo implements Serializable {
+               private static final long serialVersionUID = 8741918940120107213L;
+
+               public String groupMe;
+               public double value;
+               public String name;
+
+               public MyPojo() {
+                       // for serialization
+               }
+
+               public MyPojo(String groupMe, double value, String name) {
+                       this.groupMe = groupMe;
+                       this.value = value;
+                       this.name = name;
+               }
+
+               @Override
+               public String toString() {
+                       return groupMe + "," + value + "," + name;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
new file mode 100644
index 0000000..9e42f53
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends MultipleProgramsTestBase {
+
+
+       public SelectITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testSimpleSelectAllWithAs() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+               Table result = in
+                               .select("a, b, c");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+               compareResultAsText(results, expected);
+
+       }
+
+       @Test
+       public void testSimpleSelectWithNaming() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds);
+
+               Table result = in
+                               .select("f0 as a, f1 as b")
+                               .select("a, b");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+                               "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+                               "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToFewFields() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = " sorry dude ";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToManyFields() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = " sorry dude ";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithAmbiguousFields() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = " today's not your day ";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testOnlyFieldRefInAs() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "sorry bro";
+               compareResultAsText(results, expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
new file mode 100644
index 0000000..e73b5a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.table.Table;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+
+public class SqlExplainITCase {
+
+       private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();
+
+       public static class WC {
+               public String word;
+               public int count;
+
+               // Public constructor to make it a Flink POJO
+               public WC() {}
+
+               public WC(int count, String word) {
+                       this.word = word;
+                       this.count = count;
+               }
+       }
+
+       @Test
+       public void testGroupByWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC(1,"d"),
+                               new WC(2,"d"),
+                               new WC(3,"d"));
+
+               Table table = tableEnv.fromDataSet(input).as("a, b");
+
+               String result = table
+                               .filter("a % 2 = 0")
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testFilter0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+
+       @Test
+       public void testGroupByWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(2, "d"),
+                               new WC(3, "d"));
+
+               Table table = tableEnv.fromDataSet(input).as("a, b");
+
+               String result = table
+                               .filter("a % 2 = 0")
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testFilter1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+
+       @Test
+       public void testJoinWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1,"d"),
+                               new WC(1,"d"),
+                               new WC(1,"d"));
+
+               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+               String result = table1
+                               .join(table2)
+                               .where("b = d")
+                               .select("a, c")
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testJoin0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+
+       @Test
+       public void testJoinWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+               String result = table1
+                               .join(table2)
+                               .where("b = d")
+                               .select("a, c")
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testJoin1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+
+       @Test
+       public void testUnionWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1);
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2);
+
+               String result = table1
+                               .unionAll(table2)
+                               .explain();
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testUnion0.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+
+       @Test
+       public void testUnionWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input1 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table1 = tableEnv.fromDataSet(input1);
+
+               DataSet<WC> input2 = env.fromElements(
+                               new WC(1, "d"),
+                               new WC(1, "d"),
+                               new WC(1, "d"));
+
+               Table table2 = tableEnv.fromDataSet(input2);
+
+               String result = table1
+                               .unionAll(table2)
+                               .explain(true);
+               String source = new Scanner(new File(testFilePath +
+                               "../../src/test/scala/resources/testUnion1.out"))
+                               .useDelimiter("\\A").next();
+               assertEquals(result, source);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
new file mode 100644
index 0000000..7936f8c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class StringExpressionsITCase extends MultipleProgramsTestBase {
+
+
+       public StringExpressionsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testSubstring() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+                               new Tuple2<>("AAAA", 2),
+                               new Tuple2<>("BBBB", 1));
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               Table result = in
+                               .select("a.substring(0, b)");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "AA\nB";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testSubstringWithMaxEnd() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+                               new Tuple2<>("ABCD", 2),
+                               new Tuple2<>("ABCD", 1));
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               Table result = in
+                               .select("a.substring(b)");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "CD\nBCD";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testNonWorkingSubstring1() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple2<String, Float>> ds = env.fromElements(
+                               new Tuple2<>("ABCD", 2.0f),
+                               new Tuple2<>("ABCD", 1.0f));
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               Table result = in
+                               .select("a.substring(0, b)");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testNonWorkingSubstring2() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple2<String, String>> ds = env.fromElements(
+                               new Tuple2<>("ABCD", "a"),
+                               new Tuple2<>("ABCD", "b"));
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               Table result = in
+                               .select("a.substring(b, 15)");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
new file mode 100644
index 0000000..7fd3a28
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnionITCase extends MultipleProgramsTestBase {
+
+
+       public UnionITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testUnion() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c");
+
+               Table selected = in1.unionAll(in2).select("c");
+               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+               List<Row> results = ds.collect();
+
+               String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testUnionWithFilter() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c");
+
+               Table selected = in1.unionAll(in2).where("b < 2").select("c");
+               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+               List<Row> results = ds.collect();
+
+               String expected = "Hi\n" + "Hallo\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testUnionFieldsNameNotOverlap1() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+               Table selected = in1.unionAll(in2);
+               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+               List<Row> results = ds.collect();
+
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testUnionFieldsNameNotOverlap2() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, e").select("a, b, c");
+
+               Table selected = in1.unionAll(in2);
+               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+               List<Row> results = ds.collect();
+
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testUnionWithAggregation() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c");
+
+               Table selected = in1.unionAll(in2).select("c.count");
+               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+               List<Row> results = ds.collect();
+
+               String expected = "18";
+               compareResultAsText(results, expected);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
new file mode 100644
index 0000000..1816614
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.table.test;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.PageRankTable;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class PageRankTableITCase extends JavaProgramTestBase {
+
+       private static int NUM_PROGRAMS = 2;
+
+       private int curProgId = config.getInteger("ProgramId", -1);
+
+       private String verticesPath;
+       private String edgesPath;
+       private String resultPath;
+       private String expectedResult;
+
+       public PageRankTableITCase(Configuration config) {
+               super(config);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+               verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+               edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = runProgram(curProgId);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.01);
+       }
+
+       @Parameters
+       public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+
+               return toParameterList(tConfigs);
+       }
+
+
+       public String runProgram(int progId) throws Exception {
+
+               switch(progId) {
+               case 1: {
+                       PageRankTable.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData
+                                       .NUM_VERTICES + "", "3"});
+                       return PageRankData.RANKS_AFTER_3_ITERATIONS;
+               }
+               case 2: {
+                       // start with a very high number of iterations such that the dynamic convergence criterion must handle termination
+                       PageRankTable.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+                       return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+               }
+
+               default:
+                       throw new IllegalArgumentException("Invalid program id");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
new file mode 100644
index 0000000..acb7ded
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.ExpressionException
+import org.junit.Test
+
+class TypeExceptionTest {
+
+  @Test(expected = classOf[ExpressionException])
+  def testInnerCaseClassException(): Unit = {
+    case class WC(word: String, count: Int)
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    val expr = input.toTable // this should fail
+    val result = expr
+      .groupBy('word)
+      .select('word, 'count.sum as 'count)
+      .toDataSet[WC]
+
+    result.print()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
new file mode 100644
index 0000000..ee5d9e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testAggregationTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row]
+    val results = ds.collect()
+    val expected = "231,1,21,21,11"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAggregationOnNonExistingField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+      .select('foo.avg).toDataSet[Row]
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+      .toDataSet[Row]
+    val expected = "1,1,1,1,1.5,1.5,2"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAggregationWithArithmetic(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row]
+    val expected = "5.5,2 THE COUNT"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+      .select('_1.count, '_2.count).toDataSet[Row]
+    val expected = "2,2"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("Hello", 1)).toTable
+      .select('_1.sum).toDataSet[Row]
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNoNestedAggregations(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("Hello", 1)).toTable
+      .select('_2.sum.sum).toDataSet[Row]
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
new file mode 100644
index 0000000..59573eb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testAs(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row]
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference1(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference2(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
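The happy path that these as() tests cover can be summarized in a short
standalone sketch (illustrative only; the object name and sample data are
invented, the API calls are the ones used in AsITCase):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object AsSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // as() renames tuple fields; the arity must match and each argument must be
        // a distinct plain field reference, otherwise an ExpressionException is thrown
        val renamed = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).as('a, 'b, 'c)
        val rows = renamed.select('a, 'b, 'c).toDataSet[Row]
        rows.collect().foreach(println)  // "1,1,Hi" and "2,2,Hello"
      }
    }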

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
new file mode 100644
index 0000000..c177184
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import java.util.Date
+
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testAutoCastToString(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
+      .toDataSet[Row]
+    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNumericAutoCastInArithmetic(): Unit = {
+
+    // don't test everything, just some common cast directions
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+      .toDataSet[Row]
+    val expected = "2,2,2,2.0,2.0,2.0"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNumericAutoCastInComparison(): Unit = {
+
+    // don't test everything, just some common cast directions
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
+      .toDataSet[Row]
+    val expected = "2,2,2,2,2.0,2.0"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCastFromString: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("1", "true", "2.0",
+        "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
+      .toTable
+      .select(
+        '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
+        '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
+        '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
+        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
+        '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        '_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
+    .toDataSet[Row]
+    val expected = "1,1,1,1,2.0,2.0,true," +
+      "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 
15:51:36.000," +
+      "1970-01-17 17:47:53.775\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCastDateToStringAndLong {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
+    val result = ds.toTable
+      .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
+        '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
+      .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),
+        'f0.cast(BasicTypeInfo.LONG_TYPE_INFO),
+        'f1.cast(BasicTypeInfo.STRING_TYPE_INFO),
+        'f1.cast(BasicTypeInfo.LONG_TYPE_INFO))
+      .toDataSet[Row]
+      .collect
+    val expected = "2011-05-03 15:51:36.000,1304437896000," +
+      "2011-05-03 15:51:36.000,1304437896000\n"
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+}
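Expressed outside the test harness, the explicit-cast pattern above looks
roughly like this (a sketch with invented object name and data; the cast calls
and BasicTypeInfo constants are those used in CastingITCase):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object CastingSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // parse string columns into typed columns via explicit casts
        val casted = env.fromElements(("1", "true", "2.0")).toTable
          .select(
            '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
            '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
            '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO))
          .toDataSet[Row]
        casted.collect().foreach(println)  // single row "1,true,2.0"
      }
    }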

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
new file mode 100644
index 0000000..017cbf1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import java.util.Date
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testArithmetic(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, 10)).as('a, 'b)
+      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row]
+    val expected = "0,10,2,10,1,-5"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testLogic(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, true)).as('a, 'b)
+      .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row]
+    val expected = "true,false,true,false"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testComparisons(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
+      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 
'a.isNotNull).toDataSet[Row]
+    val expected = "true,true,false,false,true"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testBitwiseOperations(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testBitwiseWithAutocast(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testBitwiseWithNonWorkingAutocast(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3.0, 5)).as('a, 'b)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] 
+    val expected = "1,7,6,-4"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCaseInsensitiveForAs(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
+      .groupBy("a").select("a, a.count As cnt").toDataSet[Row]
+    val expected = "3,1"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDateLiteral(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((0L, "test")).as('a, 'b)
+      .select('a,
+        Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
+      .toDataSet[Row]
+    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}
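The expression operators covered above compose freely inside select(); a
minimal standalone sketch follows (invented object name and data, same
operators as in ExpressionsITCase):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object ExpressionsSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // arithmetic and logical expressions are evaluated per row
        val rows = env.fromElements((5, true)).as('a, 'b)
          .select('a + 5, 'a * 2, 'a % 2, 'b && false, !'b)
          .toDataSet[Row]
        rows.collect().foreach(println)  // single row "10,10,1,false,false"
      }
    }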

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
new file mode 100644
index 0000000..c0b86f8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    val expected = "\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnStringTupleField(): Unit = {
+    /*
+     * Test filter on String tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( _._3.contains("world") )
+    val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  // These two tests are not yet implemented, but are planned
+
+  @Ignore
+  @Test
+  def testFilterBasicType(): Unit = {
+    /*
+     * Test filter on basic type
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.filter( _.startsWith("H") )
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how 
are you?\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Ignore
+  @Test
+  def testFilterOnCustomType(): Unit = {
+    /*
+     * Test filter on custom type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter( _.myString.contains("a") )
+    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + 
"3,5,Luke Skywalker\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}
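The expression-based filter from testFilterOnIntegerTupleField can be sketched
standalone as follows (object name and data are invented; filter and the ===
comparison are the API calls used above):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object FilterSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // keep only rows whose first field is even
        val evens = env.fromElements((1, "Hi"), (2, "Hello"), (3, "Hello world"), (4, "ABC"))
          .as('a, 'b)
          .filter('a % 2 === 0)
          .toDataSet[Row]
        evens.collect().foreach(println)  // "2,Hello" and "4,ABC"
      }
    }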

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
new file mode 100644
index 0000000..fb76507
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test(expected = classOf[ExpressionException])
+  def testGroupingOnNonExistentField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('_foo)
+      .select('a.avg).toDataSet[Row]
+    val expected = ""
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+
+    // simple grouped aggregation; here the grouping key is also part of the output
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum).toDataSet[Row]
+    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + 
"6,111\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+
+    // the grouping key needs to be forwarded to the intermediate DataSet, even
+    // if we don't want the key in the output
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum).toDataSet[Row]
+    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSQLStyleAggregations(): Unit = {
+
+    // SQL-style aggregation strings and the expression-style aggregations should yield the same results
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .select(
+        """Sum( a) as a1, a.sum as a2,
+          |Min (a) as b1, a.min as b2,
+          |Max (a ) as c1, a.max as c2,
+          |Avg ( a ) as d1, a.avg as d2,
+          |Count(a) as e1, a.count as e2
+        """.stripMargin).toDataSet[Row]
+    val expected = "231,231,1,1,21,21,11,11,21,21"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupNoAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+      .toDataSet[Row]
+
+    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
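The grouped-aggregation pattern tested above, reduced to a standalone sketch
(invented object name and data; groupBy/select and the .sum aggregation are the
calls used in the tests):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object GroupedAggSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // sum 'a per group of 'b; the grouping key may, but need not, appear in the select
        val sums = env.fromElements((1, 1L), (2, 2L), (3, 2L)).as('a, 'b)
          .groupBy('b)
          .select('b, 'a.sum)
          .toDataSet[Row]
        sums.collect().foreach(println)  // "1,1" and "2,5"
      }
    }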

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
new file mode 100644
index 0000000..e12c9d6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g).toDataSet[Row]
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo 
Welt\n"
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 
'g).toDataSet[Row]
+    val expected = "Hi,Hallo\n"
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g).toDataSet[Row]
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo 
Welt wie gehts?\n" +
+      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinNonExistingKey(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g).toDataSet[Row]
+    val expected = ""
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g).toDataSet[Row]
+    val expected = ""
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
+
+    val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g).toDataSet[Row]
+    val expected = ""
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithAggregation(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count).toDataSet[Row]
+    val expected = "6"
+    val results = joinDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+}
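The equi-join pattern that these tests exercise, as a standalone sketch
(invented object name and data; join/where/select with === are the calls used
in JoinITCase, and the field names of the two inputs must not overlap):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object JoinSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val left = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).as('a, 'b, 'c)
        val right = env.fromElements((1L, "Hallo"), (2L, "Hallo Welt")).as('e, 'g)
        // join on 'b === 'e and project one column from each side
        val joined = left.join(right).where('b === 'e).select('c, 'g).toDataSet[Row]
        joined.collect().foreach(println)  // "Hi,Hallo" and "Hello,Hallo Welt"
      }
    }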

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
new file mode 100644
index 0000000..fa3f283
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
+      .toDataSet[Row]
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+      .toDataSet[Row]
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+      .select('_1 as 'a, '_2 as 'b)
+      .select('a, 'b).toDataSet[Row]
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + 
"7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" 
+ "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test(expected = classOf[ExpressionException])
+  def testOnlyFieldRefInAs(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd).toDataSet[Row]
+    val expected = "no"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
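Finally, the select-with-renaming pattern from testSimpleSelectWithNaming as a
standalone sketch (invented object name and data; select with inline as-renaming
is the call used above):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object SelectSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // rename fields inside select, then project by the new names
        val projected = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).toTable
          .select('_1 as 'a, '_3 as 'c)
          .select('a, 'c)
          .toDataSet[Row]
        projected.collect().foreach(println)  // "1,Hi" and "2,Hello"
      }
    }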
