/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.scala.expressions.test

import org.apache.flink.api.expressions.ExpressionException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.expressions._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

/**
 * Integration tests for grouped aggregations on the expression API.
 *
 * Each test writes its result to a temporary file; the [[org.junit.After]]
 * hook compares the file contents against the `expected` string. For tests
 * annotated with an expected [[ExpressionException]], `expected` stays ""
 * and the (empty) result file matches.
 *
 * NOTE(review): the class name carries a historical typo ("Aggreagations");
 * it is kept so the class continues to match its committed file name.
 */
@RunWith(classOf[Parameterized])
class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expected: String = ""
  private val _tempFolder = new TemporaryFolder()

  // JUnit also honors @Rule on public methods, not just fields.
  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  @Before
  def before(): Unit = {
    resultPath = tempFolder.newFile().toURI.toString
  }

  @After
  def after(): Unit = {
    compareResultsByLinesInMemory(expected, resultPath)
  }

  /** Grouping on a field that does not exist must fail with an ExpressionException. */
  @Test(expected = classOf[ExpressionException])
  def testGroupingOnNonExistentField(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
      .groupBy('_foo)
      .select('a.avg)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = ""
  }

  /** Grouped aggregation where the grouping key is itself part of the output. */
  @Test
  def testGroupedAggregate(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
      .groupBy('b)
      .select('b, 'a.sum)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
  }

  /** Grouped aggregation where the grouping key is NOT part of the output. */
  @Test
  def testGroupingKeyForwardIfNotUsed(): Unit = {

    // the grouping key needs to be forwarded to the intermediate DataSet, even
    // if we don't want the key in the output
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
      .groupBy('b)
      .select('a.sum)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
  }
}

/**
 * Integration tests for joins on the expression API: key equality predicates,
 * additional filters, multiple keys, aggregation on the join result, and the
 * error cases (unknown key, mismatched key types, ambiguous field names).
 */
@RunWith(classOf[Parameterized])
class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expected: String = ""
  private val _tempFolder = new TemporaryFolder()

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  @Before
  def before(): Unit = {
    resultPath = tempFolder.newFile().toURI.toString
  }

  @After
  def after(): Unit = {
    compareResultsByLinesInMemory(expected, resultPath)
  }

  /** Equi-join on a single key expressed via `where`. */
  @Test
  def testJoin(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
  }

  /** Join predicate combining a key equality with an additional filter condition. */
  @Test
  def testJoinWithFilter(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = "Hi,Hallo\n"
  }

  /** Join on two keys, expressed via `filter` instead of `where`. */
  @Test
  def testJoinWithMultipleKeys(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
  }

  /** Referencing a key field that exists on neither input must fail. */
  @Test(expected = classOf[ExpressionException])
  def testJoinNonExistingKey(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = ""
  }

  /** Comparing key fields of incompatible types ('a is numeric, 'g is a String) must fail. */
  @Test(expected = classOf[ExpressionException])
  def testJoinWithNonMatchingKeyTypes(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = ""
  }

  /** Both inputs declare a field 'c, so selecting 'c is ambiguous and must fail. */
  @Test(expected = classOf[ExpressionException])
  def testJoinWithAmbiguousFields(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)

    val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = ""
  }

  /** Aggregation (count) applied on top of a join result. */
  @Test
  def testJoinWithAggregation(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)

    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
    env.execute()
    expected = "6"
  }
}

/**
 * Integration tests for `select` and `as` on the expression API: plain
 * projections, renaming via `as`, and the error cases (too few / too many /
 * ambiguous field names, non-reference expressions in `as`).
 *
 * NOTE(review): the "To{Few,Many}" method names carry a historical typo
 * ("Too" was meant); they are kept unchanged — JUnit discovers tests via the
 * annotation, but the names mirror the committed file.
 */
@RunWith(classOf[Parameterized])
class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expected: String = ""
  private val _tempFolder = new TemporaryFolder()

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  @Before
  def before(): Unit = {
    resultPath = tempFolder.newFile().toURI.toString
  }

  @After
  def after(): Unit = {
    compareResultsByLinesInMemory(expected, resultPath)
  }

  /** Select all fields of a tuple DataSet via the default '_N field names. */
  @Test
  def testSimpleSelectAll(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
  }

  /** Select all fields after renaming them with `as`. */
  @Test
  def testSimpleSelectAllWithAs(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
  }

  /** Rename fields inside `select` ('_1 as 'a) and reference the new names downstream. */
  @Test
  def testSimpleSelectWithNaming(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
      .select('_1 as 'a, '_2 as 'b)
      .select('a, 'b)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
  }

  /** `as` with fewer names than tuple fields must fail. */
  @Test(expected = classOf[ExpressionException])
  def testAsWithToFewFields(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "no"
  }

  /** `as` with more names than tuple fields must fail. */
  @Test(expected = classOf[ExpressionException])
  def testAsWithToManyFields(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "no"
  }

  /** `as` with a duplicate field name must fail. */
  @Test(expected = classOf[ExpressionException])
  def testAsWithAmbiguousFields(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "no"
  }

  /** `as` only accepts plain field references, not expressions such as 'b as 'c. */
  @Test(expected = classOf[ExpressionException])
  def testOnlyFieldRefInAs(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "no"
  }
}

/**
 * Integration tests for string expressions: `substring` with literal and
 * field-valued bounds, plus the type-error cases (non-integer bounds).
 */
@RunWith(classOf[Parameterized])
class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expected: String = ""
  private val _tempFolder = new TemporaryFolder()

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  @Before
  def before(): Unit = {
    resultPath = tempFolder.newFile().toURI.toString
  }

  @After
  def after(): Unit = {
    compareResultsByLinesInMemory(expected, resultPath)
  }

  /** `substring(start, end)` where the end index comes from another field. */
  @Test
  def testSubstring(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
      .select('a.substring(0, 'b))

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "AA\nB"
  }

  /** One-argument `substring(start)` runs to the end of the string. */
  @Test
  def testSubstringWithMaxEnd(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
      .select('a.substring('b))

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "CD\nBCD"
  }

  /** A Double-typed end index must be rejected at expression-translation time. */
  @Test(expected = classOf[ExpressionException])
  def testNonWorkingSubstring1(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
      .select('a.substring(0, 'b))

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "AAA\nBB"
  }

  /** A String-typed start index must be rejected at expression-translation time. */
  @Test(expected = classOf[ExpressionException])
  def testNonWorkingSubstring2(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
      .select('a.substring('b, 15))

    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "AAA\nBB"
  }
}