http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala deleted file mode 100644 index 4a358bc..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAggregationTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "231,1,21,21,11" - } - - @Test(expected = classOf[ExpressionException]) - def testAggregationOnNonExistingField: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('foo.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testWorkingAggregationDataTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression - .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, 
'_6.avg, '_7.count) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,1,1,1.5,1.5,2" - } - - @Test - def testAggregationWithArithmetic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression - .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT") - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "5.5,2 THE COUNT" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingAggregationDataTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toExpression - .select('_1.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test(expected = classOf[ExpressionException]) - def testNoNestedAggregations: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toExpression - .select('_2.sum.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - -}
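The deleted AggregationsITCase above covered the aggregation expressions (sum, min, max, count, avg) of the Scala expression API; the same functionality reappears under the new flink-table module introduced further down in this commit. The following is only a minimal sketch of the first test against the renamed API, assuming the new Scala package org.apache.flink.api.scala.table provides the same implicit conversions and expression syntax as the deleted test (a toTable conversion, select, and 'field.sum style aggregates); none of these names appears verbatim in this diff.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._  // assumed new package, replacing ...scala.expressions._

    val env = ExecutionEnvironment.getExecutionEnvironment
    // small inline data set standing in for CollectionDataSets.get3TupleDataSet
    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))

    // same aggregation expressions as the deleted testAggregationTypes
    val aggs = ds.toTable
      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)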
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala deleted file mode 100644 index 18d7b09..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAs: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - 
} - - @Test(expected = classOf[ExpressionException]) - def testAsWithToManyFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference1: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference2: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala deleted file mode 100644 index 599ef6b..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAutoCastToString: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression - .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d") - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1b,1s,1i,1L,1.0f,1.0d" - } - - @Test - def testNumericAutoCastInArithmetic: Unit = { - - // don't test everything, just some common cast directions - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression - .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "2,2,2,2.0,2.0,2.0" - } - - @Test - def testNumericAutoCastInComparison: Unit = { - - // don't test everything, just some common cast directions - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f) - .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "2,2,2,2,2.0,2.0" - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala deleted file mode 100644 index 9d37f70..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testArithmetic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 10)).as('a, 'b) - .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "0,10,2,10,1,-5" - } - - @Test - def testLogic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, true)).as('a, 'b) - .select('b && true, 'b && false, 'b || false, !'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "true,false,true,false" - } - - @Test - def testComparisons: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c) - .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "true,true,false,false,true" - } - - @Test - def testBitwiseOperations: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - @Test - def testBitwiseWithAutocast: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - @Test(expected = classOf[ExpressionException]) - def testBitwiseWithNonWorkingAutocast: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.0, 5)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala deleted file mode 100644 index 2841534..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.tree.Literal -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - - -@RunWith(classOf[Parameterized]) -class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = null - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAllRejectingFilter: Unit = { - /* - * Test all-rejecting filter. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(false) ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "\n" - } - - @Test - def testAllPassingFilter: Unit = { - /* - * Test all-passing filter. 
- */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(true) ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testFilterOnStringTupleField: Unit = { - /* - * Test filter on String tuple field. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) - val filterDs = ds.filter( _._3.contains("world") ) - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" - } - - @Test - def testFilterOnIntegerTupleField: Unit = { - /* - * Test filter on Integer tuple field. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 === 0 ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" - } - - // These two not yet done, but are planned - - @Ignore - @Test - def testFilterBasicType: Unit = { - /* - * Test filter on basic type - */ - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getStringDataSet(env) - - val filterDs = ds.filter( _.startsWith("H") ) - - filterDs.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" - } - - @Ignore - @Test - def testFilterOnCustomType: Unit = { - /* - * Test filter on custom type - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getCustomTypeDataSet(env) - val filterDs = ds.filter( _.myString.contains("a") ) - filterDs.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala deleted file mode 100644 index f2e0286..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test(expected = classOf[ExpressionException]) - def testGroupingOnNonExistentField: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('_foo) - .select('a.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testGroupedAggregate: Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" - } - - @Test - def testGroupingKeyForwardIfNotUsed: Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('a.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala 
b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala deleted file mode 100644 index 91b3b19..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testJoin: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" - } - - @Test - def testJoinWithFilter: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "Hi,Hallo\n" - } - - @Test - def testJoinWithMultipleKeys: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - 
env.execute() - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" - } - - @Test(expected = classOf[ExpressionException]) - def testJoinNonExistingKey: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test(expected = classOf[ExpressionException]) - def testJoinWithNonMatchingKeyTypes: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test(expected = classOf[ExpressionException]) - def testJoinWithAmbiguousFields: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c) - - val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testJoinWithAggregation: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'd).select('g.count) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "6" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala deleted file mode 100644 index a799b60..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testSimpleSelectAll: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testSimpleSelectAllWithAs: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testSimpleSelectWithNaming: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('_1 as 'a, '_2 as 'b) - .select('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected 
= classOf[ExpressionException]) - def testAsWithToManyFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - - @Test(expected = classOf[ExpressionException]) - def testOnlyFieldRefInAs: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala deleted file mode 100644 index c6c1113..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions.test - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testSubstring: Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b) - .select('a.substring(0, 'b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AA\nB" - } - - @Test - def testSubstringWithMaxEnd: Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b) - .select('a.substring('b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "CD\nBCD" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring1: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b) - .select('a.substring(0, 'b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AAA\nBB" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring2: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b) - .select('a.substring('b, 15)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AAA\nBB" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3db6531..c869a18 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -57,6 +57,11 @@ class DataStream[T](javaStream: JavaStream[T]) { def getJavaStream: JavaStream[T] = javaStream /** + * Returns the TypeInformation for the elements of this DataStream. + */ + def getType(): TypeInformation[T] = javaStream.getType + + /** * Sets the parallelism of this operation. This must be greater than 1. 
*/ def setParallelism(parallelism: Int): DataStream[T] = { http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml new file mode 100644 index 0000000..dcdbe83 --- /dev/null +++ b/flink-staging/flink-table/pom.xml @@ -0,0 +1,246 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-table</artifactId> + <name>flink-table</name> + + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala-examples</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + 
<goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins combine.children="append"> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + </plugins> + </build> + + <profiles> + <profile> + <id>scala-2.10</id> + <activation> + <property> + <!-- this is the default scala profile --> + <name>!scala-2.11</name> + </property> + </activation> + 
<dependencies> + <dependency> + <groupId>org.scalamacros</groupId> + <artifactId>quasiquotes_${scala.binary.version}</artifactId> + <version>${scala.macros.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java new file mode 100644 index 0000000..6f9f0a3 --- /dev/null +++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <strong>Table API (Java)</strong><br> + * + * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a + * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet} + * or {@link org.apache.flink.streaming.api.datastream.DataStream}. + * + * <p> + * This can be used to perform SQL-like queries on data. Please have + * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and + * how query strings are written. + * + * <p> + * Example: + * + * <code> + * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + * + * DataSet<WC> input = env.fromElements( + * new WC("Hello", 1), + * new WC("Ciao", 1), + * new WC("Hello", 1)); + * + * Table table = TableUtil.from(input); + * + * Table filtered = table + * .groupBy("word") + * .select("word.count as count, word") + * .filter("count = 2"); + * + * DataSet<WC> result = TableUtil.toSet(filtered, WC.class); + * + * result.print(); + * env.execute(); + * </code> + * + * <p> + * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the + * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment.toSet} + * or {@link org.apache.flink.api.java.table.TableEnvironment.toStream}. 
+ */ +package org.apache.flink.api.java.table; http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java new file mode 100644 index 0000000..d7fbc8e --- /dev/null +++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <strong>Table API</strong><br> + * + * This package contains the generic part of the Table API. It can be used with Flink Streaming + * and Flink Batch, from Scala as well as from Java. + * + * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from + * a DataSet or DataStream. On this, relational operations can be performed. A table can also + * be converted back to a DataSet or DataStream. + * + * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain + * the language-specific part of the API. Refer to these packages for documentation on how + * the Table API can be used in Java and Scala. + */ +package org.apache.flink.api.table; http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java new file mode 100644 index 0000000..1c1fdca --- /dev/null +++ b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.java; + + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; + +/** + * Very simple example that shows how the Java Table API can be used. + */ +public class JavaTableExample { + + public static class WC { + public String word; + public int count; + + // Public constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, int count) { + this.word = word; + this.count = count; + } + + @Override + public String toString() { + return "WC " + word + " " + count; + } + } + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + Table table = tableEnv.toTable(input); + + Table filtered = table + .groupBy("word") + .select("word.count as count, word") + .filter("count = 2"); + + DataSet<WC> result = tableEnv.toSet(filtered, WC.class); + + result.print(); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala new file mode 100644 index 0000000..17e4823 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.aggregation.AggregationFunction +import org.apache.flink.api.java.operators.JoinOperator.EquiJoin +import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} +import org.apache.flink.api.java.{DataSet => JavaDataSet} +import org.apache.flink.api.table.analysis.ExtractEquiJoinFields +import org.apache.flink.api.table.operations._ +import org.apache.flink.api.table.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction} +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} +import org.apache.flink.api.table.{ExpressionException, Row, Table} + +/** + * [[TableTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and + * translating them back to Java [[org.apache.flink.api.java.DataSet]]s. + */ +class JavaBatchTranslator extends TableTranslator { + + type Representation[A] = JavaDataSet[A] + + override def createTable[A]( + repr: Representation[A], + inputType: CompositeType[A], + expressions: Array[Expression], + resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = { + + val rowDataSet = createSelect(expressions, repr, inputType) + + Table(Root(rowDataSet, resultFields), this) + } + + override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { + + if (tpe.getTypeClass == classOf[Row]) { + // shortcut for DataSet[Row] + return translateInternal(op).asInstanceOf[JavaDataSet[A]] + } + + val clazz = tpe.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create DataSet of type " + + clazz.getName + ". Only top-level classes or static member classes are supported.") + } + + + if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { + throw new ExpressionException( + "A Table can only be converted to composite types, type is: " + + implicitly[TypeInformation[A]] + + ". 
Composite types would be tuples, case classes and POJOs.") + } + + val resultSet = translateInternal(op) + + val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + + val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + + val resultNames = resultType.getFieldNames + val outputNames = outputType.getFieldNames.toSeq + + if (resultNames.toSet != outputNames.toSet) { + throw new ExpressionException(s"Expression result type $resultType does not have the same " + + s"fields as output type $outputType") + } + + for (f <- outputNames) { + val in = resultType.getTypeAt(resultType.getFieldIndex(f)) + val out = outputType.getTypeAt(outputType.getFieldIndex(f)) + if (!in.equals(out)) { + throw new ExpressionException(s"Types for field $f differ on input $resultType and " + + s"output $outputType.") + } + } + + val outputFields = outputNames map { + f => ResolvedFieldReference(f, resultType.getTypeAt(f)) + } + + val function = new ExpressionSelectFunction( + resultSet.getType.asInstanceOf[RowTypeInfo], + outputType, + outputFields) + + val opName = s"select(${outputFields.mkString(",")})" + val operator = new MapOperator(resultSet, outputType, function, opName) + + operator + } + + private def translateInternal(op: Operation): JavaDataSet[Row] = { + op match { + case Root(dataSet: JavaDataSet[Row], resultFields) => + dataSet + + case Root(_, _) => + throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op) + + case GroupBy(_, fields) => + throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + + "SELECT statement?") + + case As(input, newNames) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray) + new RenameOperator(translatedInput, proxyType) + + case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + selection, + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + } else { + translateInternal(expandedInput) + } + + case Filter(Join(leftInput, rightInput), predicate) => + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ + rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + + case Join(leftInput, rightInput) => + throw new ExpressionException("Join without filter condition encountered. " + + "Did you forget to add .where(...) 
?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + // no expansions took place + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val keyIndices = groupExpressions map { + case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) + case e => throw new ExpressionException(s"Expression $e is not a valid key expression.") + } + + val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) + + val grouping = new UnsortedGrouping(translatedInput, keys) + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + grouping, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + case agg@Aggregate(input, aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + translatedInput, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: JavaDataSet[I], + inputType: CompositeType[I]): JavaDataSet[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + val operator = new MapOperator(input, resultType, function, opName) + + operator + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: JavaDataSet[L], + rightInput: JavaDataSet[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): JavaDataSet[Row] = { + + val resultType = new RowTypeInfo(fields) + + val (reducedPredicate, leftFields, rightFields) = + ExtractEquiJoinFields(leftType, rightType, predicate) + + if (leftFields.isEmpty || rightFields.isEmpty) { + throw new ExpressionException("Could not derive equi-join predicates " + + "for predicate " + predicate + ".") + } + 
+ val leftKey = new ExpressionKeys[L](leftFields, leftType) + val rightKey = new ExpressionKeys[R](rightFields, rightType) + + val joiner = new ExpressionJoinFunction[L, R, Row]( + reducedPredicate, + leftType, + rightType, + resultType, + fields) + + new EquiJoin[L, R, Row]( + leftInput, + rightInput, + leftKey, + rightKey, + joiner, + resultType, + joinHint, + predicate.toString) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala new file mode 100644 index 0000000..4fd79e4 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.operations._ +import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.table.{ExpressionException, Row, Table} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.invokable.operator.MapInvokable + +/** + * [[TableTranslator]] for creating [[Table]]s from Java [[DataStream]]s and + * translating them back to Java [[DataStream]]s. + * + * This is very limited right now. Only select and filter are implemented. Also, the expression + * operations must be extended to allow windowing operations. 
+ */ + +class JavaStreamingTranslator extends TableTranslator { + + type Representation[A] = DataStream[A] + + override def createTable[A]( + repr: Representation[A], + inputType: CompositeType[A], + expressions: Array[Expression], + resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = { + + val rowDataStream = createSelect(expressions, repr, inputType) + + new Table(Root(rowDataStream, resultFields), this) + } + + override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = { + + if (tpe.getTypeClass == classOf[Row]) { + // shortcut for DataStream[Row] + return translateInternal(op).asInstanceOf[DataStream[A]] + } + + val clazz = tpe.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create DataStream of type " + + clazz.getName + ". Only top-level classes or static member classes are supported.") + } + + if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { + throw new ExpressionException( + "A Table can only be converted to composite types, type is: " + + implicitly[TypeInformation[A]] + + ". Composite types would be tuples, case classes and POJOs.") + + } + + val resultSet = translateInternal(op) + + val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + + val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + + val resultNames = resultType.getFieldNames + val outputNames = outputType.getFieldNames.toSeq + + if (resultNames.toSet != outputNames.toSet) { + throw new ExpressionException(s"Expression result type $resultType does not have the same " + + s"fields as output type $outputType") + } + + for (f <- outputNames) { + val in = resultType.getTypeAt(resultType.getFieldIndex(f)) + val out = outputType.getTypeAt(outputType.getFieldIndex(f)) + if (!in.equals(out)) { + throw new ExpressionException(s"Types for field $f differ on input $resultType and " + + s"output $outputType.") + } + } + + val outputFields = outputNames map { + f => ResolvedFieldReference(f, resultType.getTypeAt(f)) + } + + val function = new ExpressionSelectFunction( + resultSet.getType.asInstanceOf[RowTypeInfo], + outputType, + outputFields) + + val opName = s"select(${outputFields.mkString(",")})" + + resultSet.transform(opName, outputType, new MapInvokable[Row, A](function)) + } + + private def translateInternal(op: Operation): DataStream[Row] = { + op match { + case Root(dataSet: DataStream[Row], resultFields) => + dataSet + + case Root(_, _) => + throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op) + + case GroupBy(_, fields) => + throw new ExpressionException("Dangling GroupBy operation. 
Did you forget a " + + "SELECT statement?") + + case As(input, newNames) => + throw new ExpressionException("As operation for Streams not yet implemented.") + + case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + selection, + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + } else { + translateInternal(expandedInput) + } + + case Filter(Join(leftInput, rightInput), predicate) => + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ + rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + + case Join(leftInput, rightInput) => + throw new ExpressionException("Join without filter condition encountered. " + + "Did you forget to add .where(...) ?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + // no expansions took place + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case agg@Aggregate(input, aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: DataStream[I], + inputType: CompositeType[I]): DataStream[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + + input.transform(opName, resultType, new MapInvokable[I, Row](function)) + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: DataStream[L], + rightInput: DataStream[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): DataStream[Row] = { + + throw new ExpressionException("Join operation for Streams not yet implemented.") + } +} 
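The streaming translator above only implements select and filter; as, join and the aggregate operations on streams still throw ExpressionException. The following is a minimal, hypothetical Java sketch of how this streaming path could be driven through the TableEnvironment added in this commit. The class name StreamingTableSketch, the WC POJO and the sample data are illustrative only, and the use of StreamExecutionEnvironment.getExecutionEnvironment/fromElements is an assumption about the streaming API available in this Flink version.

import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingTableSketch {

	// Mirrors the WC POJO from JavaTableExample so the sketch is self-contained.
	public static class WC {
		public String word;
		public int count;

		public WC() {}

		public WC(String word, int count) {
			this.word = word;
			this.count = count;
		}
	}

	public static void main(String[] args) throws Exception {
		// Assumption: the streaming environment of this Flink version offers fromElements.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataStream<WC> input = env.fromElements(
				new WC("Hello", 2),
				new WC("Ciao", 1));

		// Field names are taken from the WC POJO ("word", "count").
		Table table = tableEnv.toTable(input);

		// Only select and filter are translated for DataStreams at this point.
		Table filtered = table
				.filter("count = 2")
				.select("word, count");

		// Back to a DataStream; WC's field names and types match the table exactly.
		DataStream<WC> result = tableEnv.toStream(filtered, WC.class);

		result.print();
		env.execute();
	}
}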
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala new file mode 100644 index 0000000..030fa12 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.table + +import org.apache.flink.api.java.DataSet + +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.Table +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Environment for working with the Table API. + * + * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You + * can also use the provided methods to create a [[Table]] directly from a data source. + */ +class TableEnvironment { + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataSet type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * tableEnv.toTable(set, "a, b") + * }}} + * + * This will transform the set containing elements of two fields to a table where the fields + * are named a and b. + */ + def toTable[T](set: DataSet[T], fields: String): Table[JavaBatchTranslator] = { + new JavaBatchTranslator().createTable(set, fields).asInstanceOf[Table[JavaBatchTranslator]] + } + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataSet type are used to name the + * [[org.apache.flink.api.table.Table]] fields. + */ + def toTable[T](set: DataSet[T]): Table[JavaBatchTranslator] = { + new JavaBatchTranslator().createTable(set).asInstanceOf[Table[JavaBatchTranslator]] + } + + /** + * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataStream type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * tableEnv.toTable(set, "a, b") + * }}} + * + * This will transform the stream containing elements of two fields to a table where the fields + * are named a and b. + */ + def toTable[T](set: DataStream[T], fields: String): Table[JavaStreamingTranslator] = { + new JavaStreamingTranslator().createTable(set, fields) + .asInstanceOf[Table[JavaStreamingTranslator]] + } + + /** + * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. 
+ * The fields of the DataStream type are used to name the + * [[org.apache.flink.api.table.Table]] fields. + */ + def toTable[T](set: DataStream[T]): Table[JavaStreamingTranslator] = { + new JavaStreamingTranslator().createTable(set).asInstanceOf[Table[JavaStreamingTranslator]] + } + + /** + * Converts the given [[org.apache.flink.api.table.Table]] to + * a DataSet. The given type must have exactly the same fields as the + * [[org.apache.flink.api.table.Table]]. That is, the names of the + * fields and the types must match. + */ + @SuppressWarnings(Array("unchecked")) + def toSet[T]( + table: Table[JavaBatchTranslator], + clazz: Class[T]): DataSet[T] = { + table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]] + } + + /** + * Converts the given [[org.apache.flink.api.table.Table]] to + * a DataStream. The given type must have exactly the same fields as the + * [[org.apache.flink.api.table.Table]]. That is, the names of the + * fields and the types must match. + */ + @SuppressWarnings(Array("unchecked")) + def toStream[T]( + table: Table[JavaStreamingTranslator], + clazz: Class[T]): DataStream[T] = { + table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]] + } +} +
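For the batch side, the toSet conversion above requires that the target class exposes exactly the fields of the Table, with matching names and types; otherwise the JavaBatchTranslator rejects the conversion with an ExpressionException. A small, hypothetical Java sketch of that round trip follows; the class names BatchRoundTripSketch and WordCount, the "count.sum as count" aggregation and the sample data are made up for illustration and are not part of the commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.table.Table;

public class BatchRoundTripSketch {

	// Target POJO for toSet: field names and types must match the table's fields exactly.
	public static class WordCount {
		public String word;
		public int count;

		public WordCount() {}

		public WordCount(String word, int count) {
			this.word = word;
			this.count = count;
		}
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<WordCount> input = env.fromElements(
				new WordCount("Hello", 1),
				new WordCount("Ciao", 1),
				new WordCount("Hello", 1));

		// Field names are taken from the WordCount POJO ("word", "count").
		Table table = tableEnv.toTable(input);

		// Sum the counts per word; the result again has exactly the fields "word" and "count".
		Table counts = table
				.groupBy("word")
				.select("word, count.sum as count");

		// Succeeds because WordCount exposes "word" (String) and "count" (int).
		DataSet<WordCount> result = tableEnv.toSet(counts, WordCount.class);

		result.print();
		env.execute();
	}
}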