/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import org.apache.flink.api.table.ExpressionException;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

/**
 * Integration tests for the Table API {@code join} operator: equality predicates,
 * additional filters, multiple join keys, aggregation after a join, and the
 * expression errors raised for invalid join conditions.
 */
@RunWith(Parameterized.class)
public class JoinITCase extends MultipleProgramsTestBase {

	public JoinITCase(TestExecutionMode mode) {
		super(mode);
	}

	/** Joins a 3-tuple and a 5-tuple data set on a single key and projects two columns. */
	@Test
	public void testJoin() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		Table joined = left.join(right).where("b === e").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
		compareResultAsText(actual, expected);
	}

	/** A filter predicate combined with the join key restricts the result to a single row. */
	@Test
	public void testJoinWithFilter() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		Table joined = left.join(right).where("b === e && b < 2").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "Hi,Hallo\n";
		compareResultAsText(actual, expected);
	}

	/** Joins on two key columns at once ({@code a === d && b === h}). */
	@Test
	public void testJoinWithMultipleKeys() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> fullTuples = CollectionDataSets.get3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(fullTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		Table joined = left.join(right).where("a === d && b === h").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n"
				+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
		compareResultAsText(actual, expected);
	}

	/** Referencing an undeclared field in the join predicate must fail eagerly. */
	@Test(expected = ExpressionException.class)
	public void testJoinNonExistingKey() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		// "foo" is not a field of either input; the expression must be rejected.
		Table joined = left.join(right).where("foo === e").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** Joining an Integer key against a String key must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testJoinWithNonMatchingKeyTypes() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		// a is Integer, g is String: incompatible key types.
		Table joined = left.join(right).where("a === g").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** Both inputs declare a field named "c"; the ambiguity must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testJoinWithAmbiguousFields() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, c");

		Table joined = left.join(right).where("a === d").select("c, g");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** Counts the join result rows via {@code g.count} after the equi-join. */
	@Test
	public void testJoinWithAggregation() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> smallTuples = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wideTuples = CollectionDataSets.get5TupleDataSet(env);

		Table left = tableEnv.fromDataSet(smallTuples, "a, b, c");
		Table right = tableEnv.fromDataSet(wideTuples, "d, e, f, g, h");

		Table joined = left.join(right).where("a === d").select("g.count");

		List<Row> actual = tableEnv.toDataSet(joined, Row.class).collect();
		String expected = "6";
		compareResultAsText(actual, expected);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.table.Table;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Verifies that a Table API result converted into a POJO data set can be used
 * with DataSet grouping/sorting operators on the POJO's field names.
 */
@RunWith(Parameterized.class)
public class PojoGroupingITCase extends MultipleProgramsTestBase {

	public PojoGroupingITCase(TestExecutionMode mode) {
		super(mode);
	}

	@Test
	public void testPojoGrouping() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<String, Double, String>> input = env.fromElements(
				new Tuple3<>("A", 23.0, "Z"),
				new Tuple3<>("A", 24.0, "Y"),
				new Tuple3<>("B", 1.0, "Z"));

		TableEnvironment tableEnv = new TableEnvironment();

		// Filter out group "B" on the Table side before converting back to POJOs.
		Table filtered = tableEnv
				.fromDataSet(input, "groupMe, value, name")
				.select("groupMe, value, name")
				.where("groupMe != 'B'");

		DataSet<MyPojo> pojos = tableEnv.toDataSet(filtered, MyPojo.class);

		// Group by POJO field, sort each group descending by value, keep the top row.
		DataSet<MyPojo> topPerGroup = pojos
				.groupBy("groupMe")
				.sortGroup("value", Order.DESCENDING)
				.first(1);
		List<MyPojo> actual = topPerGroup.collect();

		compareResultAsText(actual, "A,24.0,Y");
	}

	/** Simple serializable POJO used as the conversion target of the Table result. */
	public static class MyPojo implements Serializable {
		private static final long serialVersionUID = 8741918940120107213L;

		public String groupMe;
		public double value;
		public String name;

		public MyPojo() {
			// for serialization
		}

		public MyPojo(String groupMe, double value, String name) {
			this.groupMe = groupMe;
			this.value = value;
			this.name = name;
		}

		@Override
		public String toString() {
			return groupMe + "," + value + "," + name;
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import org.apache.flink.api.table.ExpressionException;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

/**
 * Integration tests for Table API {@code select} and field renaming ("as"):
 * plain projection, renaming via expressions, and the errors raised for
 * too few / too many / duplicate / non-reference field declarations.
 */
@RunWith(Parameterized.class)
public class SelectITCase extends MultipleProgramsTestBase {

	public SelectITCase(TestExecutionMode mode) {
		super(mode);
	}

	/** Selecting all declared fields reproduces the full 3-tuple data set. */
	@Test
	public void testSimpleSelectAllWithAs() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples, "a,b,c");

		Table projected = table.select("a, b, c");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, "
				+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4,"
				+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5,"
				+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5,"
				+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,"
				+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
		compareResultAsText(actual, expected);
	}

	/** Renames the first two tuple fields via "as" expressions and selects them. */
	@Test
	public void testSimpleSelectWithNaming() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples);

		Table projected = table
				.select("f0 as a, f1 as b")
				.select("a, b");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n"
				+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n"
				+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
		compareResultAsText(actual, expected);
	}

	// NOTE(review): "To" in the next two method names should read "Too"; kept
	// unchanged to preserve the discoverable test names.

	/** Declaring fewer fields than the tuple arity must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testAsWithToFewFields() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples, "a, b");

		List<Row> actual = tableEnv.toDataSet(table, Row.class).collect();
		String expected = " sorry dude ";
		compareResultAsText(actual, expected);
	}

	/** Declaring more fields than the tuple arity must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testAsWithToManyFields() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples, "a, b, c, d");

		List<Row> actual = tableEnv.toDataSet(table, Row.class).collect();
		String expected = " sorry dude ";
		compareResultAsText(actual, expected);
	}

	/** Declaring the same field name twice must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testAsWithAmbiguousFields() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples, "a, b, c, b");

		List<Row> actual = tableEnv.toDataSet(table, Row.class).collect();
		String expected = " today's not your day ";
		compareResultAsText(actual, expected);
	}

	/** Only plain field references are allowed in the field declaration string. */
	@Test(expected = ExpressionException.class)
	public void testOnlyFieldRefInAs() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

		Table table = tableEnv.fromDataSet(tuples, "a, b as c, d");

		List<Row> actual = tableEnv.toDataSet(table, Row.class).collect();
		String expected = "sorry bro";
		compareResultAsText(actual, expected);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.table.Table;
import org.junit.Test;

import java.io.File;
import java.util.Scanner;

import static org.junit.Assert.assertEquals;

/**
 * Tests {@link Table#explain()} output (plain and extended) for filter, join,
 * and union plans against expected plan files checked into the test resources.
 */
public class SqlExplainITCase {

	private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();

	/**
	 * Reads the whole expected-plan file from the scala test resources.
	 *
	 * <p>Uses try-with-resources so the underlying file handle is always released
	 * (the original code leaked one Scanner per test).
	 *
	 * @param fileName name of the .out file under src/test/scala/resources
	 * @return the complete file content
	 */
	private static String readExpectedPlan(String fileName) throws Exception {
		File planFile = new File(testFilePath + "../../src/test/scala/resources/" + fileName);
		try (Scanner scanner = new Scanner(planFile)) {
			// "\\A" delimiter makes next() return the entire stream in one token.
			return scanner.useDelimiter("\\A").next();
		}
	}

	/** Word-count style POJO used as the test input type. */
	public static class WC {
		public String word;
		public int count;

		// Public constructor to make it a Flink POJO
		public WC() {}

		public WC(int count, String word) {
			this.word = word;
			this.count = count;
		}
	}

	@Test
	public void testGroupByWithoutExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input = env.fromElements(
				new WC(1, "d"),
				new WC(2, "d"),
				new WC(3, "d"));

		Table table = tableEnv.fromDataSet(input).as("a, b");

		String actualPlan = table
				.filter("a % 2 = 0")
				.explain();
		// JUnit convention: expected value first, actual second (the original
		// call had the arguments swapped, inverting failure messages).
		assertEquals(readExpectedPlan("testFilter0.out"), actualPlan);
	}

	@Test
	public void testGroupByWithExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input = env.fromElements(
				new WC(1, "d"),
				new WC(2, "d"),
				new WC(3, "d"));

		Table table = tableEnv.fromDataSet(input).as("a, b");

		String actualPlan = table
				.filter("a % 2 = 0")
				.explain(true);
		assertEquals(readExpectedPlan("testFilter1.out"), actualPlan);
	}

	@Test
	public void testJoinWithoutExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input1 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table1 = tableEnv.fromDataSet(input1).as("a, b");

		DataSet<WC> input2 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table2 = tableEnv.fromDataSet(input2).as("c, d");

		String actualPlan = table1
				.join(table2)
				.where("b = d")
				.select("a, c")
				.explain();
		assertEquals(readExpectedPlan("testJoin0.out"), actualPlan);
	}

	@Test
	public void testJoinWithExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input1 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table1 = tableEnv.fromDataSet(input1).as("a, b");

		DataSet<WC> input2 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table2 = tableEnv.fromDataSet(input2).as("c, d");

		String actualPlan = table1
				.join(table2)
				.where("b = d")
				.select("a, c")
				.explain(true);
		assertEquals(readExpectedPlan("testJoin1.out"), actualPlan);
	}

	@Test
	public void testUnionWithoutExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input1 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table1 = tableEnv.fromDataSet(input1);

		DataSet<WC> input2 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table2 = tableEnv.fromDataSet(input2);

		String actualPlan = table1
				.unionAll(table2)
				.explain();
		assertEquals(readExpectedPlan("testUnion0.out"), actualPlan);
	}

	@Test
	public void testUnionWithExtended() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WC> input1 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table1 = tableEnv.fromDataSet(input1);

		DataSet<WC> input2 = env.fromElements(
				new WC(1, "d"),
				new WC(1, "d"),
				new WC(1, "d"));

		Table table2 = tableEnv.fromDataSet(input2);

		String actualPlan = table1
				.unionAll(table2)
				.explain(true);
		assertEquals(readExpectedPlan("testUnion1.out"), actualPlan);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import org.apache.flink.api.table.ExpressionException;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

/**
 * Integration tests for string expressions in the Table API, focused on
 * {@code substring} with valid and invalid argument types.
 */
@RunWith(Parameterized.class)
public class StringExpressionsITCase extends MultipleProgramsTestBase {

	public StringExpressionsITCase(TestExecutionMode mode) {
		super(mode);
	}

	/** substring(start, end) where the end index comes from an Integer column. */
	@Test
	public void testSubstring() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<String, Integer>> input = env.fromElements(
				new Tuple2<>("AAAA", 2),
				new Tuple2<>("BBBB", 1));

		Table table = tableEnv.fromDataSet(input, "a, b");

		Table projected = table.select("a.substring(0, b)");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "AA\nB";
		compareResultAsText(actual, expected);
	}

	/** substring(start) without an end index runs to the end of the string. */
	@Test
	public void testSubstringWithMaxEnd() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<String, Integer>> input = env.fromElements(
				new Tuple2<>("ABCD", 2),
				new Tuple2<>("ABCD", 1));

		Table table = tableEnv.fromDataSet(input, "a, b");

		Table projected = table.select("a.substring(b)");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "CD\nBCD";
		compareResultAsText(actual, expected);
	}

	/** A Float end index is not a valid substring argument. */
	@Test(expected = ExpressionException.class)
	public void testNonWorkingSubstring1() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<String, Float>> input = env.fromElements(
				new Tuple2<>("ABCD", 2.0f),
				new Tuple2<>("ABCD", 1.0f));

		Table table = tableEnv.fromDataSet(input, "a, b");

		Table projected = table.select("a.substring(0, b)");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** A String start index is not a valid substring argument. */
	@Test(expected = ExpressionException.class)
	public void testNonWorkingSubstring2() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<String, String>> input = env.fromElements(
				new Tuple2<>("ABCD", "a"),
				new Tuple2<>("ABCD", "b"));

		Table table = tableEnv.fromDataSet(input, "a, b");

		Table projected = table.select("a.substring(b, 15)");

		List<Row> actual = tableEnv.toDataSet(projected, Row.class).collect();
		String expected = "";
		compareResultAsText(actual, expected);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.table.test;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.table.ExpressionException;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

/**
 * Integration tests for the Table API {@code unionAll} operator: plain union,
 * union combined with filter and aggregation, and the errors raised when the
 * two inputs do not declare identical field names.
 */
@RunWith(Parameterized.class)
public class UnionITCase extends MultipleProgramsTestBase {

	public UnionITCase(TestExecutionMode mode) {
		super(mode);
	}

	/** Unions two copies of the small 3-tuple data set and keeps column c. */
	@Test
	public void testUnion() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> firstInput = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple3<Integer, Long, String>> secondInput = CollectionDataSets.getSmall3TupleDataSet(env);

		Table first = tableEnv.fromDataSet(firstInput, "a, b, c");
		Table second = tableEnv.fromDataSet(secondInput, "a, b, c");

		Table unioned = first.unionAll(second).select("c");
		List<Row> actual = tableEnv.toDataSet(unioned, Row.class).collect();

		String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
		compareResultAsText(actual, expected);
	}

	/** A filter applied after the union restricts the result to b < 2 rows. */
	@Test
	public void testUnionWithFilter() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> firstInput = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondInput = CollectionDataSets.get5TupleDataSet(env);

		Table first = tableEnv.fromDataSet(firstInput, "a, b, c");
		// Project the 5-tuple down to matching field names before the union.
		Table second = tableEnv.fromDataSet(secondInput, "a, b, d, c, e").select("a, b, c");

		Table unioned = first.unionAll(second).where("b < 2").select("c");
		List<Row> actual = tableEnv.toDataSet(unioned, Row.class).collect();

		String expected = "Hi\n" + "Hallo\n";
		compareResultAsText(actual, expected);
	}

	/** Inputs with completely different field names must be rejected. */
	@Test(expected = ExpressionException.class)
	public void testUnionFieldsNameNotOverlap1() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> firstInput = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondInput = CollectionDataSets.get5TupleDataSet(env);

		Table first = tableEnv.fromDataSet(firstInput, "a, b, c");
		Table second = tableEnv.fromDataSet(secondInput, "d, e, f, g, h");

		Table unioned = first.unionAll(second);
		List<Row> actual = tableEnv.toDataSet(unioned, Row.class).collect();

		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** Same field names but incompatible field types must also be rejected. */
	@Test(expected = ExpressionException.class)
	public void testUnionFieldsNameNotOverlap2() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> firstInput = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondInput = CollectionDataSets.get5TupleDataSet(env);

		Table first = tableEnv.fromDataSet(firstInput, "a, b, c");
		Table second = tableEnv.fromDataSet(secondInput, "a, b, c, d, e").select("a, b, c");

		Table unioned = first.unionAll(second);
		List<Row> actual = tableEnv.toDataSet(unioned, Row.class).collect();

		String expected = "";
		compareResultAsText(actual, expected);
	}

	/** Counts all rows of the union via c.count (3 + 15 inputs = 18). */
	@Test
	public void testUnionWithAggregation() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple3<Integer, Long, String>> firstInput = CollectionDataSets.getSmall3TupleDataSet(env);
		DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondInput = CollectionDataSets.get5TupleDataSet(env);

		Table first = tableEnv.fromDataSet(firstInput, "a, b, c");
		Table second = tableEnv.fromDataSet(secondInput, "a, b, d, c, e").select("a, b, c");

		Table unioned = first.unionAll(second).select("c.count");
		List<Row> actual = tableEnv.toDataSet(unioned, Row.class).collect();

		String expected = "18";
		compareResultAsText(actual, expected);
	}
}
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java @@ -0,0 +1,100 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.api.scala.table.test; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.examples.scala.PageRankTable; +import org.apache.flink.test.testdata.PageRankData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class PageRankTableITCase extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 2; + + private int curProgId = config.getInteger("ProgramId", -1); + + private String verticesPath; + private String edgesPath; + private String resultPath; + private String expectedResult; + + public PageRankTableITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + verticesPath = 
createTempFile("vertices.txt", PageRankData.VERTICES); + edgesPath = createTempFile("edges.txt", PageRankData.EDGES); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = runProgram(curProgId); + } + + @Override + protected void postSubmit() throws Exception { + compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.01); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + + public String runProgram(int progId) throws Exception { + + switch(progId) { + case 1: { + PageRankTable.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData + .NUM_VERTICES + "", "3"}); + return PageRankData.RANKS_AFTER_3_ITERATIONS; + } + case 2: { + // start with a very high number of iteration such that the dynamic convergence criterion must handle termination + PageRankTable.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); + return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; + } + + default: + throw new IllegalArgumentException("Invalid program id"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala new file mode 100644 index 0000000..acb7ded --- /dev/null +++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala @@ -0,0 +1,42 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.ExpressionException +import org.junit.Test + +class TypeExceptionTest { + + @Test(expected = classOf[ExpressionException]) + def testInnerCaseClassException(): Unit = { + case class WC(word: String, count: Int) + + val env = ExecutionEnvironment.getExecutionEnvironment + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + val expr = input.toTable // this should fail + val result = expr + .groupBy('word) + .select('word, 'count.sum as 'count) + .toDataSet[WC] + + result.print() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala new file mode 100644 index 0000000..ee5d9e8 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testAggregationTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row] + val results = ds.collect() + val expected = "231,1,21,21,11" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAggregationOnNonExistingField(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + .select('foo.avg).toDataSet[Row] + val expected = "" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable + .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count) + .toDataSet[Row] + val expected = "1,1,1,1,1.5,1.5,2" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAggregationWithArithmetic(): Unit = { + + val env = 
ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable + .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row] + val expected = "5.5,2 THE COUNT" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAggregationWithTwoCount(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable + .select('_1.count, '_2.count).toDataSet[Row] + val expected = "2,2" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testNonWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements(("Hello", 1)).toTable + .select('_1.sum).toDataSet[Row] + val expected = "" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testNoNestedAggregations(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements(("Hello", 1)).toTable + .select('_2.sum.sum).toDataSet[Row] + val expected = "" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala new file mode 100644 index 0000000..59573eb --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala @@ -0,0 +1,102 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testAs(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row] + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + 
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithToFewFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithToManyFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithAmbiguousFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithNonFieldReference1(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + // as can only have field references + val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithNonFieldReference2(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + // as can only have field references + val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row] + val expected 
= "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala new file mode 100644 index 0000000..c177184 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import java.util.Date + +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testAutoCastToString(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable + .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date") + .toDataSet[Row] + val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNumericAutoCastInArithmetic(): Unit = { + + // don't test everything, just some common cast directions + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable + .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1) + .toDataSet[Row] + val expected = "2,2,2,2.0,2.0,2.0" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNumericAutoCastInComparison(): Unit = { + + // don't test everything, just some common cast directions + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f) + .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 
'e > 1.0d && 'f > 1) + .toDataSet[Row] + val expected = "2,2,2,2,2.0,2.0" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testCastFromString: Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements(("1", "true", "2.0", + "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")) + .toTable + .select( + '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO), + '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO), + '_1.cast(BasicTypeInfo.INT_TYPE_INFO), + '_1.cast(BasicTypeInfo.LONG_TYPE_INFO), + '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO), + '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO), + '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO), + '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) + .toDataSet[Row] + val expected = "1,1,1,1,2.0,2.0,true," + + "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + + "1970-01-17 17:47:53.775\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testCastDateToStringAndLong { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000")) + val result = ds.toTable + .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0), + '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1)) + .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO), + 'f0.cast(BasicTypeInfo.LONG_TYPE_INFO), + 'f1.cast(BasicTypeInfo.STRING_TYPE_INFO), + 'f1.cast(BasicTypeInfo.LONG_TYPE_INFO)) + .toDataSet[Row] + .collect + val expected = "2011-05-03 15:51:36.000,1304437896000," + + "2011-05-03 15:51:36.000,1304437896000\n" + TestBaseUtils.compareResultAsText(result.asJava, 
expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala new file mode 100644 index 0000000..017cbf1 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import java.util.Date + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testArithmetic(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((5, 10)).as('a, 'b) + .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row] + val expected = "0,10,2,10,1,-5" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testLogic(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((5, true)).as('a, 'b) + .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row] + val expected = "true,false,true,false" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testComparisons(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c) + .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row] + val expected = "true,true,false,false,true" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testBitwiseOperations(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 
'b) + .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] + val expected = "1,7,6,-4" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testBitwiseWithAutocast(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((3, 5.toByte)).as('a, 'b) + .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] + val expected = "1,7,6,-4" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testBitwiseWithNonWorkingAutocast(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((3.0, 5)).as('a, 'b) + .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] + val expected = "1,7,6,-4" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testCaseInsensitiveForAs(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((3, 5.toByte)).as('a, 'b) + .groupBy("a").select("a, a.count As cnt").toDataSet[Row] + val expected = "3,1" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDateLiteral(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((0L, "test")).as('a, 'b) + .select('a, + Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO), + 'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) + .toDataSet[Row] + val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff 
--git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala new file mode 100644 index 0000000..c0b86f8 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + + +@RunWith(classOf[Parameterized]) +class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testAllRejectingFilter(): Unit = { + /* + * Test all-rejecting filter. 
+ */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + + val filterDs = ds.filter( Literal(false) ) + val expected = "\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAllPassingFilter(): Unit = { + /* + * Test all-passing filter. + */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + + val filterDs = ds.filter( Literal(true) ) + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnStringTupleField(): Unit = { + /* + * Test filter on String tuple field. + */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env) + val filterDs = ds.filter( _._3.contains("world") ) + val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnIntegerTupleField(): Unit = { + /* + * Test filter on Integer tuple field. 
+ */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + + val filterDs = ds.filter( 'a % 2 === 0 ) + val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + // These two not yet done, but are planned + + @Ignore + @Test + def testFilterBasicType(): Unit = { + /* + * Test filter on basic type + */ + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.getStringDataSet(env) + + val filterDs = ds.filter( _.startsWith("H") ) + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Ignore + @Test + def testFilterOnCustomType(): Unit = { + /* + * Test filter on custom type + */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.getCustomTypeDataSet(env) + val filterDs = ds.filter( _.myString.contains("a") ) + val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala new file mode 
100644 index 0000000..fb76507 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.core.fs.FileSystem.WriteMode +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test(expected = classOf[ExpressionException]) + def testGroupingOnNonExistentField(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .groupBy('_foo) + 
.select('a.avg).toDataSet[Row] + val expected = "" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregate(): Unit = { + + // the grouping key needs to be forwarded to the intermediate DataSet, even + // if we don't want the key in the output + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .groupBy('b) + .select('b, 'a.sum).toDataSet[Row] + val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupingKeyForwardIfNotUsed(): Unit = { + + // the grouping key needs to be forwarded to the intermediate DataSet, even + // if we don't want the key in the output + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .groupBy('b) + .select('a.sum).toDataSet[Row] + val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSQLStyleAggregations(): Unit = { + + // the grouping key needs to be forwarded to the intermediate DataSet, even + // if we don't want the key in the output + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .select( + """Sum( a) as a1, a.sum as a2, + |Min (a) as b1, a.min as b2, + |Max (a ) as c1, a.max as c2, + |Avg ( a ) as d1, a.avg as d2, + |Count(a) as e1, a.count as e2 + """.stripMargin).toDataSet[Row] + val expected = "231,231,1,1,21,21,11,11,21,21" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupNoAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = 
CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'c) + .groupBy('b) + .select('a.sum as 'd, 'b) + .groupBy('b, 'd) + .select('b) + .toDataSet[Row] + + val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala new file mode 100644 index 0000000..e12c9d6 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g).toDataSet[Row] + val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithFilter(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g).toDataSet[Row] + val expected = "Hi,Hallo\n" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithMultipleKeys(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = 
ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g).toDataSet[Row] + val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + + "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testJoinNonExistingKey(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g).toDataSet[Row] + val expected = "" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testJoinWithNonMatchingKeyTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g).toDataSet[Row] + val expected = "" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testJoinWithAmbiguousFields(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c) + + val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g).toDataSet[Row] + val expected = "" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithAggregation(): Unit = { + val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinDs = ds1.join(ds2).where('a === 'd).select('g.count).toDataSet[Row] + val expected = "6" + val results = joinDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala new file mode 100644 index 0000000..fa3f283 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.table.{Row, ExpressionException} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + + @Test + def testSimpleSelectAll(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3) + .toDataSet[Row] + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectAllWithAs(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c) + .toDataSet[Row] + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectWithNaming(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + .select('_1 as 'a, '_2 as 'b) + .select('a, 'b).toDataSet[Row] + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithToFewFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithToManyFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ExpressionException]) + def testAsWithAmbiguousFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + + @Test(expected = classOf[ExpressionException]) + def 
testOnlyFieldRefInAs(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd).toDataSet[Row] + val expected = "no" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +}