[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167678#comment-15167678 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr closed the pull request at:

    https://github.com/apache/flink/pull/1709

> Translate optimized logical Table API plans into physical plans representing
> DataSet programs
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream
>   operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - Implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one
>   Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all
>   relevant operator information (keys, user-code expressions, strategy hints,
>   parallelism).
> - Implement rules to translate optimized Calcite RelNodes into Flink
>   RelNodes. We start with a straightforward mapping and later add rules that
>   merge several relational operators into a single Flink operator, e.g.,
>   merge a join followed by a filter. Timo implemented some rules for the
>   first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
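The planning scheme the issue describes (a 1-to-1 mapping from each Flink RelNode to one DataSet operator, plus rules that merge several relational operators, such as a join followed by a filter) can be sketched as a small toy model. The following Java is a hedged illustration only: the names `Logical`, `Physical`, `Rule`, and the operator strings are invented for the sketch and are not Flink's or Calcite's actual APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of rule-based plan translation: logical nodes are translated
// bottom-up; each resulting "physical" node stands for exactly one DataSet
// operator, and a merging rule folds a Filter on top of a Join into a
// single join operator (hypothetical names, not the real Flink planner).
class PlanTranslationSketch {

    record Logical(String kind, List<Logical> inputs) {
        static Logical of(String kind, Logical... in) {
            return new Logical(kind, List.of(in));
        }
    }

    // One physical node == one DataSet operator. A real planner would also
    // carry keys, user-code expressions, strategy hints, and parallelism.
    record Physical(String operator, List<Physical> inputs) {}

    interface Rule {
        Optional<Physical> apply(Logical node, List<Physical> translatedInputs);
    }

    static Physical translate(Logical node, List<Rule> rules) {
        // Translate inputs first, then let the first matching rule fire.
        List<Physical> inputs = new ArrayList<>();
        for (Logical in : node.inputs()) {
            inputs.add(translate(in, rules));
        }
        for (Rule rule : rules) {
            Optional<Physical> result = rule.apply(node, inputs);
            if (result.isPresent()) {
                return result.get();
            }
        }
        throw new IllegalStateException("no translation rule for " + node.kind());
    }

    static List<Rule> defaultRules() {
        // Merging rule first: a Filter whose input already became a join is
        // absorbed into the join instead of becoming a separate operator.
        Rule mergeFilterIntoJoin = (node, ins) ->
            node.kind().equals("Filter")
                    && ins.get(0).operator().startsWith("DataSetJoin")
                ? Optional.of(new Physical("DataSetJoin[+filter]", ins.get(0).inputs()))
                : Optional.empty();
        // Straightforward 1-to-1 mapping for everything else.
        Rule oneToOne = (node, ins) -> Optional.of(new Physical(
            switch (node.kind()) {
                case "Scan" -> "DataSetSource";
                case "Filter" -> "DataSetMap[filter]";
                case "Project" -> "DataSetMap[project]";
                case "Join" -> "DataSetJoin";
                default -> throw new IllegalArgumentException(node.kind());
            }, ins));
        return List.of(mergeFilterIntoJoin, oneToOne);
    }
}
```

With these rules, `Filter(Join(Scan, Scan))` translates into a single join operator over two sources rather than a join plus a separate map, which is the kind of operator-merging rule the issue proposes adding after the straightforward mapping works.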
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167662#comment-15167662 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1709#issuecomment-188933506

    You can close this now @twalthr. I've merged it.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167115#comment-15167115 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr closed the pull request at:

    https://github.com/apache/flink/pull/1679
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166930#comment-15166930 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/1709

    [FLINK-3226] Improvements for expected types

    I added some documentation and fixed some bugs when performing aggregations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink ExpectedTypeImprovements

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1709.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1709

----
commit a8614d81d81e45c7a67b2d90018923980881dcb8
Author: twalthr
Date:   2016-02-25T08:36:32Z

    [FLINK-3226] Improvements for expected types
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166925#comment-15166925 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1679#issuecomment-188669681

    Thanks for the update. Looks good to merge.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166872#comment-15166872 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1679#issuecomment-188655499

    If there are no objections, I would like to merge this later.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162879#comment-15162879 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1679#issuecomment-188219011

    @fhueske build succeeded
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162692#comment-15162692 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53910352

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
    @@ -40,46 +40,6 @@
         public StringExpressionsITCase(TestExecutionMode mode) {
             super(mode);
         }
     
    -    @Test(expected = CodeGenException.class)
    --- End diff --

    Yes, you are right, testing at least one function end-to-end makes sense. I will add them again.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160680#comment-15160680 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53906027

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
    @@ -40,46 +40,6 @@
         public StringExpressionsITCase(TestExecutionMode mode) {
             super(mode);
         }
     
    -    @Test(expected = CodeGenException.class)
    --- End diff --

    Agreed, we should not have end-to-end tests for all scalar functions. But it would be good to test at least some representative functions, IMO.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160358#comment-15160358 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53905333

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.test
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.expressions.Expression
    +import org.apache.flink.api.table.parser.ExpressionParser
    +import org.apache.flink.api.table.test.utils.ExpressionEvaluator
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class ScalarFunctionsTest {
    +
    +  @Test
    +  def testSubstring(): Unit = {
    +    testFunction(
    +      'f0.substring(2),
    +      "f0.substring(2)",
    +      "SUBSTRING(f0, 2)",
    +      "his is a test String.")
    +
    +    testFunction(
    +      'f0.substring(2, 5),
    +      "f0.substring(2, 5)",
    +      "SUBSTRING(f0, 2, 5)",
    +      "his i")
    +
    +    testFunction(
    +      'f0.substring(1, 'f7),
    +      "f0.substring(1, f7)",
    +      "SUBSTRING(f0, 1, f7)",
    +      "Thi")
    +  }
    +
    +  // --------------------------------------------------------------------
    +
    +  def testFunction(
    +      expr: Expression,
    +      exprString: String,
    +      sqlExpr: String,
    +      expected: String): Unit = {
    +    val testData = new Row(8)
    +    testData.setField(0, "This is a test String.")
    +    testData.setField(1, true)
    +    testData.setField(2, 42.toByte)
    +    testData.setField(3, 43.toShort)
    +    testData.setField(4, 44.toLong)
    +    testData.setField(5, 4.5.toFloat)
    +    testData.setField(6, 4.6)
    +    testData.setField(7, 3)
    +
    +    val typeInfo = new RowTypeInfo(Seq(
    +      STRING_TYPE_INFO,
    +      BOOLEAN_TYPE_INFO,
    +      BYTE_TYPE_INFO,
    +      SHORT_TYPE_INFO,
    +      LONG_TYPE_INFO,
    +      FLOAT_TYPE_INFO,
    +      DOUBLE_TYPE_INFO,
    +      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
    +
    +    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
    +    assertEquals(expected, exprResult)
    +
    +    val exprStringResult = ExpressionEvaluator.evaluate(
    +      testData,
    +      typeInfo,
    +      ExpressionParser.parseExpression(exprString))
    +    assertEquals(expected, exprStringResult)
    +
    +    // TODO test SQL expression
    --- End diff --

    Once we have a SQL parser ready, I will resolve this TODO ;-)
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160353#comment-15160353 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53905238

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
    @@ -40,46 +40,6 @@
         public StringExpressionsITCase(TestExecutionMode mode) {
             super(mode);
         }
     
    -    @Test(expected = CodeGenException.class)
    --- End diff --

    No, the `ScalarFunctionsTest` does not test end-to-end. I don't think this is necessary for scalar functions, because they don't affect optimization/plan translation. When we have more functions (e.g. 40 of them), we would have 40 end-to-end tests, each just implementing a MapFunction and evaluating an expression. That's why I implemented the `ExpressionEvaluator`.
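The testing strategy debated in this thread (evaluate each scalar expression directly against one shared test row instead of building a full DataSet program per function) can be sketched in miniature. The Java below is a hypothetical stand-in, not the actual `ExpressionEvaluator` API; the 1-based `substring` models SQL's `SUBSTRING` semantics, which the expectations in `ScalarFunctionsTest` follow.

```java
import java.util.List;
import java.util.function.Function;

// Sketch of a shared-row expression harness (invented names): every scalar
// function is exercised against the same fixed row, so adding a new function
// costs one assertion instead of one end-to-end pipeline test.
class ScalarFunctionHarness {

    // One shared test row, mirroring the field types used in the thread's test.
    static final List<Object> TEST_ROW =
        List.of("This is a test String.", true, (byte) 42, (short) 43, 44L, 4.5f, 4.6, 3);

    // In this toy model an "expression" is just a function of the row.
    static String evaluate(Function<List<Object>, Object> expr) {
        return String.valueOf(expr.apply(TEST_ROW));
    }

    // SQL-style 1-based SUBSTRING(str, start): from position `start` to the end.
    static String substring(String s, int start) {
        return s.substring(start - 1);
    }

    // SQL-style 1-based SUBSTRING(str, start, len), clamped to the string end.
    static String substring(String s, int start, int len) {
        return s.substring(start - 1, Math.min(s.length(), start - 1 + len));
    }
}
```

A usage example under the same assumptions: `evaluate(r -> substring((String) r.get(0), 2))` applies the function to field `f0` of the shared row and returns the substring starting at the second character, matching the first expectation in the quoted test.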
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158711#comment-15158711 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1679#issuecomment-187648227

    I had only a few minor comments. Looks mostly good.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158704#comment-15158704 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53763441

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---
    @@ -18,42 +18,20 @@
     package org.apache.flink.api.scala.table.test

    -import org.apache.flink.api.table.{Row, ExpressionException}
     import org.apache.flink.api.scala._
     import org.apache.flink.api.scala.table._
    -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
    +import org.apache.flink.api.table.Row
     import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
     import org.junit._
     import org.junit.runner.RunWith
     import org.junit.runners.Parameterized
    +
     import scala.collection.JavaConverters._
    -import org.apache.flink.api.table.codegen.CodeGenException

     @RunWith(classOf[Parameterized])
     class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

    -  @Test(expected = classOf[CodeGenException])
    --- End diff --

    Why did you remove these tests? The `ScalarFunctionsTest` does not test the feature end-to-end, right?

> Translate optimized logical Table API plans into physical plans representing
> DataSet programs
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream
>   operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one
>   Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all
>   relevant operator information (keys, user-code expression, strategy hints,
>   parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink
>   RelNodes. We start with a straight-forward mapping and later add rules that
>   merge several relational operators into a single Flink operator, e.g.,
>   merge a join followed by a filter. Timo implemented some rules for the
>   first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
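The 1-to-1 mapping described in the issue can be pictured as a lookup from logical RelNode kinds to DataSet operators, with an error for nodes that no rule covers. The following Java sketch uses hypothetical names (`RelNodeMapping`, the operator strings) purely for illustration; it is not Flink's actual translation code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of the "each RelNode maps to exactly one operator" requirement.
// All class and key names here are made up for illustration.
public class RelNodeMapping {

    // logical RelNode kind -> DataSet operator it translates to
    private static final Map<String, String> PHYSICAL = new LinkedHashMap<>();
    static {
        PHYSICAL.put("LogicalProject", "Map");
        PHYSICAL.put("LogicalFilter", "Filter");
        PHYSICAL.put("LogicalAggregate", "GroupReduce");
        PHYSICAL.put("LogicalJoin", "Join");
    }

    public static String translate(String relNode) {
        String op = PHYSICAL.get(relNode);
        if (op == null) {
            // an untranslatable node should fail loudly, not silently
            throw new IllegalArgumentException("No translation rule for " + relNode);
        }
        return op;
    }

    public static void main(String[] args) {
        System.out.println(translate("LogicalJoin")); // prints "Join"
    }
}
```

In the real implementation the values would of course be RelNode classes carrying keys, user-code expressions, and strategy hints rather than plain strings.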
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158705#comment-15158705 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53763449

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
    @@ -40,46 +40,6 @@
       public StringExpressionsITCase(TestExecutionMode mode) {
         super(mode);
       }

    -  @Test(expected = CodeGenException.class)
    --- End diff --

    Why did you remove these tests? The `ScalarFunctionsTest` does not test the feature end-to-end, right?
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158695#comment-15158695 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53763082

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.test
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.expressions.Expression
    +import org.apache.flink.api.table.parser.ExpressionParser
    +import org.apache.flink.api.table.test.utils.ExpressionEvaluator
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class ScalarFunctionsTest {
    +
    +  @Test
    +  def testSubstring(): Unit = {
    +    testFunction(
    +      'f0.substring(2),
    +      "f0.substring(2)",
    +      "SUBSTRING(f0, 2)",
    +      "his is a test String.")
    +
    +    testFunction(
    +      'f0.substring(2, 5),
    +      "f0.substring(2, 5)",
    +      "SUBSTRING(f0, 2, 5)",
    +      "his i")
    +
    +    testFunction(
    +      'f0.substring(1, 'f7),
    +      "f0.substring(1, f7)",
    +      "SUBSTRING(f0, 1, f7)",
    +      "Thi")
    +  }
    +
    +  // --
    +
    +  def testFunction(
    +      expr: Expression,
    +      exprString: String,
    +      sqlExpr: String,
    +      expected: String): Unit = {
    +    val testData = new Row(8)
    +    testData.setField(0, "This is a test String.")
    +    testData.setField(1, true)
    +    testData.setField(2, 42.toByte)
    +    testData.setField(3, 43.toShort)
    +    testData.setField(4, 44.toLong)
    +    testData.setField(5, 4.5.toFloat)
    +    testData.setField(6, 4.6)
    +    testData.setField(7, 3)
    +
    +    val typeInfo = new RowTypeInfo(Seq(
    +      STRING_TYPE_INFO,
    +      BOOLEAN_TYPE_INFO,
    +      BYTE_TYPE_INFO,
    +      SHORT_TYPE_INFO,
    +      LONG_TYPE_INFO,
    +      FLOAT_TYPE_INFO,
    +      DOUBLE_TYPE_INFO,
    +      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
    +
    +    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
    +    assertEquals(expected, exprResult)
    +
    +    val exprStringResult = ExpressionEvaluator.evaluate(
    +      testData,
    +      typeInfo,
    +      ExpressionParser.parseExpression(exprString))
    +    assertEquals(expected, exprStringResult)
    +
    +    // TODO test SQL expression
    --- End diff --

    Resolve TODO?
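The substring tests above rely on SQL's 1-based indexing with an optional length argument, whereas Java and Scala strings use 0-based begin/end indices. A small standalone Java sketch of that index translation (`sqlSubstring` is a hypothetical helper, not a Flink API) reproduces the expected values from the test:

```java
public class SubstringSemantics {

    // SQL-style substring: 1-based start index plus a length.
    static String sqlSubstring(String s, int start, int length) {
        int begin = start - 1;                        // 1-based -> 0-based
        int end = Math.min(begin + length, s.length());
        return s.substring(begin, end);
    }

    // SQL-style substring without a length: take the rest of the string.
    static String sqlSubstring(String s, int start) {
        return s.substring(start - 1);
    }

    public static void main(String[] args) {
        String s = "This is a test String.";
        System.out.println(sqlSubstring(s, 2));     // "his is a test String."
        System.out.println(sqlSubstring(s, 2, 5));  // "his i"
        System.out.println(sqlSubstring(s, 1, 3));  // "Thi"
    }
}
```

The three calls mirror `SUBSTRING(f0, 2)`, `SUBSTRING(f0, 2, 5)`, and `SUBSTRING(f0, 1, f7)` with `f7 = 3` from the test data above.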
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158570#comment-15158570 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1679#discussion_r53751120

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -702,12 +704,17 @@ class CodeGenerator(
             requireBoolean(operand)
             generateNot(nullCheck, operand)

    +      // casting
           case CAST =>
             val operand = operands.head
             generateCast(nullCheck, operand, resultType)

    -      case call@_ =>
    --- End diff --

    Add the catch-all case at the end to get a good error message.
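The review comment above is about match-case ordering: a catch-all branch placed before specific cases would shadow them, while placing it last turns it into a precise error path for genuinely unsupported calls. A minimal Java analogue (the `callKind` strings are hypothetical, not Flink's actual `CodeGenerator` cases):

```java
public class MatchOrder {

    // Specific cases first, catch-all last: unknown call kinds fall through
    // to a descriptive exception instead of being silently swallowed.
    static String generate(String callKind) {
        switch (callKind) {
            case "NOT":
                return "!(operand)";
            case "CAST":
                return "((targetType) operand)";
            default: // catch-all at the end -> good error message
                throw new UnsupportedOperationException(
                    "Unsupported call: " + callKind);
        }
    }

    public static void main(String[] args) {
        System.out.println(generate("CAST")); // prints "((targetType) operand)"
    }
}
```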
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155749#comment-15155749 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/1679

    [FLINK-3226] Translation of scalar function substring()

    This PR implements the scalar function `substring()` for the Table API on Calcite. Additionally, it already contains preparations for more built-in SQL functions.

    I implemented test utils for scalar functions (`ScalarFunctionsTest`) and added a concept similar to Calcite's `CallImplementor`, named `CallGenerator`.

    For now, I kept the `Substring` expression node, but I would like to remove it and use a generic `Call` expression. I think we don't need a case class for every operator. What do you think?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink AdvancedOperators

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1679

----
commit 0106e4723f38ab533bf15c909b98299e27530b2a
Author: twalthr
Date:   2016-02-20T20:41:44Z

    [FLINK-3226] Translation of scalar function substring()
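The `CallGenerator` idea sketched in the PR description, one generic code-generating entry per operator instead of a dedicated case class per function, might look roughly like the following in Java. All names here are hypothetical illustrations, not the PR's actual API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a registry of per-operator code generators that turn
// operand terms into a code string, replacing one case class per function.
public class CallGenerators {

    interface CallGenerator {
        String generate(List<String> operandTerms);
    }

    private static final Map<String, CallGenerator> GENERATORS = new HashMap<>();
    static {
        // SQL is 1-based, Java substring is 0-based, hence the "- 1"
        GENERATORS.put("SUBSTRING",
            ops -> ops.get(0) + ".substring(" + ops.get(1) + " - 1)");
        GENERATORS.put("UPPER",
            ops -> ops.get(0) + ".toUpperCase()");
    }

    public static String generateCall(String name, List<String> operandTerms) {
        CallGenerator g = GENERATORS.get(name);
        if (g == null) {
            throw new IllegalArgumentException("Unsupported function: " + name);
        }
        return g.generate(operandTerms);
    }

    public static void main(String[] args) {
        System.out.println(generateCall("SUBSTRING", List.of("f0", "2")));
    }
}
```

Adding a new built-in function then means registering one lambda rather than introducing a new expression node, which is the design trade-off the PR raises.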
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149121#comment-15149121 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/1639
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149123#comment-15149123 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/1634
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148547#comment-15148547 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1634#issuecomment-184672115 Merged, please close @twalthr
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148282#comment-15148282 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1634#issuecomment-184581189 Thanks for the update. LGTM, merging this
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148281#comment-15148281 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1639#issuecomment-184581139 Merging this
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147831#comment-15147831 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1634#issuecomment-184412261 @fhueske you were totally right. I reworked the casting again.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147650#comment-15147650 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1639#issuecomment-184326419 Looks good. :-) +1 to merge.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147622#comment-15147622 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1634#discussion_r52925323

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---
    @@ -18,20 +18,56 @@
     package org.apache.flink.api.table.codegen

     import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
     import org.apache.flink.api.table.codegen.CodeGenUtils._

     object OperatorCodeGen {

    -  def generateArithmeticOperator(
    +  def generateArithmeticOperator(
           operator: String,
           nullCheck: Boolean,
           resultType: TypeInformation[_],
           left: GeneratedExpression,
           right: GeneratedExpression)
         : GeneratedExpression = {
    -    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
    +    // String arithmetic // TODO rework
    +    if (isString(left)) {
    +      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
           (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
    +      }
    +    }
    +    // Numeric arithmetic
    +    else if (isNumeric(left) && isNumeric(right)) {
    +      val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
    +      val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
    +
    +      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
    +        (leftTerm, rightTerm) =>
    +          // insert auto casting for "narrowing primitive conversions"
    +          if (leftType != rightType) {
    +            // leftType can not be casted to rightType automatically -> narrow
    +            if (!leftType.shouldAutocastTo(rightType)) {
    --- End diff --

    Just checked, double is actually downcast. The following test fails:

    ```
    val t = env.fromElements((10.0d: Double, 1: Byte)).toTable.select('_1 + '_2)
    val expected = "11.0"
    ```
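The expected value 11.0 in the failing test follows from binary numeric promotion: the narrower operand is widened to the wider type, never the other way around. A short Java illustration of why narrowing the double instead would be wrong:

```java
public class NumericWidening {
    public static void main(String[] args) {
        // Binary numeric promotion widens the narrower operand:
        // byte -> double here, so the sum is computed as 10.0 + 1.0.
        double d = 10.0;
        byte b = 1;
        double sum = d + b;
        System.out.println(sum);        // prints 11.0

        // Narrowing the double to byte instead would discard information:
        byte narrowed = (byte) 10.9;
        System.out.println(narrowed);   // prints 10
    }
}
```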
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147608#comment-15147608 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1634#discussion_r52924026

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---

The comment is anchored on the auto-casting branch of `generateArithmeticOperator` (same hunk as in the previous comment):

```scala
// insert auto casting for "narrowing primitive conversions"
if (leftType != rightType) {
  // leftType can not be casted to rightType automatically -> narrow
  if (!leftType.shouldAutocastTo(rightType)) {
```

End diff --

Wouldn't this downcast `double` to `short` given the expression `double + short`?
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147585#comment-15147585 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1634#discussion_r52921997

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---

The comment is anchored on the type comparison in the auto-casting branch of `generateArithmeticOperator` (same hunk as in the previous comments):

```scala
// insert auto casting for "narrowing primitive conversions"
if (leftType != rightType) {
```

End diff --

Shouldn't we compare and cast to the result type?
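The suggestion amounts to: rather than casting one operand to the other operand's type, cast both operands to the already-computed result type, which (being the wider of the two) makes every inserted cast a widening or identity conversion. A hedged sketch of that code-generation rule (the helper names and string-based type handling are illustrative, not Flink's actual API):

```java
import java.util.List;

public class CastToResultType {
    // Primitive numeric types ordered from narrowest to widest, as on the JVM.
    static final List<String> WIDTH_ORDER =
        List.of("byte", "short", "int", "long", "float", "double");

    /** Picks the wider of two primitive numeric types as the result type. */
    static String resultType(String leftType, String rightType) {
        return WIDTH_ORDER.indexOf(leftType) >= WIDTH_ORDER.indexOf(rightType)
            ? leftType : rightType;
    }

    /** Generates "left op right" with both operands cast to resultType. */
    static String generateArithmetic(String op, String resultType,
                                     String leftTerm, String rightTerm) {
        // Both casts widen (or are identity), so no value is ever truncated.
        return "((" + resultType + ") " + leftTerm + ") " + op
             + " ((" + resultType + ") " + rightTerm + ")";
    }

    public static void main(String[] args) {
        String rt = resultType("double", "short");
        System.out.println(rt);                                  // double
        System.out.println(generateArithmetic("+", rt, "a", "b"));
        // ((double) a) + ((double) b)
    }
}
```

With this rule, `double + short` widens the `short` side instead of narrowing the `double` side.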
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147548#comment-15147548 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr closed the pull request at:
https://github.com/apache/flink/pull/1624
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147541#comment-15147541 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request:
https://github.com/apache/flink/pull/1624#issuecomment-184278657

You can close the PR @twalthr. It's merged (but doesn't automatically close).
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147533#comment-15147533 ]

ASF GitHub Bot commented on FLINK-3226:
---
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/1639

[FLINK-3226] Translation of explicit casting

This PR implements explicit casting from and to all supported types so far. @fhueske @vasia can someone review?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink ExplicitCasting

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1639
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147343#comment-15147343 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia closed the pull request at:
https://github.com/apache/flink/pull/1632
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147225#comment-15147225 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request:
https://github.com/apache/flink/pull/1632#issuecomment-184170164

Great, merging this also.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147224#comment-15147224 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request:
https://github.com/apache/flink/pull/1624#issuecomment-184170079

Thanks @twalthr. I'll merge this.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147063#comment-15147063 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on the pull request:
https://github.com/apache/flink/pull/1632#issuecomment-184113402

Thanks for the changes. Looks good to merge ;-)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15146694#comment-15146694 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request:
https://github.com/apache/flink/pull/1632#issuecomment-183957839

Thanks for the review @twalthr! I've addressed your comments.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145919#comment-15145919 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on the pull request:
https://github.com/apache/flink/pull/1624#issuecomment-183640190

Thanks for reviewing @fhueske! I updated my PR and rebased it. Feel free to merge ;-)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145937#comment-15145937 ]

ASF GitHub Bot commented on FLINK-3226:
---
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/1634

[FLINK-3226] Casting support for arithmetic operators

This PR implements auto-casting for numeric arithmetic operators. @fhueske @vasia mergable?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink ArithmeticCasting

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1634.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1634

commit fd783341b93d7a7ffba11c827a317adb35248ea0
Author: twalthr
Date: 2016-02-13T11:38:12Z

    [FLINK-3226] Casting support for arithmetic operators
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145938#comment-15145938 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1632#discussion_r52826898 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -148,16 +148,24 @@ class CodeGenerator( if (clazz == classOf[FlatMapFunction[_,_]]) { val inputTypeTerm = boxedTypeTermForTypeInfo(input1) (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", - s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) } // MapFunction else if (clazz == classOf[MapFunction[_,_]]) { val inputTypeTerm = boxedTypeTermForTypeInfo(input1) ("Object map(Object _in1)", - s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) } + // FlatJoinFunction + else if (clazz == classOf[FlatJoinFunction[_,_,_]]) { +val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) +val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.get) //TODO check this --- End diff -- Why don't you call `getOrElse` here and throw an CodeGenException if input2 is not present? Then we can get rid of the TODO. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. 
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
>
> The translation should be the final part of Calcite's optimization process. For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all relevant operator information (keys, user-code expression, strategy hints, parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink RelNodes. We start with a straight-forward mapping and later add rules that merge several relational operators into a single Flink operator, e.g., merge a join followed by a filter. Timo implemented some rules for the first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
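The reviewer's `getOrElse` suggestion can be sketched as follows. This is a minimal, hypothetical sketch: `CodeGenException` and `boxedTypeTermForTypeInfo` are simplified stand-ins (the real helpers work on `TypeInformation`, not strings).

```scala
// Sketch of resolving the optional second input with getOrElse, failing
// fast with a descriptive exception instead of an unchecked input2.get
// guarded only by a TODO. Names simplified; not the real Flink API.
case class CodeGenException(msg: String) extends RuntimeException(msg)

def boxedTypeTermForTypeInfo(typeInfo: String): String = typeInfo // stand-in

def secondInputTypeTerm(input2: Option[String]): String =
  boxedTypeTermForTypeInfo(
    input2.getOrElse(throw CodeGenException("FlatJoinFunction requires a second input.")))
```

Calling `secondInputTypeTerm(None)` now fails with an explicit message rather than a bare `NoSuchElementException` from `Option.get`.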
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145945#comment-15145945 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1632#discussion_r52826951

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---
@@ -63,7 +63,7 @@ class DataSetMap(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]])
     : DataSet[Any] = {
-    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
--- End diff --

You cannot forward the expected type, since you don't know what the previous operator does, e.g., if the expected type is `Tuple2` but the previous operator outputs records with one field. Why did you change this call?
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145947#comment-15145947 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1632#discussion_r52827003

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
@@ -39,18 +49,80 @@ class DataSetJoinRule
     val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
     val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
-    new DataSetJoin(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType,
-      join.toString,
-      Array[Int](),
-      Array[Int](),
-      JoinType.INNER,
-      null,
-      null)
+    // get the equality keys
+    val joinInfo = join.analyzeCondition
+    val keyPairs = joinInfo.pairs
+
+    if (keyPairs.isEmpty) { // if no equality keys => not supported
+      throw new TableException("Joins should have at least one equality condition")
+    }
+    else { // at least one equality expression => generate a join function
+      val conditionType = join.getCondition.getType
+      val func = getJoinFunction(join, joinInfo)
+      val leftKeys = ArrayBuffer.empty[Int]
+      val rightKeys = ArrayBuffer.empty[Int]
+
+      keyPairs.foreach(pair => {
+        leftKeys.add(pair.source)
+        rightKeys.add(pair.target)}
+      )
+
+      new DataSetJoin(
+        rel.getCluster,
+        traitSet,
+        convLeft,
+        convRight,
+        rel.getRowType,
+        join.toString,
+        leftKeys.toArray,
+        rightKeys.toArray,
+        JoinType.INNER,
+        null,
+        func)
+    }
+  }
+
+  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
+      ((TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) =>
+      FlatJoinFunction[Any, Any, Any]) = {
+
+    if (joinInfo.isEqui) {
+      // only equality condition => no join function necessary
+      null
--- End diff --

In general, `null` is not very welcome in Scala. Could you return a `FlatJoinFunction` (containing only a `ConverterResultExpression`) here too? We can then get rid of the `Tuple2RowMapper` and support any type as output type of the join operation.
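The null-avoidance point can also be illustrated with an `Option`-returning factory. This is a hypothetical, heavily simplified sketch: `JoinFunctionFactory` stands in for the real `(TableConfig, TypeInformation, ...) => FlatJoinFunction` signature.

```scala
// Sketch: model "no join function necessary" as None rather than null,
// so callers must handle the absent case explicitly.
type JoinFunctionFactory = (String, String) => String // simplified stand-in

def getJoinFunction(isEquiJoin: Boolean): Option[JoinFunctionFactory] =
  if (isEquiJoin) None // pure equi-join: the keys alone drive the join
  else Some((left, right) => s"nonEquiJoin($left, $right)")
```

A caller then writes `getJoinFunction(isEqui).map(...)` instead of a null check, which the compiler enforces.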
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145951#comment-15145951 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1632#discussion_r52827029

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
@@ -39,18 +49,80 @@ class DataSetJoinRule
+    else {
+      val func = (
+          config: TableConfig,
+          leftInputType: TypeInformation[Any],
+          rightInputType: TypeInformation[Any],
+          returnType: TypeInformation[Any]) => {
+
+        val generator = new CodeGenerator(config, leftInputType, Some(rightInputType))
+        val condition = generator.generateExpression(join.getCondition)
+        val body = {
--- End diff --

Unnecessary brackets?
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144312#comment-15144312 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1624#issuecomment-183251312

Hi, I thought about using POJOs as native types within Table/SQL operators. IMO, the gains are very small compared to the added code complexity. Given a POJO input, we can preserve the input type only for very few operations such as a Filter. For most other operations, we need to generate a new output type (Tuple or Row). I am a bit skeptical about adding a lot of codeGen code with special cases for POJOs (such as the field index mapping) which is very seldom used. Moreover, POJO field accesses (for operations and serialization) go through reflection and are not very efficient, so even the performance gain for those few cases where POJOs can be used is not clear. I do not question the native type support in general. Tuples and primitives should definitely be supported, but I don't think we need to support POJOs within Table / SQL operators. Instead, I would convert POJO datasets into Row tables during the table scan. Most of the code in this PR can be used to implement a codeGen'd converter Map function. What do you think @twalthr?
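The "convert POJO datasets into Row tables during the table scan" proposal can be sketched like this. The `Person` class and the positional `Row` here are hypothetical simplifications; in Flink the converter would be code-generated against the real `Row` type.

```scala
// Sketch: a one-time converter that copies POJO fields into a positional
// Row at the table scan, so all downstream operators work on Rows only
// and never touch POJO fields via reflection per record.
case class Person(name: String, age: Int) // stands in for a user POJO
class Row(val fields: Array[Any])

def personToRow(p: Person): Row = new Row(Array(p.name, p.age))
```

After this conversion every operator downstream of the scan can address fields by position, which is exactly the property POJOs lack.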
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144480#comment-15144480 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1624#issuecomment-183287252

Ah, OK :-) I have to admit that I didn't look at the changes in detail but noticed that they go into several places of the code gen. Hence, I was wondering if it would be possible to extract the special POJO case from the `CodeGenerator`, but I guess that would lead to code duplication. I'll start reviewing the changes.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144489#comment-15144489 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52731697

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---
@@ -75,5 +75,15 @@ class TableEnvironment {
       TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
--- End diff --

Ah, I see the comment is copied. But it should be changed, no?
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144558#comment-15144558 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52739054

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
@@ -150,7 +151,11 @@ object CodeGenUtils {
   sealed abstract class FieldAccessor
-  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
--- End diff --

I think we do not have to separate the cases of public and private fields; we can make the fields always accessible. This would reduce a bit of code complexity.
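The point about not separating public and private accessor cases can be sketched with plain Java reflection. This is an assumption-laden sketch: it presumes `setAccessible(true)` is acceptable for every field, which is exactly the simplification the reviewer proposes.

```scala
import java.lang.reflect.Field

// Sketch: a single accessor path for all fields. Calling setAccessible(true)
// unconditionally makes the public/private distinction irrelevant, so one
// FieldAccessor case suffices.
def fieldAccessor(clazz: Class[_], name: String): Field = {
  val f = clazz.getDeclaredField(name)
  f.setAccessible(true) // works for private fields as well
  f
}
```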
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144563#comment-15144563 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52739359

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -42,13 +42,30 @@ import scala.collection.mutable
  * @param config configuration that determines runtime behavior
  * @param input1 type information about the first input of the Function
  * @param input2 type information about the second input if the Function is binary
+ * @param inputPojoFieldMapping additional mapping information if input1 is a POJO (POJO types
+ *                              have no deterministic field order). We assume that input2 is
+ *                              converted before and thus is never a POJO.
  */
 class CodeGenerator(
-    config: TableConfig,
-    input1: TypeInformation[Any],
-    input2: Option[TypeInformation[Any]] = None)
+    config: TableConfig,
+    input1: TypeInformation[Any],
+    input2: Option[TypeInformation[Any]] = None,
+    inputPojoFieldMapping: Array[Int] = Array())
--- End diff --

Make this parameter optional?
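The "make this parameter optional" suggestion could look like the following. The class and field types are hypothetical simplifications (strings instead of `TypeInformation`), only meant to show the `Option`-with-default pattern.

```scala
// Sketch: inputPojoFieldMapping as an Option with a None default, so
// callers that have no POJO input can simply omit the argument.
class CodeGenSketch(
    val input1: String,
    val input2: Option[String] = None,
    val inputPojoFieldMapping: Option[Array[Int]] = None) {

  // Fall back to an empty mapping when no POJO mapping was provided.
  def pojoMapping: Array[Int] = inputPojoFieldMapping.getOrElse(Array.empty[Int])
}
```

Compared to an `Array()` default, the `Option` makes "no mapping supplied" distinguishable from "an explicitly empty mapping".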
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144574#comment-15144574 ] ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52740335

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -231,29 +252,40 @@ class CodeGenerator(
    * be reused, they will be added to reusable code sections internally. The evaluation result
    * may be stored in the global result variable (see [[outRecordTerm]]).
    *
-   * @param fieldExprs
+   * @param fieldExprs field expressions to be converted
    * @param returnType conversion target type. Type must have the same arity than fieldExprs.
+   * @param resultFieldNames result field names necessary for a mapping to POJO fields.
    * @return instance of GeneratedExpression
    */
   def generateResultExpression(
       fieldExprs: Seq[GeneratedExpression],
-      returnType: TypeInformation[_ <: Any])
+      returnType: TypeInformation[_ <: Any],
+      resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    // TODO disable arity check for Rows and derive row arity from fieldExprs
     // initial type check
     if (returnType.getArity != fieldExprs.length) {
      throw new CodeGenException("Arity of result type does not match number of expressions.")
     }
     // type check
     returnType match {
+      case pt: PojoTypeInfo[_] =>
+        fieldExprs.zipWithIndex foreach {
--- End diff --

Add a check that both `fieldExprs` and `resultFieldNames` have the same length.
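The requested length check can be sketched as a simple precondition. The signature is a hypothetical simplification (strings instead of `GeneratedExpression`); `require` throws an `IllegalArgumentException` on mismatch.

```scala
// Sketch: verify that field expressions and result field names line up
// before attempting to map expressions onto POJO fields by name.
def checkResultArity(fieldExprs: Seq[String], resultFieldNames: Seq[String]): Unit =
  require(
    fieldExprs.length == resultFieldNames.length,
    s"Got ${fieldExprs.length} expressions but ${resultFieldNames.length} result field names.")
```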
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144401#comment-15144401 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1624#issuecomment-183268426 Hi @fhueske, I completely share your opinion. This PR does exactly what you described. As you can read in `DataSetSource`: "It ensures that types without deterministic field order (e.g. POJOs) are not part of the plan translation." What I have implemented is proper reading of POJOs (into the Table API) and writing of POJOs (out of the Table API). Most of the code I have added provides codeGen'd converter Map functions for the source and the sink. POJOs are not used for any internal operations.

> Translate optimized logical Table API plans into physical plans representing DataSet programs
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process. For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all relevant operator information (keys, user-code expression, strategy hints, parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink RelNodes. We start with a straight-forward mapping and later add rules that merge several relational operators into a single Flink operator, e.g., merge a join followed by a filter. Timo implemented some rules for the first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
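The codeGen'd converter Map functions described in the comment above can be pictured with a small, Flink-free sketch: a POJO has no deterministic field order, so a source-side converter maps its fields into a positional row once, and everything downstream works on positions only. All names here (`Row`, `Person`, `pojoToRow`) are invented for illustration and are not the PR's actual classes.

```scala
// Positional row, standing in for the Table API's internal row type.
case class Row(fields: Array[Any]) {
  def get(i: Int): Any = fields(i)
}

// A POJO-style class: mutable fields, no guaranteed field order.
class Person {
  var name: String = ""
  var age: Int = 0
}

// A source-side converter: the logical field order is fixed up front by name,
// so internal operators never have to reflect over the POJO again.
def pojoToRow(p: Person, fieldOrder: Array[String]): Row =
  Row(fieldOrder.map {
    case "name" => p.name
    case "age"  => p.age
  })

val p = new Person
p.name = "alice"
p.age = 30
val row = pojoToRow(p, Array("age", "name")) // logical order chosen at the source
```

A sink-side converter would do the inverse mapping from positions back into named POJO fields.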
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144484#comment-15144484 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52731336 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala --- @@ -75,5 +75,15 @@ class TableEnvironment { TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) } + /** + * Converts the given [[org.apache.flink.api.table.Table]] to + * a DataSet. The given type must have exactly the same fields as the + * [[org.apache.flink.api.table.Table]]. That is, the names of the --- End diff -- I would make name equivalence required only for POJOs and generic composite types. Rows and tuples can be matched by position. Otherwise, fields would need to be renamed to `f0`, `f1`, etc. for tuples.
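The matching rule suggested in this review comment (tuples and rows by position, POJOs and generic composite types by name) can be sketched as follows. `TupleTarget`, `PojoTarget`, and `matchFields` are hypothetical names for illustration, not Flink API.

```scala
// Two ways an output type's fields can be matched against a Table's fields.
sealed trait TargetType
case class TupleTarget(arity: Int) extends TargetType
case class PojoTarget(fieldNames: Seq[String]) extends TargetType

/** For each target field, the index of the Table field that feeds it. */
def matchFields(tableFields: Seq[String], target: TargetType): Seq[Int] =
  target match {
    case TupleTarget(n) =>
      // Positional: no name check needed, field i maps to field i.
      require(n == tableFields.length, "arity mismatch")
      0 until n
    case PojoTarget(names) =>
      // By name: each POJO field must exist in the Table, order is free.
      names.map { name =>
        val i = tableFields.indexOf(name)
        require(i >= 0, s"no Table field named $name")
        i
      }
  }

val tupleIdx = matchFields(Seq("a", "b"), TupleTarget(2))
val pojoIdx  = matchFields(Seq("a", "b"), PojoTarget(Seq("b", "a")))
```

With positional matching, a tuple target never has to carry `f0`, `f1`, ... names that mirror the Table's field names.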
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144564#comment-15144564 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52739425 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -42,13 +42,30 @@ import scala.collection.mutable * @param config configuration that determines runtime behavior * @param input1 type information about the first input of the Function * @param input2 type information about the second input if the Function is binary + * @param inputPojoFieldMapping additional mapping information if input1 is a POJO (POJO types + * have no deterministic field order). We assume that input2 is + * converted before and thus is never a POJO. */ class CodeGenerator( -config: TableConfig, -input1: TypeInformation[Any], -input2: Option[TypeInformation[Any]] = None) + config: TableConfig, + input1: TypeInformation[Any], + input2: Option[TypeInformation[Any]] = None, + inputPojoFieldMapping: Array[Int] = Array()) extends RexVisitor[GeneratedExpression] { + /** +* A code generator for generating unary Flink +* [[org.apache.flink.api.common.functions.Function]]s with one input. +* +* @param config configuration that determines runtime behavior +* @param input type information about the input of the Function +* @param inputPojoFieldMapping additional mapping information necessary if input is a +* POJO (POJO types have no deterministic field order). +*/ + def this(config: TableConfig, input: TypeInformation[Any], inputPojoFieldMapping: Array[Int]) = --- End diff -- Make fieldMapping optional? 
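The reviewer's "make fieldMapping optional?" suggestion could look roughly like this: a default value on the primary constructor instead of a dedicated auxiliary constructor. The class and parameter types below are simplified stand-ins, not the PR's actual `CodeGenerator` signature.

```scala
// Sketch: with default values, callers without a POJO input (or without a
// second input) can simply omit those parameters by name.
class CodeGenSketch(
    config: String,                                  // stands in for TableConfig
    input1: String,                                  // stands in for TypeInformation
    input2: Option[String] = None,
    inputPojoFieldMapping: Array[Int] = Array.empty) {
  def describe: String =
    s"in1=$input1 in2=$input2 mapping=${inputPojoFieldMapping.mkString(",")}"
}

val g1 = new CodeGenSketch("cfg", "Tuple2[Int, Int]")
val g2 = new CodeGenSketch("cfg", "MyPojo", inputPojoFieldMapping = Array(1, 0))
```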
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144611#comment-15144611 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52744266 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -749,4 +867,23 @@ class CodeGenerator( reusableMemberStatements.add(statement) } + def addReusablePrivateFieldAccess(clazz: Class[_], fieldName: String): String = { +val fieldTerm = s"field_${clazz.getCanonicalName.replace('.', '$')}_$fieldName" +val fieldExtraction = + s""" +|java.lang.reflect.Field $fieldTerm = --- End diff -- `Field` is not serializable and should be `transient`.
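The reason behind this `transient` request can be shown in a small, self-contained sketch: `java.lang.reflect.Field` does not implement `Serializable`, so a function object shipped to a worker must mark the member transient and re-resolve the field lazily after deserialization. `PrivateFieldAccessor` and `Secret` are invented names, independent of Flink's generated code.

```scala
class PrivateFieldAccessor(clazz: Class[_], fieldName: String) extends Serializable {
  // Field is not serializable, so it must not be part of the serialized state.
  @transient private var field: java.lang.reflect.Field = null

  def get(obj: AnyRef): Any = {
    if (field == null) {                       // re-resolve after deserialization
      field = clazz.getDeclaredField(fieldName)
      field.setAccessible(true)                // required for private fields
    }
    field.get(obj)
  }
}

class Secret {
  private val value: Int = 42
  override def toString: String = s"Secret($value)"
}

val acc = new PrivateFieldAccessor(classOf[Secret], "value")
```

Without `@transient`, serializing the function would fail with a `NotSerializableException` on the `Field` member.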
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144622#comment-15144622 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1624#issuecomment-183349741 Thanks @twalthr, the PR looks pretty good.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144584#comment-15144584 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1624#discussion_r52741150 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala --- @@ -150,7 +151,11 @@ object CodeGenUtils { sealed abstract class FieldAccessor - case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor + case class ObjectFieldAccessor(field: Field) extends FieldAccessor + + case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor + + case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor --- End diff -- Never mind. I was thinking of the `PojoSerializer`, which always goes through reflection, even for public fields. You are accessing the field directly if it is public, which is good.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145227#comment-15145227 ] ASF GitHub Bot commented on FLINK-3226: --- GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1632 [FLINK-3226] Translate logical joins to physical This PR contains the logical-to-physical translation for joins.
- Joins with only equality conditions do not generate a join function.
- Joins with both equality and non-equality conditions evaluate the non-equality conditions inside a `FlatJoinFunction`.
- Joins without any equality conditions, e.g. `in1.join(in2).where(a > b)`, are currently not supported. We could consider supporting those later by generating a cross function.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink translateJoin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1632.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1632 commit f4563cf2ea78a993b00f6398006b1a064ad83857 Author: vasia Date: 2016-02-11T17:04:45Z [FLINK-3226] Translate logical joins to physical
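The join translation strategy described in this PR (equality conditions become the join keys, remaining predicates are evaluated inside a `FlatJoinFunction`-style callback) can be sketched without Flink as a plain hash join with a residual filter. `Rec` and `equiJoinWithResidual` are illustrative names only.

```scala
case class Rec(key: Int, v: Int)

// Equality part drives the key lookup; the residual (non-equality) predicate
// is applied per matched pair, emitting zero or one result, like a flat join.
def equiJoinWithResidual(
    left: Seq[Rec],
    right: Seq[Rec],
    residual: (Rec, Rec) => Boolean): Seq[(Rec, Rec)] = {
  val byKey = right.groupBy(_.key)          // build side, keyed on equality columns
  for {
    l <- left
    r <- byKey.getOrElse(l.key, Nil)
    if residual(l, r)                       // non-equality part of the condition
  } yield (l, r)
}

val l = Seq(Rec(1, 10), Rec(2, 5))
val r = Seq(Rec(1, 3), Rec(2, 7))
// join on key equality, keep only pairs where left.v > right.v
val out = equiJoinWithResidual(l, r, (a, b) => a.v > b.v)
```

A join with no equality condition at all has no key to build on, which is why the PR would need a cross-style strategy to support it.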
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142618#comment-15142618 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1600#issuecomment-182817751 Looks pretty good. Can you check the `@Ignore` annotations?
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143300#comment-15143300 ] ASF GitHub Bot commented on FLINK-3226: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/1624 [FLINK-3226] Translation from and to POJOs for CodeGenerator This PR implements full POJO support as input and output type of the Table API. It is now possible to convert from an arbitrary type to another arbitrary type. I fixed the failing AsITCase and implemented additional tests from/to tuples, from/to POJOs and from/to case classes. @vasia and @fhueske feel free to review. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink PojoSupport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1624.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1624 commit 2322d9f602c4b7e01a582958f918372efc214848 Author: twalthr Date: 2016-02-11T15:16:29Z [FLINK-3226] Translation from and to POJOs for CodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142754#comment-15142754 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1600#issuecomment-182872698 @fhueske I fixed the aggregation hashCode and enabled the ignored tests. Let me know if it's OK now!
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143030#comment-15143030 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1600#issuecomment-182957377 It's all green :D Merging.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143046#comment-15143046 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia closed the pull request at: https://github.com/apache/flink/pull/1600
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140819#comment-15140819 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-182390675 Hey @twalthr, I merged this yesterday but the PR didn't automatically close. Could you please close it manually? Thanks!
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140835#comment-15140835 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1600#issuecomment-182392029

I've rebased this one on top of `tableOnCalcite`, which includes #1595. Regarding the average aggregate, I went for @tillrohrmann's suggestion of adding 0.5. That means that an integer average of 1 and 2 will now return 2. Note that the previous implementation of the Table API would return 1 for this operation, because the integer average maintained a pair of sum and count.
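The two integer-average behaviors discussed above can be sketched in a small standalone program (plain Java for illustration, not the actual Flink implementation; the method names here are made up):

```java
public class IntAvgRounding {

    // Previous behavior: keep a (sum, count) pair and use integer division,
    // which truncates toward zero.
    static int avgSumCount(int[] values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return (int) (sum / values.length); // avg(1, 2) -> 1
    }

    // Behavior after the change described in the comment: compute a
    // fractional average and add 0.5 before truncating (round half up).
    static int avgHalfUp(int[] values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return (int) ((double) sum / values.length + 0.5); // avg(1, 2) -> 2
    }

    public static void main(String[] args) {
        int[] data = {1, 2};
        System.out.println(avgSumCount(data)); // 1
        System.out.println(avgHalfUp(data));   // 2
    }
}
```

This makes the trade-off concrete: the same input yields 1 under truncating integer division and 2 under round-half-up.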
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140885#comment-15140885 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/1595
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138788#comment-15138788 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1595#discussion_r52293343

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+object CodeGenUtils {
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case _ => "null"
+  }
+
+  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
+    case nti:
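The casting comment in the quoted `CodeGenUtils` code refers to a real Java restriction: a boxed value cannot be cast directly to a different primitive type; it must be unboxed to its own primitive first and only then narrowed. A minimal standalone demonstration:

```java
public class BoxedCasts {
    public static void main(String[] args) {
        // Primitive narrowing works with a single cast:
        float a = 1.0f;
        byte b = (byte) a;

        // For boxed values, a direct cast to a different primitive does not
        // compile, so generated code must unbox first and then narrow:
        Float boxed = 1.0f;
        // byte c = (byte) boxed;      // compile error: Float cannot be cast to byte
        byte c = (byte) (float) boxed; // unbox to float, then narrow to byte

        System.out.println(b + " " + c); // prints "1 1"
    }
}
```

This is exactly why the code generator distinguishes `primitiveTypeTermForTypeInfo` from `boxedTypeTermForTypeInfo`: emitted casts need both the boxed and the unboxed type names.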
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138784#comment-15138784 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1595#discussion_r52293004

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -0,0 +1,661 @@
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
--- End diff --

Intellij tells me this import is unused.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138805#comment-15138805 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-181824403

Hi Timo, the PR looks really good :-) I found the following issues / questions:
- Accessing of POJO fields might not work.
- Can you add method comments to the code generation methods in `CodeGenerator` and `CodeGenUtils`?
- Would it make sense to separate the function and expression code gen, i.e., split the `CodeGenerator` class?
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138705#comment-15138705 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1600#issuecomment-181794348

Thanks for the feedback @tillrohrmann, @twalthr! I've moved the classes to `org.apache.flink.api.table.runtime` and tried to shorten the aggregates code using Numerics. I only left `AvgAggregate` as is, because integer average and float/double average are computed differently. We can always replace it with code generation later, as @twalthr suggested.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138764#comment-15138764 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1595#discussion_r52291201

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -0,0 +1,661 @@
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+class CodeGenerator(
+    config: TableConfig,
+    input1: TypeInformation[Any],
+    input2: Option[TypeInformation[Any]] = None)
+  extends RexVisitor[GeneratedExpression] {
+
+  // set of member statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableInitStatements = mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
+
+  def reuseMemberCode(): String = {
+    reusableMemberStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInitCode(): String = {
+    reusableInitStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInputUnboxingCode(): String = {
+    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
+  }
+
+  def input1Term = "in1"
+
+  def input2Term = "in2"
+
+  def collectorTerm = "c"
+
+  def outRecordTerm = "out"
+
+  def nullCheck: Boolean = config.getNullCheck
+
+  def generateExpression(rex: RexNode): GeneratedExpression = {
+    rex.accept(this)
+  }
+
+  def generateFunction[T <: Function](
+      name: String,
+      clazz: Class[T],
+      bodyCode: String,
+      returnType: TypeInformation[Any])
+    : GeneratedFunction[T] = {
+    val funcName = newName(name)
+
+    // Janino does not support generics, that's why we need
+    // manual casting here
+    val samHeader =
+      if (clazz == classOf[FlatMapFunction[_,_]]) {
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
+          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+      } else if (clazz == classOf[MapFunction[_,_]]) {
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+        ("Object map(Object _in1)",
+          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+      } else {
+        // TODO more functions
+        throw new
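The `samHeader` logic in the quoted diff works around Janino's lack of generics support: the generated class implements the erased, `Object`-typed SAM signature and casts the argument manually. A rough Java analogue of the shape of the emitted code (using a toy interface, not Flink's actual `MapFunction`):

```java
// Toy generic SAM interface standing in for a function interface like
// Flink's MapFunction<IN, OUT>.
interface Fn<I, O> {
    O map(I in);
}

public class RawSamSketch {

    // A raw (non-generic) implementation, mirroring what a generics-free
    // compiler can handle: the method signature is the erased
    // "Object map(Object)" and the input is cast to the concrete type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Fn makeRawFn() {
        return new Fn() {
            public Object map(Object _in1) {
                Integer in1 = (Integer) _in1; // manual cast instead of generics
                return in1 + 1;
            }
        };
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        Fn fn = makeRawFn();
        System.out.println(fn.map(41)); // prints 42
    }
}
```

The generated source text in Flink follows the same pattern: the header string `"Object map(Object _in1)"` plus a cast line like `MyInputType in1 = (MyInputType) _in1;` prepended to the user body code.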
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139025#comment-15139025 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-181902296

I have added comments and fixed the string bug. I will solve the other issues at the end of the week. If you have no objections, you can merge the PR ;-)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139052#comment-15139052 ]

ASF GitHub Bot commented on FLINK-3226:
---
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52320399

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
@@ -0,0 +1,145 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate return Int as aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

I know. However, aggregating on a `Double` and then calling `toInt` before returning the result led to rounding errors. E.g., there was a test where the sum over 21 records was 231, i.e. the average should be 11, but it was actually computed as 10.9 and rounded down to 10. If you have any idea how to avoid such cases, please let me know!
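The failure mode described above can be illustrated with a small sketch (plain Java; the exact sequence of operations that produced the drift in the original test is not reproduced here, so the "drifted" value below is a hypothetical stand-in for accumulated floating-point error):

```java
import java.util.stream.IntStream;

public class AvgTruncation {

    // The exact approach used by TinyIntAvgAggregate: accumulate an integral
    // sum in a long and divide once at the end.
    static int exactAvg(int[] values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return (int) (sum / values.length);
    }

    public static void main(String[] args) {
        // 21 records (1..21) summing to 231: the true average is exactly 11.
        int[] data = IntStream.rangeClosed(1, 21).toArray();
        System.out.println(exactAvg(data)); // 11

        // If a double accumulator ends up even slightly below 11.0, a
        // toInt-style truncation rounds it all the way down to 10:
        double drifted = 10.999999999999998; // hypothetical accumulated value
        System.out.println((int) drifted);   // 10
    }
}
```

This is why keeping an exact (sum, count) pair avoids the bug: integral addition never drifts, so the single final division is the only place rounding happens.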
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138876#comment-15138876 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301458

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    +import org.apache.flink.api.table.codegen.OperatorCodeGen._
    +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable
    +
    +class CodeGenerator(
    +    config: TableConfig,
    +    input1: TypeInformation[Any],
    +    input2: Option[TypeInformation[Any]] = None)
    +  extends RexVisitor[GeneratedExpression] {
    +
    +  // set of member statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
    +
    +  // set of constructor statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableInitStatements = mutable.LinkedHashSet[String]()
    +
    +  // map of initial input unboxing expressions that will be added only once
    +  // (inputTerm, index) -> expr
    +  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
    +
    +  def reuseMemberCode(): String = {
    +    reusableMemberStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInitCode(): String = {
    +    reusableInitStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInputUnboxingCode(): String = {
    +    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
    +  }
    +
    +  def input1Term = "in1"
    +
    +  def input2Term = "in2"
    +
    +  def collectorTerm = "c"
    +
    +  def outRecordTerm = "out"
    +
    +  def nullCheck: Boolean = config.getNullCheck
    +
    +  def generateExpression(rex: RexNode): GeneratedExpression = {
    +    rex.accept(this)
    +  }
    +
    +  def generateFunction[T <: Function](
    +      name: String,
    +      clazz: Class[T],
    +      bodyCode: String,
    +      returnType: TypeInformation[Any])
    +    : GeneratedFunction[T] = {
    +    val funcName = newName(name)
    +
    +    // Janino does not support generics, that's why we need
    +    // manual casting here
    +    val samHeader =
    +      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else if (clazz == classOf[MapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        ("Object map(Object _in1)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else {
    +        // TODO more functions
    +        throw new
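The `reusableMemberStatements` and `reusableInitStatements` sets in the diff above use a `LinkedHashSet` so that each reusable code fragment is emitted exactly once while preserving the order in which it was first added. A minimal standalone sketch of that pattern (simplified names, not the actual Flink class):

```scala
import scala.collection.mutable

// Sketch of the CodeGenerator's reuse mechanism: member statements are
// collected in a LinkedHashSet, so duplicates are dropped but the
// insertion order of first occurrences is preserved.
class ReusableCode {
  private val memberStatements = mutable.LinkedHashSet[String]()

  def addReusableMember(stmt: String): Unit = memberStatements += stmt

  // concatenate all unique statements, one per line, as the generator does
  def reuseMemberCode(): String = memberStatements.mkString("", "\n", "\n")
}

val gen = new ReusableCode
gen.addReusableMember("int a;")
gen.addReusableMember("int b;")
gen.addReusableMember("int a;") // duplicate, silently ignored
val code = gen.reuseMemberCode()
```

This is why the same helper (e.g. a date formatter member) can be requested from many generated expressions without being declared twice in the generated class.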
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138913#comment-15138913 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52304432

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---
    @@ -58,19 +58,23 @@ class FlinkAggregate(
         )
       }

    -  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    -
    -    val origCosts = super.computeSelfCost(planner)
    -    val deltaCost = planner.getCostFactory.makeHugeCost()
    -
    -    // only prefer aggregations with transformed Avg
    -    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    -      (c: RelOptCost, a: AggregateCall) =>
    -        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    -          c.plus(deltaCost)
    -        } else {
    -          c
    -        }
    -    }
    -  }
    +  //
    +  // DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
    +  // ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.
    +  //
    +  // override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    +  //
    +  //   val origCosts = super.computeSelfCost(planner)
    +  //   val deltaCost = planner.getCostFactory.makeHugeCost()
    +  //
    +  //   // only prefer aggregations with transformed Avg
    +  //   aggCalls.toList.foldLeft[RelOptCost](origCosts){
    +  //     (c: RelOptCost, a: AggregateCall) =>
    +  //       if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    +  //         c.plus(deltaCost)
    +  //       } else {
    +  //         c
    +  //       }
    +  //   }
    +  // }

    --- End diff --

    It's fine to drop the code. We can improve the aggregations later.

> Translate optimized logical Table API plans into physical plans representing
> DataSet programs
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one
>   Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all
>   relevant operator information (keys, user-code expression, strategy hints,
>   parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink
>   RelNodes. We start with a straight-forward mapping and later add rules that
>   merge several relational operators into a single Flink operator, e.g., merge
>   a join followed by a filter. Timo implemented some rules for the first SQL
>   implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
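The dropped `computeSelfCost` override folded a huge penalty onto the plan cost for every AVG aggregation, so the optimizer would only prefer plans where AVG had already been rewritten. The fold can be sketched with plain `Double` costs standing in for Calcite's `RelOptCost` (names and the penalty value are illustrative, not Flink/Calcite API):

```scala
// Illustrative stand-in for Calcite's AggregateCall
final case class AggCall(name: String)

// plays the role of planner.getCostFactory.makeHugeCost()
val hugeCost = 1e9

// mirror of the foldLeft in the diff: add the penalty once per AVG call,
// leaving the cost of all other aggregations untouched
def penalizedCost(origCost: Double, aggCalls: List[AggCall]): Double =
  aggCalls.foldLeft(origCost) { (c, a) =>
    if (a.name == "AVG") c + hugeCost else c
  }

val cost = penalizedCost(10.0, List(AggCall("SUM"), AggCall("AVG")))
```

The huge additive cost effectively vetoes a plan without making it infeasible, which is why it was only needed while `AggregateReduceFunctionsRule` (which rewrites AVG into SUM/COUNT) was enabled.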
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138880#comment-15138880 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301613

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._

    --- End diff --

    I had the same problem with IntelliJ. But if you remove the import, the `j"..."` will not work anymore. Actually I think we don't need the Indenter at all.
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138904#comment-15138904 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52303126

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
    +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +object CodeGenUtils {
    +
    +  private val nameCounter = new AtomicInteger
    +
    +  def newName(name: String): String = {
    +    s"$name$$${nameCounter.getAndIncrement}"
    +  }
    +
    +  // when casting we first need to unbox Primitives, for example,
    +  // float a = 1.0f;
    +  // byte b = (byte) a;
    +  // works, but for boxed types we need this:
    +  // Float a = 1.0f;
    +  // Byte b = (byte)(float) a;
    +  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "int"
    +    case LONG_TYPE_INFO => "long"
    +    case SHORT_TYPE_INFO => "short"
    +    case BYTE_TYPE_INFO => "byte"
    +    case FLOAT_TYPE_INFO => "float"
    +    case DOUBLE_TYPE_INFO => "double"
    +    case BOOLEAN_TYPE_INFO => "boolean"
    +    case CHAR_TYPE_INFO => "char"
    +
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "-1"
    +    case LONG_TYPE_INFO => "-1"
    +    case SHORT_TYPE_INFO => "-1"
    +    case BYTE_TYPE_INFO => "-1"
    +    case FLOAT_TYPE_INFO => "-1.0f"
    +    case DOUBLE_TYPE_INFO => "-1.0d"
    +    case BOOLEAN_TYPE_INFO => "false"
    +    case STRING_TYPE_INFO => "\"\""
    +    case CHAR_TYPE_INFO => "'\\0'"
    +    case _ => "null"
    +  }
    +
    +  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case nti:
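`newName` in the diff above guarantees unique identifiers for generated code by appending a process-wide counter to the base name. The pattern is small enough to test in isolation (same scheme as `CodeGenUtils.newName`, reproduced here without the Flink types):

```scala
import java.util.concurrent.atomic.AtomicInteger

// "<base>$<counter>": the AtomicInteger makes the counter safe even if
// several code generators run concurrently in the same JVM
val nameCounter = new AtomicInteger

def newName(name: String): String =
  s"$name$$${nameCounter.getAndIncrement}"

val n0 = newName("out")
val n1 = newName("out")
```

Uniqueness matters because several generated expressions may request members with the same base name within one generated class.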
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138916#comment-15138916 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52304994

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---
    @@ -37,14 +39,24 @@ class DataSetAggregateRule
         val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
         val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)

    -    new DataSetReduce(
    +    val grouping = agg.getGroupSet.asList().map {

    --- End diff --

    I think you can use `ImmutableBitSet.toArray` to directly generate an int[].
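The suggestion above is to let Calcite's `ImmutableBitSet.toArray` produce the `int[]` of grouping positions in one call instead of mapping over `asList()`. Calcite is not assumed on the classpath here, so the same conversion is shown with the Scala standard library's `BitSet`, which also yields the sorted array of set bits directly:

```scala
import scala.collection.immutable.BitSet

// grouping positions 0 and 2 set, analogous to what
// Aggregate#getGroupSet reports for GROUP BY on fields 0 and 2
val groupSet = BitSet(0, 2)

// one call yields the sorted Array[Int] of grouping keys,
// with no intermediate java.util.List and no boxing round-trip
val grouping: Array[Int] = groupSet.toArray
```

The design point carries over directly: a bit set already knows its members in ascending order, so materializing a boxed list just to rebuild an `int[]` is unnecessary work.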
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138936#comment-15138936 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306136

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]

    --- End diff --

    Move out of `aggregate()`?
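The reviewer's point: `implicitly[Numeric[T]]` resolves to the same instance on every call, so looking it up inside `aggregate()` (invoked once per record) is wasted work. A sketch of the hoisted version, with a simplified stand-in for the Table API's `Aggregate` trait (the real trait and the `result` seeding are assumptions here, not the actual Flink source):

```scala
// simplified stand-in for the Table API's Aggregate trait
trait Aggregate[T] {
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
  // resolved once at construction instead of once per record
  private val numeric = implicitly[Numeric[T]]

  // subclasses seed this with the type's minimum value;
  // numeric.zero is just a placeholder initial value
  protected var result: T = numeric.zero

  override def aggregate(value: Any): Unit = {
    val input = value.asInstanceOf[T]
    result = numeric.max(result, input) // Numeric extends Ordering, so max is available
  }

  override def getAggregated(): T = result
}

class IntMaxAggregate extends MaxAggregate[Int] {
  result = Int.MinValue // seed so the first max() comparison is correct
}

val agg = new IntMaxAggregate
agg.aggregate(3)
agg.aggregate(7)
agg.aggregate(5)
```

Since the context bound desugars to a constructor parameter anyway, the hoisted `val` costs nothing extra and removes one implicit lookup from the per-record hot path.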
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138937#comment-15138937 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306203

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import scala.reflect.runtime.universe._
    +
    +abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]
    +
    +    result = numericResult.min(result, input)
    +  }
    +
    +  override def getAggregated(): T = {
    +    result
    +  }
    +
    +}
    +
    +// Numeric doesn't have max value
    +class TinyMinAggregate extends MinAggregate[Byte] {
    +
    +override def initiateAggregate: Unit = {

    --- End diff --

    Indentation (also in the following classes).
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138885#comment-15138885 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-181848000

    Thanks for reviewing @fhueske. I will comment my code more. Regarding the splitting of the `CodeGenerator`, I think it is not that easy because functions and expressions both access reusable code parts. I will think about it again...
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138934#comment-15138934 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306088

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class AvgAggregate[T] extends Aggregate[T] {
    +
    +}
    +
    +// TinyInt average aggregate return Int as aggregated value.
    +class TinyIntAvgAggregate extends AvgAggregate[Byte] {
    +
    +  private var sum: Long = 0
    +  private var count: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sum = 0
    +    count = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    count += 1
    +    sum += value.asInstanceOf[Byte]

    --- End diff --

    The `aggregate` method was previously implemented as `avgValue += (current - avgValue) / count` to avoid overflow on `sum`.
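The earlier implementation referenced in the review computed the mean incrementally as `avgValue += (current - avgValue) / count`, which never materializes the full sum and therefore cannot overflow it. A standalone sketch of that scheme for doubles (a simplified illustration, not the Flink class; integer variants would need the same idea with care about truncation):

```scala
// incremental running mean: update by the delta to the current estimate
// instead of accumulating a sum that can overflow for long inputs
class DoubleAvgAggregate {
  private var count: Long = 0
  private var avg: Double = 0.0

  def aggregate(value: Double): Unit = {
    count += 1
    avg += (value - avg) / count
  }

  def getAggregated(): Double = avg
}

val avgAgg = new DoubleAvgAggregate
Seq(1.0, 2.0, 3.0, 4.0).foreach(avgAgg.aggregate)
```

The trade-off is a division per record versus one at the end; the incremental form buys overflow safety (and better numerical behavior for long streams) at that small per-record cost.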
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138931#comment-15138931 ]

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52305988

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import scala.reflect.runtime.universe._
    +
    +abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]

    --- End diff --

    Can this be moved out of the `aggregate` method?
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138881#comment-15138881 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1595#discussion_r52301780 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.codegen + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._ +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.table.typeinfo.RowTypeInfo + +object CodeGenUtils { + + private val nameCounter = new AtomicInteger + + def newName(name: String): String = { +s"$name$$${nameCounter.getAndIncrement}" + } + + // when casting we first need to unbox Primitives, for example, + // float a = 1.0f; + // byte b = (byte) a; + // works, but for boxed types we need this: + // Float a = 1.0f; + // Byte b = (byte)(float) a; + def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match { +case INT_TYPE_INFO => "int" +case LONG_TYPE_INFO => "long" +case SHORT_TYPE_INFO => "short" +case BYTE_TYPE_INFO => "byte" +case FLOAT_TYPE_INFO => "float" +case DOUBLE_TYPE_INFO => "double" +case BOOLEAN_TYPE_INFO => "boolean" +case CHAR_TYPE_INFO => "char" + +// From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections +// does not seem to like this, so we manually give the correct type here. 
+case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]" +case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]" +case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]" +case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]" +case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]" +case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]" +case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" +case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" + +case _ => + tpe.getTypeClass.getCanonicalName + } + + def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match { +// From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections +// does not seem to like this, so we manually give the correct type here. +case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]" +case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]" +case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]" +case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]" +case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]" +case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]" +case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" +case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" + +case _ => + tpe.getTypeClass.getCanonicalName + } + + def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match { +case INT_TYPE_INFO => "-1" +case LONG_TYPE_INFO => "-1" +case SHORT_TYPE_INFO => "-1" +case BYTE_TYPE_INFO => "-1" +case FLOAT_TYPE_INFO => "-1.0f" +case DOUBLE_TYPE_INFO => "-1.0d" +case BOOLEAN_TYPE_INFO => "false" +case STRING_TYPE_INFO => "\"\"" +case CHAR_TYPE_INFO => "'\\0'" +case _ => "null" + } + + def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match { +case nti:
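The comment in `primitiveTypeTermForTypeInfo` explains why generated Java code must unbox a boxed value before a narrowing cast: `Byte b = (byte)(float) a;` compiles where `Byte b = (byte) a;` would not. A hedged illustration of how such a cast expression could be composed from the two term mappings (the helper name is hypothetical, not part of `CodeGenUtils`):

```scala
// Hypothetical helper: compose a generated-Java cast that unboxes first.
// targetPrimitiveTerm and sourcePrimitiveTerm would come from
// primitiveTypeTermForTypeInfo in the quoted code.
def castTerm(targetPrimitiveTerm: String, sourcePrimitiveTerm: String, term: String): String =
  s"($targetPrimitiveTerm)($sourcePrimitiveTerm) $term"
```

For example, `castTerm("byte", "float", "a")` produces the string `(byte)(float) a` shown in the comment.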
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138922#comment-15138922 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52305236 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala --- @@ -39,6 +46,10 @@ class DataSetJoinRule val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) +val joinKeys = getJoinKeys(join) --- End diff -- I would exclude the changes in this file from the PR.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139202#comment-15139202 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-181955939 Thanks for the quick update @twalthr! Some tests are failing because the wrong type of exception is expected. I'll fix those and then merge this.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139332#comment-15139332 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52349128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +abstract class AvgAggregate[T] extends Aggregate[T] { + +} + +// TinyInt average aggregate return Int as aggregated value. +class TinyIntAvgAggregate extends AvgAggregate[Byte] { + private var sum: Long = 0 + private var count: Int = 0 + + override def initiateAggregate: Unit = { +sum = 0 +count = 0 + } + + override def aggregate(value: Any): Unit = { +count += 1 +sum += value.asInstanceOf[Byte] --- End diff -- What about simply adding `0.5`? 
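tillrohrmann's "What about simply adding `0.5`?" refers to rounding an integer average instead of truncating it. A standalone sketch of the difference (not the committed Flink implementation; negative sums would round toward positive infinity with this scheme):

```scala
// Integer division truncates toward zero: sum=3, count=2 yields 1.
def truncatedAvg(sum: Long, count: Int): Long = sum / count

// Adding 0.5 before flooring rounds half up: sum=3, count=2 yields 2.
def roundedAvg(sum: Long, count: Int): Long =
  math.floor(sum.toDouble / count + 0.5).toLong
```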
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137174#comment-15137174 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52190250 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.functions.aggregate + +abstract class SumAggregate[T] extends Aggregate[T]{ + +} + +// TinyInt sum aggregate return Int as aggregated value. +class TinyIntSumAggregate extends SumAggregate[Int] { + + private var sumValue: Int = 0 + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + + override def getAggregated(): Int = { +sumValue + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Byte] + } +} + +// SmallInt sum aggregate return Int as aggregated value. 
+class SmallIntSumAggregate extends SumAggregate[Int] { + + private var sumValue: Int = 0 + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + override def getAggregated(): Int = { +sumValue + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Short] + } +} + +// Int sum aggregate return Int as aggregated value. +class IntSumAggregate extends SumAggregate[Int] { + + private var sumValue: Int = 0 + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + + override def getAggregated(): Int = { +sumValue + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Int] + } +} + +// Long sum aggregate return Long as aggregated value. +class LongSumAggregate extends SumAggregate[Long] { + + private var sumValue: Long = 0L + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Long] + } + + override def getAggregated(): Long = { +sumValue + } +} + +// Float sum aggregate return Float as aggregated value. +class FloatSumAggregate extends SumAggregate[Float] { + private var sumValue: Float = 0 + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Float] + } + + override def getAggregated(): Float = { +sumValue + } +} + +// Double sum aggregate return Double as aggregated value. +class DoubleSumAggregate extends SumAggregate[Double] { + private var sumValue: Double = 0 + + override def initiateAggregate: Unit = { +sumValue = 0 + } + + override def aggregate(value: Any): Unit = { +sumValue += value.asInstanceOf[Double] + } + + override def getAggregated(): Double = { +sumValue + } +} --- End diff -- We could also replace it later using code generation. 
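Until code generation replaces the per-type `SumAggregate` classes, the repetition could also be factored out with a `Numeric` typeclass. A hedged sketch of that alternative (not the committed design; the widening of `Byte`/`Short` sums to `Int`, which the quoted classes perform, would still need explicit handling):

```scala
// One Numeric-based sum instead of one class per primitive type.
class NumericSumAggregate[T](implicit num: Numeric[T]) {
  private var sum: T = num.zero

  def initiateAggregate(): Unit = { sum = num.zero }
  def aggregate(value: Any): Unit = { sum = num.plus(sum, value.asInstanceOf[T]) }
  def getAggregated: T = sum
}
```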
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137205#comment-15137205 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52192508 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala --- @@ -58,19 +58,23 @@ class FlinkAggregate( ) } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { - -val origCosts = super.computeSelfCost(planner) -val deltaCost = planner.getCostFactory.makeHugeCost() - -// only prefer aggregations with transformed Avg -aggCalls.toList.foldLeft[RelOptCost](origCosts){ - (c: RelOptCost, a: AggregateCall) => -if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) { - c.plus(deltaCost) -} else { - c -} -} - } +// +// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS +// ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED. +// +// override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { +// +//val origCosts = super.computeSelfCost(planner) +//val deltaCost = planner.getCostFactory.makeHugeCost() +// +//// only prefer aggregations with transformed Avg +//aggCalls.toList.foldLeft[RelOptCost](origCosts){ +// (c: RelOptCost, a: AggregateCall) => +//if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) { +// c.plus(deltaCost) +//} else { +// c +//} +//} +// } --- End diff -- Yes, we decided to disable `AggregateReduceFunctionsRule` temporarily, thus the note on top. I could remove it completely if you think it'd be better. 
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137200#comment-15137200 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52192307 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.functions.aggregate + +abstract class MaxAggregate[T] extends Aggregate[T]{ + +} + +class TinyIntMaxAggregate extends MaxAggregate[Byte] { + private var max = Byte.MinValue + + override def initiateAggregate: Unit = { +max = Byte.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Byte] +if (current > max) { + max = current +} + } + + override def getAggregated(): Byte = { +max + } +} + +class SmallIntMaxAggregate extends MaxAggregate[Short] { + private var max = Short.MinValue + + override def initiateAggregate: Unit = { +max = Short.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Short] +if (current > max) { + max = current +} + } + + override def getAggregated(): Short = { +max + } +} + +class IntMaxAggregate extends MaxAggregate[Int] { + private var max = Int.MinValue + + override def initiateAggregate: Unit = { +max = Int.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Int] +if (current > max) { + max = current +} + } + + override def getAggregated(): Int = { +max + } +} + +class LongMaxAggregate extends MaxAggregate[Long] { + private var max = Long.MinValue + + override def initiateAggregate: Unit = { +max = Int.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Long] +if (current > max) { + max = current +} + } + + override def getAggregated(): Long = { +max + } +} + +class FloatMaxAggregate extends MaxAggregate[Float] { + private var max = Float.MinValue --- End diff -- I missed to correct this, thnx! 
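The exchange above points at a real initialization bug visible in the quoted diff: `LongMaxAggregate.initiateAggregate` resets `max` to `Int.MinValue` rather than `Long.MinValue`, so after a reset any input below `Int.MinValue` can never win the comparison. A corrected sketch of just that class:

```scala
// Corrected sketch of the quoted LongMaxAggregate: the reset value must be
// Long.MinValue; the diff used Int.MinValue, silently widened to a Long.
class LongMaxAggregate {
  private var max = Long.MinValue

  def initiateAggregate(): Unit = { max = Long.MinValue } // was Int.MinValue
  def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Long]
    if (current > max) max = current
  }
  def getAggregated: Long = max
}
```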
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137210#comment-15137210 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52192844 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.functions + +import java.lang.Iterable +import com.google.common.base.Preconditions +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import scala.collection.JavaConversions._ +import org.apache.flink.api.table.Row + +/** + * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input, + * feed to the aggregates, and collect the record with aggregated value. + * + * @param aggregates SQL aggregate functions. + * @param fields The grouped keys' indices in the input. 
+ * @param groupingKeys The grouping keys' positions. + */ +class AggregateFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val fields: Array[Int], +private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] { --- End diff -- Sounds reasonable. Together with the expression functions or separately, e.g. in `org.apache.flink.table.runtime.functions`?
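The wrapper's reduce logic, independent of Flink's `RichGroupReduceFunction` API, can be sketched without the framework: for each group, reset every aggregate, feed it its input field from each row, then emit the grouping keys followed by the aggregated values. This is a simplified sketch, with `Array[Any]` standing in for Flink's `Row` and the `Agg` trait standing in for the quoted `Aggregate` interface:

```scala
// Framework-free sketch of the GroupReduce wrapper's core loop.
// fields(i) is the input index feeding aggregate i.
trait Agg { def reset(): Unit; def add(v: Any): Unit; def result: Any }

def reduceGroup(rows: Seq[Array[Any]], aggs: Array[Agg],
                fields: Array[Int], groupingKeys: Array[Int]): Array[Any] = {
  aggs.foreach(_.reset())
  for (row <- rows; i <- aggs.indices) aggs(i).add(row(fields(i)))
  // Output row: grouping key values (constant within a group), then results.
  groupingKeys.map(rows.head(_)) ++ aggs.map(_.result)
}
```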
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137003#comment-15137003 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52175496 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.functions.aggregate + +abstract class MaxAggregate[T] extends Aggregate[T]{ + +} + +class TinyIntMaxAggregate extends MaxAggregate[Byte] { + private var max = Byte.MinValue + + override def initiateAggregate: Unit = { +max = Byte.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Byte] +if (current > max) { + max = current +} + } + + override def getAggregated(): Byte = { +max + } +} + +class SmallIntMaxAggregate extends MaxAggregate[Short] { + private var max = Short.MinValue + + override def initiateAggregate: Unit = { +max = Short.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Short] +if (current > max) { + max = current +} + } + + override def getAggregated(): Short = { +max + } +} + +class IntMaxAggregate extends MaxAggregate[Int] { + private var max = Int.MinValue + + override def initiateAggregate: Unit = { +max = Int.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Int] +if (current > max) { + max = current +} + } + + override def getAggregated(): Int = { +max + } +} + +class LongMaxAggregate extends MaxAggregate[Long] { + private var max = Long.MinValue + + override def initiateAggregate: Unit = { +max = Int.MinValue + } + + override def aggregate(value: Any): Unit = { +val current = value.asInstanceOf[Long] +if (current > max) { + max = current +} + } + + override def getAggregated(): Long = { +max + } +} + +class FloatMaxAggregate extends MaxAggregate[Float] { + private var max = Float.MinValue --- End diff -- Why `Float.MinValue` here and in `initiateAggregate` `Int.MinValue`? 
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137006#comment-15137006 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52175572 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala --- @@ -0,0 +1,136 @@
+/* ... Apache 2.0 license header, identical to the diff quoted above ... */
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+ ... (TinyInt/SmallInt/Int/Long MaxAggregate classes, identical to the diff quoted above) ...
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  private var max = Float.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Float]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Float = {
+    max
+  }
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+  private var max = Double.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Int.MaxValue
--- End diff --

Why `Int.MaxValue` here?
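The seed values flagged in the two review comments above look like copy-paste slips: seeding a `Long`/`Float` max with `Int.MinValue`, or a `Double` max with `Int.MaxValue` (which would swallow every input smaller than ~2.1e9), breaks the aggregate. A minimal corrected sketch of the two flagged classes (hypothetical, not the merged code; the `Aggregate` supertype is omitted so the snippet is self-contained):

```scala
// Hypothetical corrected versions of the flagged aggregates: each type
// seeds with its own MinValue (Scala's Double.MinValue is the most
// negative double, unlike Java's Double.MIN_VALUE).
class LongMaxAggregate {
  private var max = Long.MinValue

  def initiateAggregate(): Unit = {
    max = Long.MinValue // was Int.MinValue in the diff
  }

  def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Long]
    if (current > max) max = current
  }

  def getAggregated(): Long = max
}

class DoubleMaxAggregate {
  private var max = Double.MinValue

  def initiateAggregate(): Unit = {
    max = Double.MinValue // was Int.MaxValue in the diff
  }

  def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Double]
    if (current > max) max = current
  }

  def getAggregated(): Double = max
}
```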
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137018#comment-15137018 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52176834 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala --- @@ -0,0 +1,76 @@
+/* ... Apache 2.0 license header, identical to the diff quoted above ... */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import scala.collection.JavaConversions._
+import org.apache.flink.api.table.Row
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input,
+ * feed to the aggregates, and collect the record with aggregated value.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The grouped keys' indices in the input.
+ * @param groupingKeys The grouping keys' positions.
+ */
+class AggregateFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val fields: Array[Int],
+    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] {
--- End diff --

I would move all runtime-related functions to `org.apache.flink.table.runtime`. IMO `plan` is not the right place for `functions`.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136976#comment-15136976 ] ASF GitHub Bot commented on FLINK-3226: --- GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1600 [FLINK-3226] Translate logical aggregations to physical This PR builds on #1567 and addresses @fhueske's comments on translating aggregations. Join translation is not part of this PR. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink LogicalToPhysical Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1600.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1600
commit 6676aab520bd648c360f70a2047196d004ce1d31 Author: chengxiang li Date: 2016-02-01T07:18:14Z [FLINK-3226] Translate logical plan FlinkRels into physical plan DataSetRels.
commit cf41b740d32768185ec692e93754056ff6a16b59 Author: vasia Date: 2016-02-04T14:53:52Z [FLINK-3226] implement GroupReduce translation; enable tests for supported operations - compute average as sum and count for byte, short and int type to avoid rounding errors
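The last commit message's point about rounding ("compute average as sum and count ... to avoid rounding errors") can be sketched as follows. The class name and method shapes are hypothetical, modeled on the `Aggregate` API discussed in this PR; the idea is to accumulate a widened `(sum, count)` pair and divide once at the end, rather than averaging incrementally:

```scala
// Hypothetical sketch of the "average as sum and count" idea from the
// commit message above: accumulate a widened sum plus a count, and
// divide only once when the final value is requested.
class IntAvgAggregate {
  private var sum: Long = 0L   // widened to Long so Int sums cannot overflow
  private var count: Long = 0L

  def initiateAggregate(): Unit = {
    sum = 0L
    count = 0L
  }

  def aggregate(value: Any): Unit = {
    sum += value.asInstanceOf[Int]
    count += 1
  }

  // Integer average, truncated toward zero like SQL AVG over integral types.
  def getAggregated(): Int = (sum / count).toInt
}
```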
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137011#comment-15137011 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52175960 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala --- @@ -58,19 +58,23 @@ class FlinkAggregate(
     )
   }
-  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
-
-    val origCosts = super.computeSelfCost(planner)
-    val deltaCost = planner.getCostFactory.makeHugeCost()
-
-    // only prefer aggregations with transformed Avg
-    aggCalls.toList.foldLeft[RelOptCost](origCosts) {
-      (c: RelOptCost, a: AggregateCall) =>
-        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-          c.plus(deltaCost)
-        } else {
-          c
-        }
-    }
-  }
+  //
+  // DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS.
+  // ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.
+  //
+  // override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+  //   val origCosts = super.computeSelfCost(planner)
+  //   val deltaCost = planner.getCostFactory.makeHugeCost()
+  //   // only prefer aggregations with transformed Avg
+  //   aggCalls.toList.foldLeft[RelOptCost](origCosts) {
+  //     (c: RelOptCost, a: AggregateCall) =>
+  //       if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
+  //         c.plus(deltaCost)
+  //       } else {
+  //         c
+  //       }
+  //   }
+  // }
--- End diff --

Commented code?
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137065#comment-15137065 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52181382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala --- @@ -0,0 +1,130 @@
+/* ... Apache 2.0 license header, identical to the diff quoted above ... */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate return Int as aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate return Int as aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate return Int as aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate return Long as aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate return Float as aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate return Double as aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}
--- End diff --

This is a lot of duplicated code. Could we make it a bit less redundant by doing something similar to?
```
class SAggregate[I: spire.math.Numeric, T: spire.math.Numeric] extends Aggregate[T] {

  var result: T = _

  /**
   * Initialize the aggregate state.
   */
  override def initiateAggregate: Unit = {
    result = implicitly[Numeric[T]].zero
  }

  /**
   * Feed the aggregate field value.
   *
   * @param value
   */
  override def aggregate(value: Any): Unit = {
    val input: I = value.asInstanceOf[I]
    val numericInput = implicitly[spire.math.Numeric[I]]
    val
```
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137691#comment-15137691 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52225586 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala --- @@ -0,0 +1,130 @@
+ ... (same SumAggregate.scala diff as quoted above) ...
--- End diff --

No objections. I was just thinking about future improvements.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137685#comment-15137685 ] ASF GitHub Bot commented on FLINK-3226: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52225277 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala --- @@ -0,0 +1,76 @@
+ ... (same AggregateFunction.scala diff as quoted above) ...
--- End diff --

I would move everything there that is needed during runtime: `AggregateFunction` in `org.apache.flink.table.runtime` and helper classes in sub-packages.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137409#comment-15137409 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52204752 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala --- @@ -0,0 +1,130 @@
+ ... (same SumAggregate.scala diff as quoted above) ...
--- End diff --

I like @tillrohrmann's suggestion. Any reason why not use Scala's `Numeric` though?
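The deduplication idea raised in this thread can be sketched with Scala's standard `Numeric` type class instead of spire. This is a hypothetical sketch, not the code that was merged: the class name is invented, the `Aggregate` supertype is omitted for self-containment, and the widening `TinyInt`/`SmallInt` variants (Byte/Short input, Int accumulator) are left out:

```scala
// One generic sum aggregate per Numeric type, replacing several
// near-identical hand-written classes; the Numeric type class
// supplies `zero` and `plus` for each concrete T.
class NumericSumAggregate[T](implicit num: Numeric[T]) {
  private var sum: T = num.zero

  def initiateAggregate(): Unit = {
    sum = num.zero
  }

  def aggregate(value: Any): Unit = {
    // unchecked cast, mirroring the asInstanceOf in the original diff
    sum = num.plus(sum, value.asInstanceOf[T])
  }

  def getAggregated(): T = sum
}
```

One caveat the two-parameter spire sketch above handles but this single-parameter version does not: the `TinyIntSumAggregate`/`SmallIntSumAggregate` variants accept `Byte`/`Short` inputs while accumulating into an `Int`, so they would need an explicit input-to-accumulator conversion.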
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134451#comment-15134451 ] ASF GitHub Bot commented on FLINK-3226: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/1595 [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs
This PR implements a CodeGenerator for the Table API on Calcite. It includes the following:
- FilterITCase, SelectITCase and ExpressionITCase are mostly working again.
- Null values are now fully supported and tested with the new `TableProgramsTestBase`.
- Arithmetic operators `+ - * / +X -X` and logical operators `== != AND OR NOT IS NULL IS NOT NULL` are supported.
- Logical operators implement 3-valued logic.
- A new configuration parameter `efficientTypeUsage` allows generated DataSet programs to be as efficient as ordinary DataSet programs by avoiding unnecessary type conversions and using the most efficient type for every operator.
Limitations:
- String functions are not yet supported. They will be forwarded to Calcite's runtime functions once I have implemented the necessary interfaces for that.
- Expression type casting is not supported yet.
- Date literals are not supported yet.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/twalthr/flink CodeGen
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/1595.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #1595
commit c1a63287de0014f0b5bd23d1e0bc31e679f02fbe
Author: twalthr
Date: 2016-02-05T16:40:54Z
[FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
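The PR above notes that logical operators implement 3-valued logic. As a minimal illustration (not the actual generated code), SQL's nullable booleans can be modelled with `Option[Boolean]`, where `None` stands for NULL; the `and3` name below is a hypothetical stand-in:

```scala
// Three-valued AND: NULL AND false is false (false dominates),
// true AND true is true, and every other combination is unknown (None).
def and3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] =
  (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }
```

The practical consequence for a code generator is that a filter predicate may evaluate to unknown, in which case the row is dropped just as if the predicate were false.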
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132156#comment-15132156 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1579#issuecomment-179777395 I'll merge this PR to the `tableOnCalcite` branch.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15129042#comment-15129042 ] ASF GitHub Bot commented on FLINK-3226: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/1579 [FLINK-3226] Add DataSet scan and conversion to DataSet[Row]
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/fhueske/flink dataSetTrans
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/1579.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #1579
commit c0073a13fcbbe4a730c2d967561838a28d574c2d
Author: Fabian Hueske
Date: 2016-02-02T16:15:28Z
[FLINK-3226] Add DataSet scan and conversion to DataSet[Row]
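As an illustration of what a conversion to `DataSet[Row]` boils down to, here is a plain-Scala sketch (not Flink's actual `DataSet`/`Row` API; the `Row` class and `toRow` helper are hypothetical stand-ins): each typed record is mapped to a generic row holding one `Any` per field.

```scala
// Hypothetical generic row: a fixed-arity container of untyped fields,
// standing in for the Row type the Table API operates on.
class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = fields(i) = v
  def productElement(i: Int): Any = fields(i)
}

// Map function converting a typed (Int, String) record into a Row.
// In a real program this would run inside a DataSet map operator.
def toRow(t: (Int, String)): Row = {
  val r = new Row(2)
  r.setField(0, t._1)
  r.setField(1, t._2)
  r
}
```

The scan side of the PR is then the reverse direction: registering an existing typed DataSet as a table whose fields can be addressed by position.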
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15119050#comment-15119050 ] Chengxiang Li commented on FLINK-3226: -- [~fhueske] and [~twalthr], this task has been blocked on me for a while, sorry about that. It turned out to be more complicated and to involve more work than I expected: it actually contains the physical plan generation part of Timo's prototype plus the whole previous Table API translation part. To move this forward faster, as Fabian suggested, I agree that we can split it into multiple subtasks, such as expression code generation, and even split by Calcite RelNode; for example, RelNodes with no dependencies on each other, such as Sort, Join, and Aggregate, can be worked on in parallel. I would make this task very small, containing only the Project translator without expression code generation, and have it ready during this week. Besides, while trying to test my code, I found that {{PlannerImpl}} checks its state during each step of the query planning process, and there is no way to set the state manually. The Table API actually skips several steps, such as parse and validate, so it fails when we go directly to the optimize step due to the unmatched state. Do you guys have any idea about this?