[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167678#comment-15167678
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1709


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> ---------------------------------------------------------------------------
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
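The sub-task quoted above describes a two-part design: physical RelNodes that each correspond to exactly one DataSet operator, plus translation rules that map optimized logical nodes onto them. A highly simplified, self-contained sketch of that shape follows; all class names here are hypothetical illustrations, not Flink's actual API (the real implementation builds on Calcite's RelNode and planner rules):

```java
import java.util.Objects;

// Hypothetical, stdlib-only model of the logical-to-physical translation
// described in the issue. Names are invented for illustration.
public class PlanTranslation {

    // Logical nodes, standing in for the Calcite-optimized Table API plan.
    interface LogicalNode { String describe(); }
    record LogicalScan(String table) implements LogicalNode {
        public String describe() { return "Scan(" + table + ")"; }
    }
    record LogicalFilter(LogicalNode input, String predicate) implements LogicalNode {
        public String describe() { return "Filter[" + predicate + "](" + input.describe() + ")"; }
    }

    // Physical nodes: each corresponds to exactly one DataSet operator and
    // holds the operator information (user-code expression, parallelism).
    interface DataSetNode { String describe(); }
    record DataSetSource(String table) implements DataSetNode {
        public String describe() { return "DataSetSource(" + table + ")"; }
    }
    record DataSetFilter(DataSetNode input, String userCode, int parallelism) implements DataSetNode {
        public String describe() {
            return "DataSetFilter[" + userCode + ", p=" + parallelism + "](" + input.describe() + ")";
        }
    }

    // A straight-forward 1-to-1 translation rule: one logical node becomes
    // one physical node. Merging rules (e.g. join + filter) would pattern-match
    // on larger logical subtrees here.
    static DataSetNode translate(LogicalNode node) {
        Objects.requireNonNull(node);
        if (node instanceof LogicalScan s) {
            return new DataSetSource(s.table());
        } else if (node instanceof LogicalFilter f) {
            // The filter predicate becomes the user code of a single operator.
            return new DataSetFilter(translate(f.input()), f.predicate(), 4);
        }
        throw new IllegalArgumentException("unknown node: " + node.describe());
    }

    public static void main(String[] args) {
        LogicalNode logical = new LogicalFilter(new LogicalScan("orders"), "amount > 10");
        System.out.println(translate(logical).describe());
    }
}
```

The sketch keeps the key invariant from the description: every physical node maps to exactly one operator, and all information needed to emit the DataSet program travels with the node.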


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167662#comment-15167662
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1709#issuecomment-188933506
  
You can close this now @twalthr. I've merged it.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167115#comment-15167115
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1679




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166930#comment-15166930
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1709

[FLINK-3226] Improvements for expected types

I added some documentation and fixed some bugs when performing aggregations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink ExpectedTypeImprovements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1709.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1709


commit a8614d81d81e45c7a67b2d90018923980881dcb8
Author: twalthr 
Date:   2016-02-25T08:36:32Z

[FLINK-3226] Improvements for expected types






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166925#comment-15166925
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1679#issuecomment-188669681
  
Thanks for the update. Looks good to merge.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15166872#comment-15166872
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1679#issuecomment-188655499
  
If there are no objections, I would like to merge this later.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162879#comment-15162879
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1679#issuecomment-188219011
  
@fhueske build succeeded




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162692#comment-15162692
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53910352
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -40,46 +40,6 @@ public StringExpressionsITCase(TestExecutionMode mode) {
super(mode);
}
 
-   @Test(expected = CodeGenException.class)
--- End diff --

Yes, you are right, testing at least one function end-to-end makes sense. I 
will add them again.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160680#comment-15160680
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53906027
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -40,46 +40,6 @@ public StringExpressionsITCase(TestExecutionMode mode) {
super(mode);
}
 
-   @Test(expected = CodeGenException.class)
--- End diff --

Agreed, we should not have end-to-end tests for all scalar functions. But 
it would be good to test at least some representative functions, IMO.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160358#comment-15160358
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53905333
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.test.utils.ExpressionEvaluator
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ScalarFunctionsTest {
+
+  @Test
+  def testSubstring(): Unit = {
+    testFunction(
+      'f0.substring(2),
+      "f0.substring(2)",
+      "SUBSTRING(f0, 2)",
+      "his is a test String.")
+
+    testFunction(
+      'f0.substring(2, 5),
+      "f0.substring(2, 5)",
+      "SUBSTRING(f0, 2, 5)",
+      "his i")
+
+    testFunction(
+      'f0.substring(1, 'f7),
+      "f0.substring(1, f7)",
+      "SUBSTRING(f0, 1, f7)",
+      "Thi")
+  }
+
+  // --------------------------------------------------------------------------------------------
+
+  def testFunction(
+      expr: Expression,
+      exprString: String,
+      sqlExpr: String,
+      expected: String): Unit = {
+    val testData = new Row(8)
+    testData.setField(0, "This is a test String.")
+    testData.setField(1, true)
+    testData.setField(2, 42.toByte)
+    testData.setField(3, 43.toShort)
+    testData.setField(4, 44.toLong)
+    testData.setField(5, 4.5.toFloat)
+    testData.setField(6, 4.6)
+    testData.setField(7, 3)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      STRING_TYPE_INFO,
+      BOOLEAN_TYPE_INFO,
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO,
+      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+
+    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
+    assertEquals(expected, exprResult)
+
+    val exprStringResult = ExpressionEvaluator.evaluate(
+      testData,
+      typeInfo,
+      ExpressionParser.parseExpression(exprString))
+    assertEquals(expected, exprStringResult)
+
+    // TODO test SQL expression
--- End diff --

Once we have a SQL parser ready, I will resolve this TODO ;-)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160353#comment-15160353
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53905238
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -40,46 +40,6 @@ public StringExpressionsITCase(TestExecutionMode mode) {
super(mode);
}
 
-   @Test(expected = CodeGenException.class)
--- End diff --

No, the `ScalarFunctionsTest` does not test end-to-end. I don't think this 
is necessary for scalar functions, because they don't affect optimization/plan 
translation. When we have more functions (e.g., 40 of them), we would otherwise 
have 40 end-to-end tests, each just implementing a MapFunction and evaluating an 
expression. That's why I implemented the `ExpressionEvaluator`.
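The direct-evaluation idea described here can be sketched in a few lines. This is a hypothetical, stdlib-only illustration: `compileSubstring` and the row representation are invented for the sketch and do not correspond to Flink's generated code:

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the idea behind the `ExpressionEvaluator`:
// apply a compiled scalar expression directly to a single row, with no
// DataSet program or MapFunction around it. All names are illustrative.
public class ExpressionEvaluatorSketch {

    // A "compiled" scalar expression is just a function over a row.
    // The real code generator emits Java source; a lambda stands in for it.
    static Function<Map<String, Object>, Object> compileSubstring(String field, int begin) {
        // SQL-style SUBSTRING uses a 1-based start index.
        return row -> ((String) row.get(field)).substring(begin - 1);
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("f0", "This is a test String.");
        // Evaluate 'f0.substring(2) directly against one row.
        System.out.println(compileSubstring("f0", 2).apply(row)); // prints "his is a test String."
    }
}
```

Evaluating against one in-memory row keeps each scalar-function test to a few lines, while the full pipeline is still exercised by a small number of representative end-to-end tests.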




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158711#comment-15158711
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1679#issuecomment-187648227
  
I had only a few minor comments. Looks mostly good.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158704#comment-15158704
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53763441
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
 ---
@@ -18,42 +18,20 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[CodeGenException])
--- End diff --

Why did you remove these tests? The `ScalarFunctionsTest` does not test the 
feature end-to-end, right?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158705#comment-15158705
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53763449
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
 ---
@@ -40,46 +40,6 @@ public StringExpressionsITCase(TestExecutionMode mode) {
super(mode);
}
 
-   @Test(expected = CodeGenException.class)
--- End diff --

Why did you remove these tests? The `ScalarFunctionsTest` does not test the 
feature end-to-end, right?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158695#comment-15158695
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53763082
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.test.utils.ExpressionEvaluator
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ScalarFunctionsTest {
+
+  @Test
+  def testSubstring(): Unit = {
+testFunction(
+  'f0.substring(2),
+  "f0.substring(2)",
+  "SUBSTRING(f0, 2)",
+  "his is a test String.")
+
+testFunction(
+  'f0.substring(2, 5),
+  "f0.substring(2, 5)",
+  "SUBSTRING(f0, 2, 5)",
+  "his i")
+
+testFunction(
+  'f0.substring(1, 'f7),
+  "f0.substring(1, f7)",
+  "SUBSTRING(f0, 1, f7)",
+  "Thi")
+  }
+
+  // ----------------------------------------------------------------------
+
+  def testFunction(
+  expr: Expression,
+  exprString: String,
+  sqlExpr: String,
+  expected: String): Unit = {
+val testData = new Row(8)
+testData.setField(0, "This is a test String.")
+testData.setField(1, true)
+testData.setField(2, 42.toByte)
+testData.setField(3, 43.toShort)
+testData.setField(4, 44.toLong)
+testData.setField(5, 4.5.toFloat)
+testData.setField(6, 4.6)
+testData.setField(7, 3)
+
+val typeInfo = new RowTypeInfo(Seq(
+  STRING_TYPE_INFO,
+  BOOLEAN_TYPE_INFO,
+  BYTE_TYPE_INFO,
+  SHORT_TYPE_INFO,
+  LONG_TYPE_INFO,
+  FLOAT_TYPE_INFO,
+  DOUBLE_TYPE_INFO,
+  INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+
+val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
+assertEquals(expected, exprResult)
+
+val exprStringResult = ExpressionEvaluator.evaluate(
+  testData,
+  typeInfo,
+  ExpressionParser.parseExpression(exprString))
+assertEquals(expected, exprStringResult)
+
+// TODO test SQL expression
--- End diff --

Resolve TODO?



[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158570#comment-15158570
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1679#discussion_r53751120
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -702,12 +704,17 @@ class CodeGenerator(
 requireBoolean(operand)
 generateNot(nullCheck, operand)
 
+  // casting
   case CAST =>
 val operand = operands.head
 generateCast(nullCheck, operand, resultType)
 
-  case call@_ =>
--- End diff --

Move the catch-all case to the end to get a good error message.
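The suggestion follows from how Scala evaluates `match` expressions: cases are tried top to bottom, so a catch-all placed last only fires for genuinely unsupported calls and can report them precisely. A minimal sketch (hypothetical operator names, not the actual code-generator cases):

```scala
// Sketch of the pattern suggested in the review: handle known operators
// first, keep the catch-all case last so unmatched calls fail loudly
// with a descriptive message instead of being silently mis-handled.
sealed trait SqlOp
case object Cast extends SqlOp
case object Not extends SqlOp
case class Other(name: String) extends SqlOp

def generate(op: SqlOp): String = op match {
  case Cast => "generateCast(...)"
  case Not  => "generateNot(...)"
  // catch-all last: anything not matched above is reported by name
  case call @ _ =>
    throw new UnsupportedOperationException(s"Unsupported call: $call")
}
```

If the catch-all appeared first, it would shadow every specific case below it, since pattern matching stops at the first matching case.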




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155749#comment-15155749
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1679

[FLINK-3226] Translation of scalar function substring()

This PR implements the scalar function `substring()` for the Table API on 
Calcite. Additionally, it already contains preparations for more built-in SQL 
functions. I implemented test utils for scalar functions `ScalarFunctionsTest` 
and added a similar concept to Calcites `CallImplementor` named `CallGenerator`.

For now, I kept the `Substring` expression node, but I would like to remove 
it and use a generic `Call` expression. I think we don't need a case class for 
every operator. What do you think?
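The "generic `Call` expression" idea can be sketched as follows (the types here are illustrative placeholders, not the actual Table API expression classes): instead of a dedicated case class per scalar function, one node carries the function name and its operands.

```scala
// Hypothetical sketch of the design question above: a single generic
// Call node replaces per-operator case classes such as Substring.
sealed trait Expr
case class Field(name: String) extends Expr
case class Literal(value: Any) extends Expr
case class Call(function: String, operands: Seq[Expr]) extends Expr

// f0.substring(2, 5) without needing a dedicated Substring case class:
val substring = Call("SUBSTRING", Seq(Field("f0"), Literal(2), Literal(5)))
```

The trade-off is the usual one: a generic node keeps the expression tree small and uniform as more built-in functions are added, while per-operator case classes give exhaustiveness checking and a place for operator-specific validation.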

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink AdvancedOperators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1679


commit 0106e4723f38ab533bf15c909b98299e27530b2a
Author: twalthr 
Date:   2016-02-20T20:41:44Z

[FLINK-3226] Translation of scalar function substring()






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149121#comment-15149121
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1639




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149123#comment-15149123
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1634




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148547#comment-15148547
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1634#issuecomment-184672115
  
Merged, please close @twalthr 




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148282#comment-15148282
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1634#issuecomment-184581189
  
Thanks for the update. LGTM, merging this




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148281#comment-15148281
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1639#issuecomment-184581139
  
Merging this




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147831#comment-15147831
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1634#issuecomment-184412261
  
@fhueske you were totally right. I reworked the casting again.
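For context on why casting in generated arithmetic is fiddly at all (this is general JVM behavior, not anything Flink-specific): binary numeric operators promote `byte` and `short` operands to `Int`, so a result stored back into the narrower type needs an explicit narrowing cast, which a code generator must insert itself.

```scala
// JVM numeric promotion: a + b on two Bytes yields an Int, so the
// narrowing cast back to Byte must be written (or generated) explicitly.
val a: Byte = 42
val b: Byte = 1
val sum: Byte = (a + b).toByte  // without .toByte this would not type-check
```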




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147650#comment-15147650
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1639#issuecomment-184326419
  
Looks good. :-) 
+1 to merge.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147622#comment-15147622
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1634#discussion_r52925323
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---
@@ -18,20 +18,56 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 
 object OperatorCodeGen {
 
-   def generateArithmeticOperator(
+  def generateArithmeticOperator(
   operator: String,
   nullCheck: Boolean,
   resultType: TypeInformation[_],
   left: GeneratedExpression,
   right: GeneratedExpression)
 : GeneratedExpression = {
-generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+// String arithmetic // TODO rework
+if (isString(left)) {
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
   (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+  }
+}
+// Numeric arithmetic
+else if (isNumeric(left) && isNumeric(right)) {
+  val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+  val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+  (leftTerm, rightTerm) =>
+// insert auto casting for "narrowing primitive conversions"
+if (leftType != rightType) {
+  // leftType cannot be cast to rightType automatically -> narrow
+  if (!leftType.shouldAutocastTo(rightType)) {
--- End diff --

Just checked; `double` is actually downcast.
The following test fails:
```
val t = env.fromElements((10.0d: Double, 1: Byte)).toTable.select('_1 + '_2)
val expected = "11.0"
```
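For context, the widening-vs-narrowing behavior at issue can be shown with a minimal standalone sketch. This is plain Java rather than Flink code, since the generated code follows Java's primitive-conversion rules; the values are chosen so the loss is visible (with `10.0`, as in the quoted test, the truncation happens to be lossless):

```java
public class WideningDemo {
    public static void main(String[] args) {
        double d = 10.5;   // fractional value makes the loss visible
        byte b = 1;

        // Correct (widening): the byte is promoted to double before the addition
        double widened = d + b;
        System.out.println(widened);      // 11.5

        // Buggy (narrowing): casting the double down to byte first truncates
        int narrowed = (byte) d + b;
        System.out.println(narrowed);     // 11
    }
}
```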




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147608#comment-15147608
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1634#discussion_r52924026
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---
@@ -18,20 +18,56 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 
 object OperatorCodeGen {
 
-   def generateArithmeticOperator(
+  def generateArithmeticOperator(
   operator: String,
   nullCheck: Boolean,
   resultType: TypeInformation[_],
   left: GeneratedExpression,
   right: GeneratedExpression)
 : GeneratedExpression = {
-generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+// String arithmetic // TODO rework
+if (isString(left)) {
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
   (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+  }
+}
+// Numeric arithmetic
+else if (isNumeric(left) && isNumeric(right)) {
+  val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+  val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+  (leftTerm, rightTerm) =>
+// insert auto casting for "narrowing primitive conversions"
+if (leftType != rightType) {
+  // leftType cannot be cast to rightType automatically -> narrow
+  if (!leftType.shouldAutocastTo(rightType)) {
--- End diff --

Wouldn't this downcast `double` to `short` given an expression: `double + short`?
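For reference, a minimal Java sketch (not Flink code) of why narrowing `double` to `short` is unsafe: the cast truncates toward zero and wraps around for values outside the `short` range.

```java
public class NarrowingDemo {
    public static void main(String[] args) {
        // Truncation toward zero: the fractional part is discarded
        System.out.println((short) 3.9);       // 3

        // Values outside the short range wrap around
        System.out.println((short) 70000.5);   // 4464, not 70000
    }
}
```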




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147585#comment-15147585
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1634#discussion_r52921997
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---
@@ -18,20 +18,56 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 
 object OperatorCodeGen {
 
-   def generateArithmeticOperator(
+  def generateArithmeticOperator(
   operator: String,
   nullCheck: Boolean,
   resultType: TypeInformation[_],
   left: GeneratedExpression,
   right: GeneratedExpression)
 : GeneratedExpression = {
-generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+// String arithmetic // TODO rework
+if (isString(left)) {
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
   (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+  }
+}
+// Numeric arithmetic
+else if (isNumeric(left) && isNumeric(right)) {
+  val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+  val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+
+  generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+  (leftTerm, rightTerm) =>
+// insert auto casting for "narrowing primitive conversions"
+if (leftType != rightType) {
--- End diff --

Shouldn't we compare and cast to the result type?
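A hypothetical sketch of that suggestion (plain Java; `generateArithmetic` and its string-template style are illustrative stand-ins, not the actual `OperatorCodeGen` API): cast both operand terms to the result type instead of comparing the operand types against each other, so `double + short` widens the `short` rather than narrowing the `double`.

```java
public class CastToResultTypeSketch {

    // Cast both operand terms to the result type, so "double + short"
    // is generated as ((double) l) + ((double) r) rather than narrowing
    // one side. Hypothetical helper, not the real OperatorCodeGen method.
    static String generateArithmetic(String op, String resultJavaType,
                                     String leftTerm, String rightTerm) {
        return "((" + resultJavaType + ") " + leftTerm + ") " + op
             + " ((" + resultJavaType + ") " + rightTerm + ")";
    }

    public static void main(String[] args) {
        System.out.println(generateArithmetic("+", "double", "in1", "in2"));
        // ((double) in1) + ((double) in2)
    }
}
```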




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147548#comment-15147548
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1624




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147541#comment-15147541
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-184278657
  
You can close the PR @twalthr. It's merged (but doesn't automatically 
close).




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147533#comment-15147533
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1639

[FLINK-3226] Translation of explicit casting

This PR implements explicit casting from and to all supported types so far.

@fhueske @vasia can someone review?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink ExplicitCasting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1639








[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147343#comment-15147343
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia closed the pull request at:

https://github.com/apache/flink/pull/1632




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147225#comment-15147225
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1632#issuecomment-184170164
  
Great, merging this also.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147224#comment-15147224
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-184170079
  
Thanks @twalthr. I'll merge this.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147063#comment-15147063
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1632#issuecomment-184113402
  
Thanks for the changes. Looks good to merge ;-)




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15146694#comment-15146694
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1632#issuecomment-183957839
  
Thanks for the review @twalthr! I've addressed your comments.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145919#comment-15145919
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-183640190
  
Thanks for reviewing @fhueske!
I updated my PR and rebased it.
Feel free to merge ;-)




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145937#comment-15145937
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1634

[FLINK-3226] Casting support for arithmetic operators

This PR implements auto-casting for numeric arithmetic operators.

@fhueske @vasia mergeable?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink ArithmeticCasting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1634


commit fd783341b93d7a7ffba11c827a317adb35248ea0
Author: twalthr 
Date:   2016-02-13T11:38:12Z

[FLINK-3226] Casting support for arithmetic operators
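For illustration only (not the PR's actual code): auto-casting for numeric arithmetic operators typically resolves the result type of a binary expression to the wider of the two operand types, which can be sketched as:

```scala
// Hypothetical sketch of numeric widening; the class list and method names
// are illustrative, not taken from the Flink PR.
object AutoCastSketch {
  private val widthOrder: List[Class[_]] = List(
    classOf[java.lang.Byte], classOf[java.lang.Short], classOf[java.lang.Integer],
    classOf[java.lang.Long], classOf[java.lang.Float], classOf[java.lang.Double])

  // The result type of a binary arithmetic operator is the wider operand type.
  def widerOf(a: Class[_], b: Class[_]): Class[_] = {
    val (ia, ib) = (widthOrder.indexOf(a), widthOrder.indexOf(b))
    require(ia >= 0 && ib >= 0, s"not a numeric type: $a or $b")
    widthOrder(math.max(ia, ib))
  }
}
```

So an `Int + Long` expression would be evaluated as `Long`, with the `Int` operand cast up before the operation.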






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145938#comment-15145938
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1632#discussion_r52826898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -148,16 +148,24 @@ class CodeGenerator(
   if (clazz == classOf[FlatMapFunction[_,_]]) {
 val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
 (s"void flatMap(Object _in1, org.apache.flink.util.Collector 
$collectorTerm)",
-  s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+  List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
   }
 
   // MapFunction
   else if (clazz == classOf[MapFunction[_,_]]) {
 val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
 ("Object map(Object _in1)",
-  s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+  List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
   }
 
+  // FlatJoinFunction
+  else if (clazz == classOf[FlatJoinFunction[_,_,_]]) {
+val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
+val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.get) //TODO 
check this
--- End diff --

Why don't you call `getOrElse` here and throw a `CodeGenException` if input2 
is not present? Then we can get rid of the TODO.
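The suggested pattern can be sketched in plain Scala; `CodeGenException` and the helper names below are stand-ins for the real Flink classes, not the actual implementation:

```scala
object CodeGenSketch {
  // Stand-in for the real exception type used by the code generator.
  class CodeGenException(msg: String) extends RuntimeException(msg)

  // Placeholder for the real type-term lookup.
  def boxedTypeTermForTypeInfo(info: String): String = info

  // Fail fast at code-generation time instead of leaving a TODO:
  // getOrElse throws when the second input is missing.
  def secondInputTypeTerm(input2: Option[String]): String =
    boxedTypeTermForTypeInfo(
      input2.getOrElse(throw new CodeGenException(
        "FlatJoinFunction requires two inputs")))
}
```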




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145945#comment-15145945
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1632#discussion_r52826951
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
 ---
@@ -63,7 +63,7 @@ class DataSetMap(
   config: TableConfig,
   expectedType: Option[TypeInformation[Any]])
 : DataSet[Any] = {
-val inputDataSet = 
input.asInstanceOf[DataSetRel].translateToPlan(config)
+val inputDataSet = 
input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
--- End diff --

You cannot forward the expected type, since you don't know what the 
previous operator does. For example, the expected type might be `Tuple2`, 
while the previous operator outputs records with only one field.
Why did you change this call?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145947#comment-15145947
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1632#discussion_r52827003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 ---
@@ -39,18 +49,80 @@ class DataSetJoinRule
 val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
 val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
 
-new DataSetJoin(
-  rel.getCluster,
-  traitSet,
-  convLeft,
-  convRight,
-  rel.getRowType,
-  join.toString,
-  Array[Int](),
-  Array[Int](),
-  JoinType.INNER,
-  null,
-  null)
+// get the equality keys
+val joinInfo = join.analyzeCondition
+val keyPairs = joinInfo.pairs
+
+if (keyPairs.isEmpty) { // if no equality keys => not supported
+  throw new TableException("Joins should have at least one equality 
condition")
+}
+else { // at least one equality expression => generate a join function
+  val conditionType = join.getCondition.getType
+  val func = getJoinFunction(join, joinInfo)
+  val leftKeys = ArrayBuffer.empty[Int]
+  val rightKeys = ArrayBuffer.empty[Int]
+
+  keyPairs.foreach(pair => {
+leftKeys.add(pair.source)
+rightKeys.add(pair.target)}
+  )
+
+  new DataSetJoin(
+rel.getCluster,
+traitSet,
+convLeft,
+convRight,
+rel.getRowType,
+join.toString,
+leftKeys.toArray,
+rightKeys.toArray,
+JoinType.INNER,
+null,
+func)
+}
+  }
+
+  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
+  ((TableConfig, TypeInformation[Any], TypeInformation[Any], 
TypeInformation[Any]) =>
+  FlatJoinFunction[Any, Any, Any]) = {
+
+if (joinInfo.isEqui) {
+  // only equality condition => no join function necessary
+  null
--- End diff --

In general, `null` is not very welcome in Scala. Could you return a 
`FlatJoinFunction` (containing only a `ConverterResultExpression`) here too? We 
can then get rid of the `Tuple2RowMapper` and support any type as output type 
of the join operation.
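A minimal sketch of the suggested alternative, with hand-rolled stand-ins for Flink's `Collector` and `FlatJoinFunction` interfaces (assumptions for illustration, not the real API):

```scala
object EquiJoinSketch {
  // Stand-ins for the Flink interfaces; illustrative only.
  trait Collector[T] { def collect(record: T): Unit }
  trait FlatJoinFunction[L, R, O] {
    def join(left: L, right: R, out: Collector[O]): Unit
  }

  // Pure equi-join case: there is no remaining condition to evaluate,
  // so return a trivial result-converting function instead of null.
  def equiJoinFunction: FlatJoinFunction[Any, Any, (Any, Any)] =
    new FlatJoinFunction[Any, Any, (Any, Any)] {
      def join(left: Any, right: Any, out: Collector[(Any, Any)]): Unit =
        out.collect((left, right))
    }
}
```

Returning a (possibly trivial) function in every branch keeps the call sites free of null checks, which is the idiomatic Scala choice.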




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145951#comment-15145951
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1632#discussion_r52827029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 ---
@@ -39,18 +49,80 @@ class DataSetJoinRule
 val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
 val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
 
-new DataSetJoin(
-  rel.getCluster,
-  traitSet,
-  convLeft,
-  convRight,
-  rel.getRowType,
-  join.toString,
-  Array[Int](),
-  Array[Int](),
-  JoinType.INNER,
-  null,
-  null)
+// get the equality keys
+val joinInfo = join.analyzeCondition
+val keyPairs = joinInfo.pairs
+
+if (keyPairs.isEmpty) { // if no equality keys => not supported
+  throw new TableException("Joins should have at least one equality 
condition")
+}
+else { // at least one equality expression => generate a join function
+  val conditionType = join.getCondition.getType
+  val func = getJoinFunction(join, joinInfo)
+  val leftKeys = ArrayBuffer.empty[Int]
+  val rightKeys = ArrayBuffer.empty[Int]
+
+  keyPairs.foreach(pair => {
+leftKeys.add(pair.source)
+rightKeys.add(pair.target)}
+  )
+
+  new DataSetJoin(
+rel.getCluster,
+traitSet,
+convLeft,
+convRight,
+rel.getRowType,
+join.toString,
+leftKeys.toArray,
+rightKeys.toArray,
+JoinType.INNER,
+null,
+func)
+}
+  }
+
+  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
+  ((TableConfig, TypeInformation[Any], TypeInformation[Any], 
TypeInformation[Any]) =>
+  FlatJoinFunction[Any, Any, Any]) = {
+
+if (joinInfo.isEqui) {
+  // only equality condition => no join function necessary
+  null
+}
+else {
+  val func = (
+config: TableConfig,
+leftInputType: TypeInformation[Any],
+rightInputType: TypeInformation[Any],
+returnType: TypeInformation[Any]) => {
+
+  val generator = new CodeGenerator(config, leftInputType, 
Some(rightInputType))
+  val condition = generator.generateExpression(join.getCondition)
+  val body = {
--- End diff --

Unnecessary brackets?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144312#comment-15144312
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-183251312
  
Hi, I thought about using POJOs as native types within Table/SQL operators. 
IMO, the gains are very little compared to the added code complexity. Given a 
POJO input, we can preserve the input type only for very few operations such as 
a Filter. For most other operations, we need to generate a new output type 
(Tuple or Row). I am a bit skeptical about adding a lot of codeGen code with 
special cases for POJOs (such as the field index mapping), which would be used 
very seldom. Moreover, POJO field accesses (for operations and serialization) go 
through reflection and are not very efficient. So even the performance gain for 
those few cases where POJOs can be used is not clear.

I do not question the native type support in general. Tuples and primitives 
should definitely be supported, but I don't think we need to support POJOs 
within Table / SQL operators. Instead, I would convert POJO datasets into Row 
tables during table scan. Most of the code in this PR can be used to implement 
a codeGen'd converter Map function.

What do you think @twalthr?
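The proposed converter can be approximated with plain reflection (the codeGen'd version would emit equivalent field accesses directly); `Person` and the field list below are made up for illustration:

```scala
import java.lang.reflect.Field

object PojoToRowSketch {
  // A made-up POJO with a private field, for illustration.
  class Person {
    var name: String = _
    private var age: Int = 0
    def setAge(a: Int): Unit = { age = a }
  }

  // Convert a POJO into a Row-like Array[Any] with a deterministic
  // field order, as a table-scan converter Map function would.
  def pojoToRow(pojo: AnyRef, fieldNames: Seq[String]): Array[Any] =
    fieldNames.map { name =>
      val f: Field = pojo.getClass.getDeclaredField(name)
      f.setAccessible(true) // reflective access also for private fields
      f.get(pojo)
    }.toArray
}
```

Applying this once during the table scan fixes the field order up front, so all downstream operators can work on positional Row fields.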





[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144480#comment-15144480
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-183287252
  
Ah, OK :-)
I have to admit that I didn't look at the changes in detail but noticed 
that they go into several places of the code gen. Hence, I was wondering if it 
would be possible to extract the special POJO case from the `CodeGenerator`, 
but I guess that would lead to code duplication. 

I'll start reviewing the changes.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144489#comment-15144489
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52731697
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -75,5 +75,15 @@ class TableEnvironment {
   TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
 
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
--- End diff --

Ah, I see the comment is copied. But it should be changed, no?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144558#comment-15144558
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52739054
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -150,7 +151,11 @@ object CodeGenUtils {
 
   sealed abstract class FieldAccessor
 
-  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
--- End diff --

I think we do not have to separate the cases of public and private fields; 
we can simply make all fields accessible.
This would reduce the code complexity a bit.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144563#comment-15144563
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52739359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -42,13 +42,30 @@ import scala.collection.mutable
   * @param config configuration that determines runtime behavior
   * @param input1 type information about the first input of the Function
   * @param input2 type information about the second input if the Function 
is binary
+  * @param inputPojoFieldMapping additional mapping information if input1 
is a POJO (POJO types
+  *  have no deterministic field order). We 
assume that input2 is
+  *  converted before and thus is never a POJO.
   */
 class CodeGenerator(
-config: TableConfig,
-input1: TypeInformation[Any],
-input2: Option[TypeInformation[Any]] = None)
+   config: TableConfig,
+   input1: TypeInformation[Any],
+   input2: Option[TypeInformation[Any]] = None,
+   inputPojoFieldMapping: Array[Int] = Array())
--- End diff --

Make this parameter optional?
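The suggested optional parameter could look like this in plain Scala (an illustrative constructor, not the actual `CodeGenerator` signature):

```scala
// Illustrative sketch only: wrapping the mapping in an Option makes the
// "no POJO input" case explicit instead of relying on an empty array.
class CodeGeneratorSketch(
    val input1: String,
    val input2: Option[String] = None,
    val inputPojoFieldMapping: Option[Array[Int]] = None) {

  // Fall back to the identity mapping when no POJO mapping was supplied.
  def fieldMapping(arity: Int): Array[Int] =
    inputPojoFieldMapping.getOrElse((0 until arity).toArray)
}
```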




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144574#comment-15144574
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52740335
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -231,29 +252,40 @@ class CodeGenerator(
 * be reused, they will be added to reusable code sections internally. 
The evaluation result
 * may be stored in the global result variable (see [[outRecordTerm]]).
 *
-* @param fieldExprs
+* @param fieldExprs field expressions to be converted
 * @param returnType conversion target type. Type must have the same 
arity than fieldExprs.
+* @param resultFieldNames result field names necessary for a mapping 
to POJO fields.
 * @return instance of GeneratedExpression
 */
   def generateResultExpression(
   fieldExprs: Seq[GeneratedExpression],
-  returnType: TypeInformation[_ <: Any])
+  returnType: TypeInformation[_ <: Any],
+  resultFieldNames: Seq[String])
 : GeneratedExpression = {
-// TODO disable arity check for Rows and derive row arity from 
fieldExprs
 // initial type check
 if (returnType.getArity != fieldExprs.length) {
   throw new CodeGenException("Arity of result type does not match 
number of expressions.")
 }
 // type check
 returnType match {
+  case pt: PojoTypeInfo[_] =>
+fieldExprs.zipWithIndex foreach {
--- End diff --

Add a check that `fieldExprs` and `resultFieldNames` have the same length.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144401#comment-15144401
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-183268426
  
Hi @fhueske,

I completely share your opinion. This PR does exactly what you described. 
As you can read in `DataSetSource`: "It ensures that types without 
deterministic field order (e.g. POJOs) are not part of the plan translation." 
What I have implemented is proper reading of POJOs (into the Table API) and 
writing of POJOs (out of the Table API). Most of the code I have added provides 
codeGen'd converter Map functions for the source and the sink. POJOs are not 
used in any operation within the plan itself.
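Conceptually, such a converter resolves the POJO's fields by name once and then copies them into a positionally addressed row. The following is a hypothetical sketch of that idea in plain Java (not the actual generated code; the `PojoToRowConverter` and `Person` names are invented for illustration):

```java
import java.lang.reflect.Field;

public class PojoToRowConverter {
    // Example POJO: its field order as reported by reflection is not deterministic.
    public static class Person {
        public String name;
        public int age;
        public Person(String name, int age) { this.name = name; this.age = age; }
    }

    private final Field[] fields;

    public PojoToRowConverter(Class<?> clazz, String[] fieldNames) throws Exception {
        // Resolve the mapping once by field name and cache the accessible Fields.
        fields = new Field[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            fields[i] = clazz.getDeclaredField(fieldNames[i]);
            fields[i].setAccessible(true);
        }
    }

    public Object[] convert(Object pojo) throws Exception {
        // Copy POJO fields into a positionally addressed row.
        Object[] row = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            row[i] = fields[i].get(pojo);
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        PojoToRowConverter conv =
            new PojoToRowConverter(Person.class, new String[]{"age", "name"});
        System.out.println(java.util.Arrays.toString(conv.convert(new Person("Ada", 36))));
        // prints [36, Ada]
    }
}
```

The generated functions avoid the per-record reflection cost by emitting direct field accesses where possible; this sketch only shows the mapping idea.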




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144484#comment-15144484
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52731336
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -75,5 +75,15 @@ class TableEnvironment {
   TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
 
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
--- End diff --

I would make name equivalence only required for POJOs and generic composite 
types. Rows and tuples can be matched by position. Otherwise, fields would 
need to be renamed to `f0`, `f1`, etc. for tuples.
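The suggested rule can be illustrated with a small sketch: types with a stable field order (rows, tuples) are matched positionally, while POJO-like types are matched by name. The `isPojoLike` flag and method names below are invented for the example:

```java
import java.util.Arrays;
import java.util.List;

public class FieldMappings {
    public static int[] fieldMapping(boolean isPojoLike,
                                     List<String> tableFields,
                                     List<String> targetFields) {
        int[] mapping = new int[targetFields.size()];
        for (int i = 0; i < targetFields.size(); i++) {
            // Name-based lookup for POJO-like types; positional for rows/tuples.
            mapping[i] = isPojoLike ? tableFields.indexOf(targetFields.get(i)) : i;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // POJO: match by name, so ("b", "a") maps to indices [1, 0].
        System.out.println(Arrays.toString(
            fieldMapping(true, Arrays.asList("a", "b"), Arrays.asList("b", "a"))));
        // Tuple: match by position, names like "f0"/"f1" are ignored -> [0, 1].
        System.out.println(Arrays.toString(
            fieldMapping(false, Arrays.asList("a", "b"), Arrays.asList("f0", "f1"))));
    }
}
```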




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144564#comment-15144564
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52739425
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -42,13 +42,30 @@ import scala.collection.mutable
   * @param config configuration that determines runtime behavior
   * @param input1 type information about the first input of the Function
   * @param input2 type information about the second input if the Function 
is binary
+  * @param inputPojoFieldMapping additional mapping information if input1 
is a POJO (POJO types
+  *  have no deterministic field order). We 
assume that input2 is
+  *  converted before and thus is never a POJO.
   */
 class CodeGenerator(
-config: TableConfig,
-input1: TypeInformation[Any],
-input2: Option[TypeInformation[Any]] = None)
+   config: TableConfig,
+   input1: TypeInformation[Any],
+   input2: Option[TypeInformation[Any]] = None,
+   inputPojoFieldMapping: Array[Int] = Array())
   extends RexVisitor[GeneratedExpression] {
 
+  /**
+* A code generator for generating unary Flink
+* [[org.apache.flink.api.common.functions.Function]]s with one input.
+*
+* @param config configuration that determines runtime behavior
+* @param input type information about the input of the Function
+* @param inputPojoFieldMapping additional mapping information 
necessary if input is a
+*  POJO (POJO types have no deterministic 
field order).
+*/
+  def this(config: TableConfig, input: TypeInformation[Any], 
inputPojoFieldMapping: Array[Int]) =
--- End diff --

Make fieldMapping optional?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144611#comment-15144611
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52744266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -749,4 +867,23 @@ class CodeGenerator(
 reusableMemberStatements.add(statement)
   }
 
+  def addReusablePrivateFieldAccess(clazz: Class[_], fieldName: String): 
String = {
+val fieldTerm = s"field_${clazz.getCanonicalName.replace('.', 
'$')}_$fieldName"
+val fieldExtraction =
+  s"""
+|java.lang.reflect.Field $fieldTerm =
--- End diff --

`Field` is not serializable and should be `transient`.
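The usual pattern for this is to mark the `Field` transient and re-resolve it lazily after deserialization. A minimal sketch, with illustrative class and method names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class PrivateFieldAccessor implements Serializable {
    // Example target class with a private field.
    public static class Box {
        private final int secret = 42;
    }

    private final Class<?> clazz;      // Class and String are Serializable
    private final String fieldName;
    private transient Field field;     // Field is not Serializable -> transient

    public PrivateFieldAccessor(Class<?> clazz, String fieldName) {
        this.clazz = clazz;
        this.fieldName = fieldName;
    }

    private Field resolved() throws Exception {
        if (field == null) {           // null again after deserialization
            field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
        }
        return field;
    }

    public Object get(Object target) throws Exception {
        return resolved().get(target);
    }

    public static void main(String[] args) throws Exception {
        PrivateFieldAccessor acc = new PrivateFieldAccessor(Box.class, "secret");
        // Round-trip through Java serialization to show the Field is re-resolved.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(acc);
        PrivateFieldAccessor copy = (PrivateFieldAccessor)
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.get(new Box()));   // prints 42
    }
}
```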




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144622#comment-15144622
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1624#issuecomment-183349741
  
Thanks @twalthr, the PR looks pretty good. 




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144584#comment-15144584
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1624#discussion_r52741150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -150,7 +151,11 @@ object CodeGenUtils {
 
   sealed abstract class FieldAccessor
 
-  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
--- End diff --

Nevermind. I was thinking about the `PojoSerializer`, which always goes 
through reflection, even for public fields.
You are accessing the field directly if it is public, which is good.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15145227#comment-15145227
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1632

[FLINK-3226] Translate logical joins to physical

This PR contains the logical-to-physical translation for joins.
- Joins with only equality conditions do not generate a join function.
- Joins with both equality and non-equality conditions evaluate the 
non-equality conditions inside a `FlatJoinFunction`.
- Joins without any equality conditions, e.g. `in1.join.n2.where(a > b)`, are 
currently not supported. We could consider supporting them later by generating 
a cross function.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink translateJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1632.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1632


commit f4563cf2ea78a993b00f6398006b1a064ad83857
Author: vasia 
Date:   2016-02-11T17:04:45Z

[FLINK-3226] Translate logical joins to physical






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142618#comment-15142618
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1600#issuecomment-182817751
  
Looks pretty good. Can you check the `@Ignore` annotations?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143300#comment-15143300
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1624

[FLINK-3226] Translation from and to POJOs for CodeGenerator

This PR implements full POJO support as input and output type of the Table 
API. It is now possible to convert from an arbitrary type to another arbitrary 
type. I fixed the failing AsITCase and implemented additional tests from/to 
tuples, from/to POJOs, and from/to case classes.

@vasia and @fhueske feel free to review.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink PojoSupport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1624.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1624


commit 2322d9f602c4b7e01a582958f918372efc214848
Author: twalthr 
Date:   2016-02-11T15:16:29Z

[FLINK-3226] Translation from and to POJOs for CodeGenerator






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142754#comment-15142754
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1600#issuecomment-182872698
  
@fhueske I fixed the aggregation hashCode and enabled the ignored tests. 
Let me know if it's OK now!




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143030#comment-15143030
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1600#issuecomment-182957377
  
It's all green :D Merging.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143046#comment-15143046
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia closed the pull request at:

https://github.com/apache/flink/pull/1600




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140819#comment-15140819
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-182390675
  
Hey @twalthr,
I merged this yesterday but the PR didn't automatically close. Could you 
please close it manually? Thanks!




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140835#comment-15140835
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1600#issuecomment-182392029
  
I've rebased this one on top of `tableOnCalcite` which includes #1595.

Regarding the average aggregate, I went for @tillrohrmann's suggestion of 
adding 0.5. That means that an integer average of 1 and 2 will now return 2. 
Note that in the previous implementation of the Table API this operation would 
return 1 because integer average was maintaining a pair of sum and count.
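The behavior change described here can be verified with a small snippet. This is a hypothetical illustration, not Flink's actual AvgAggregate code:

```java
// Integer average of 1 and 2 under the old and new schemes described above.
public class AvgRounding {

    // Old pair-based scheme: integer division of sum by count truncates.
    public static int avgTruncating(long sum, long count) {
        return (int) (sum / count);
    }

    // New scheme per @tillrohrmann's suggestion: add 0.5 before truncating,
    // which rounds half up.
    public static int avgRounding(long sum, long count) {
        return (int) ((double) sum / count + 0.5);
    }

    public static void main(String[] args) {
        System.out.println(avgTruncating(3, 2)); // 1: old result for avg(1, 2)
        System.out.println(avgRounding(3, 2));   // 2: new result for avg(1, 2)
    }
}
```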




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140885#comment-15140885
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1595




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138788#comment-15138788
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52293343
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+object CodeGenUtils {
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case _ => "null"
+  }
+
+  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
+    case nti: 
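The comment in the diff above explains why generated casts must unbox boxed values before narrowing. The two-step cast can be checked with a small standalone snippet (not part of the PR):

```java
// Demonstrates the boxed-vs-primitive casting issue noted in the diff above.
public class CastDemo {
    public static void main(String[] args) {
        float a = 1.0f;
        byte b = (byte) a;        // narrowing a primitive float compiles fine

        Float boxedA = 1.0f;
        // byte c = (byte) boxedA;      // does NOT compile: Float cannot be cast to byte
        byte c = (byte) (float) boxedA; // unbox to float first, then narrow

        System.out.println(b + " " + c); // 1 1
    }
}
```

This is exactly why the code generator emits `(byte)(float)` style double casts whenever the input term may be a boxed type.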

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138784#comment-15138784
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52293004
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
--- End diff --

Intellij tells me this import is unused




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138805#comment-15138805
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181824403
  
Hi Timo, the PR looks really good :-)

I found the following issues / questions:
- Accessing of POJO fields might not work.
- Can you add method comments to the code generation methods in 
`CodeGenerator` and `CodeGenUtils`?
- Would it make sense to separate the function and expression code gen, 
i.e., split the `CodeGenerator` class? 




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138705#comment-15138705
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1600#issuecomment-181794348
  
Thanks for the feedback @tillrohrmann, @twalthr!
I've moved the classes to `org.apache.flink.api.table.runtime` and tried to 
shorten the aggregates code using Numerics. I only left `AvgAggregate` as is, 
because integer average and float/double average are computed differently. We 
can always replace it with code generation later as @twalthr suggested.
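The factoring @vasia describes — one generic implementation parameterized by the numeric operations instead of one class per type — can be sketched as follows. This is illustrative only (the actual Flink classes live in `org.apache.flink.api.table.runtime` and use Scala's `Numeric`):

```java
import java.util.function.BinaryOperator;

// Generic sum aggregate parameterized by a zero element and a "plus" operation,
// so Int/Long/Float/Double sums share one implementation.
// Hypothetical sketch, not Flink's actual runtime classes.
public class SumAggregate<T> {
    private final BinaryOperator<T> plus;
    private T acc;

    public SumAggregate(T zero, BinaryOperator<T> plus) {
        this.acc = zero;
        this.plus = plus;
    }

    public void aggregate(T value) {
        acc = plus.apply(acc, value);
    }

    public T result() {
        return acc;
    }

    public static void main(String[] args) {
        SumAggregate<Integer> intSum = new SumAggregate<>(0, Integer::sum);
        intSum.aggregate(1);
        intSum.aggregate(2);
        System.out.println(intSum.result()); // 3
    }
}
```

Average does not fit this shape cleanly, which matches the comment above: integer and float/double averages divide differently, so `AvgAggregate` stays type-specific until code generation replaces it.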




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138764#comment-15138764
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52291201
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+class CodeGenerator(
+config: TableConfig,
+input1: TypeInformation[Any],
+input2: Option[TypeInformation[Any]] = None)
+  extends RexVisitor[GeneratedExpression] {
+
+  // set of member statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableInitStatements = mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
+
+  def reuseMemberCode(): String = {
+reusableMemberStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInitCode(): String = {
+reusableInitStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInputUnboxingCode(): String = {
+reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
+  }
+
+  def input1Term = "in1"
+
+  def input2Term = "in2"
+
+  def collectorTerm = "c"
+
+  def outRecordTerm = "out"
+
+  def nullCheck: Boolean = config.getNullCheck
+
+  def generateExpression(rex: RexNode): GeneratedExpression = {
+rex.accept(this)
+  }
+
+  def generateFunction[T <: Function](
+  name: String,
+  clazz: Class[T],
+  bodyCode: String,
+  returnType: TypeInformation[Any])
+: GeneratedFunction[T] = {
+val funcName = newName(name)
+
+// Janino does not support generics, that's why we need
+// manual casting here
+val samHeader =
+  if (clazz == classOf[FlatMapFunction[_,_]]) {
+val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+(s"void flatMap(Object _in1, org.apache.flink.util.Collector 
$collectorTerm)",
+  s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+  } else if (clazz == classOf[MapFunction[_,_]]) {
+val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+("Object map(Object _in1)",
+  s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+  } else {
+// TODO more functions
+throw new 
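The diff above collects reusable member and init statements in `LinkedHashSet`s. The reason for that particular collection — duplicates are added only once, and insertion order is preserved so the generated source is deterministic — is easy to demonstrate (standalone snippet, not part of the PR):

```java
import java.util.LinkedHashSet;

// Why the code generator uses LinkedHashSet for reusable code sections:
// re-adding the same statement is a no-op, and iteration follows
// insertion order, so emitted code is stable across runs.
public class ReuseDemo {
    public static void main(String[] args) {
        LinkedHashSet<String> members = new LinkedHashSet<>();
        members.add("private int count;");
        members.add("private double sum;");
        members.add("private int count;"); // duplicate, ignored

        String code = String.join("\n", members);
        System.out.println(code);
        // private int count;
        // private double sum;
    }
}
```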

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139025#comment-15139025
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181902296
  
I have added comments and fixed the string bug. I will solve the other 
issues at the end of the week.
If you have no objections, you can merge the PR ;-)


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139052#comment-15139052
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52320399
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate return Int as aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

I know. However, aggregating on a `Double` and then calling `toInt` before 
returning the result led to rounding errors. For example, one test had 21 
records with a sum of 231, so the average should be 11, but it was computed 
as roughly 10.9 and rounded down to 10. If you have any idea how to avoid 
such cases, please let me know!
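A minimal sketch (hypothetical names, not the PR's code) of the trade-off described above: accumulating the sum as a `Long` and dividing once gives an exact integer average, while a running `Double` average can accumulate floating-point error that a final `toInt` then truncates.

```scala
object AvgSketch {
  // Exact integer average: accumulate the sum as a Long, divide once at the end.
  def exactAvg(values: Seq[Byte]): Int = {
    var sum: Long = 0L
    var count: Int = 0
    for (v <- values) { sum += v; count += 1 }
    (sum / count).toInt
  }

  // Running Double average: each update can introduce floating-point error,
  // and the final toInt truncates instead of rounding.
  def runningAvg(values: Seq[Byte]): Int = {
    var avg: Double = 0.0
    var count: Int = 0
    for (v <- values) { count += 1; avg += (v - avg) / count }
    avg.toInt
  }
}
```

For the case mentioned in the comment (21 records summing to 231), `exactAvg` yields 11 regardless of the value distribution.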




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138876#comment-15138876
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52301458
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+class CodeGenerator(
+    config: TableConfig,
+    input1: TypeInformation[Any],
+    input2: Option[TypeInformation[Any]] = None)
+  extends RexVisitor[GeneratedExpression] {
+
+  // set of member statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableInitStatements = mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
+
+  def reuseMemberCode(): String = {
+    reusableMemberStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInitCode(): String = {
+    reusableInitStatements.mkString("", "\n", "\n")
+  }
+
+  def reuseInputUnboxingCode(): String = {
+    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
+  }
+
+  def input1Term = "in1"
+
+  def input2Term = "in2"
+
+  def collectorTerm = "c"
+
+  def outRecordTerm = "out"
+
+  def nullCheck: Boolean = config.getNullCheck
+
+  def generateExpression(rex: RexNode): GeneratedExpression = {
+    rex.accept(this)
+  }
+
+  def generateFunction[T <: Function](
+      name: String,
+      clazz: Class[T],
+      bodyCode: String,
+      returnType: TypeInformation[Any])
+    : GeneratedFunction[T] = {
+    val funcName = newName(name)
+
+    // Janino does not support generics, that's why we need
+    // manual casting here
+    val samHeader =
+      if (clazz == classOf[FlatMapFunction[_, _]]) {
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
+          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+      } else if (clazz == classOf[MapFunction[_, _]]) {
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+        ("Object map(Object _in1)",
+          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
+      } else {
+        // TODO more functions
+        throw new 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138913#comment-15138913
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52304432
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 ---
@@ -58,19 +58,23 @@ class FlinkAggregate(
 )
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
-
-    val origCosts = super.computeSelfCost(planner)
-    val deltaCost = planner.getCostFactory.makeHugeCost()
-
-    // only prefer aggregations with transformed Avg
-    aggCalls.toList.foldLeft[RelOptCost](origCosts){
-      (c: RelOptCost, a: AggregateCall) =>
-        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-          c.plus(deltaCost)
-        } else {
-          c
-        }
-    }
-  }
+//
+// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
+//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.
+//
+//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+//
+//    val origCosts = super.computeSelfCost(planner)
+//    val deltaCost = planner.getCostFactory.makeHugeCost()
+//
+//    // only prefer aggregations with transformed Avg
+//    aggCalls.toList.foldLeft[RelOptCost](origCosts){
+//      (c: RelOptCost, a: AggregateCall) =>
+//        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
+//          c.plus(deltaCost)
+//        } else {
+//          c
+//        }
+//    }
+//  }
--- End diff --

It's fine to drop the code. We can improve the aggregations later.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138880#comment-15138880
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52301613
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
--- End diff --

I had the same problem with IntelliJ. But if you remove the import, the 
`j"..."` interpolator will no longer compile. Actually, I think we don't need 
the Indenter at all.
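For context, a custom `j"..."` interpolator can only be provided through an implicit class on `StringContext`, so removing the import breaks compilation even though IDEs flag it as unused. A hedged, self-contained sketch (not the actual Indenter implementation):

```scala
object InterpolatorSketch {
  // The implicit class in scope is what makes j"..." valid syntax;
  // without it, j"..." does not compile.
  implicit class JInterpolator(val sc: StringContext) {
    // Interpolate like s"...", then strip the | margin from each line.
    def j(args: Any*): String = sc.s(args: _*).stripMargin
  }
}
```

Usage: after `import InterpolatorSketch._`, an expression like `j"|value: $x"` interpolates and strips the leading margin.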




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138904#comment-15138904
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52303126
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+object CodeGenUtils {
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case _ => "null"
+  }
+
+  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
+    case nti: 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138916#comment-15138916
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52304994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
 ---
@@ -37,14 +39,24 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
 
-    new DataSetReduce(
+    val grouping = agg.getGroupSet.asList().map {
--- End diff --

I think you can use `ImmutableBitSet.toArray` to directly generate an int[].
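For illustration, Calcite's `ImmutableBitSet.toArray` returns the indices of the set bits as an ascending `int[]`. A hedged equivalent of that semantics using `java.util.BitSet` (hypothetical helper, just to show what the suggestion produces):

```scala
object BitSetSketch {
  // Collect the indices of all set bits, in ascending order,
  // mirroring what ImmutableBitSet.toArray returns.
  def toArray(bits: java.util.BitSet): Array[Int] =
    Iterator.iterate(bits.nextSetBit(0))(i => bits.nextSetBit(i + 1))
      .takeWhile(_ >= 0)
      .toArray
}
```

With grouping keys at positions 1 and 3, this yields `Array(1, 3)`, which is exactly the `int[]` the `DataSetReduce` constructor needs.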




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138936#comment-15138936
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52306136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
+
+  var result: T = _
+
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+    val numericResult = implicitly[Numeric[T]]
--- End diff --

Move out of `aggregate()`?
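One way to follow the suggestion: resolve the `Numeric` evidence once, as a constructor-level implicit, instead of calling `implicitly` inside every `aggregate()` invocation. A hedged sketch with hypothetical names, not the PR's code:

```scala
class MaxAggregateSketch[T](implicit num: Numeric[T]) {
  // num is bound once at construction time; aggregate() reuses it.
  private var result: Option[T] = None

  def aggregate(value: Any): Unit = {
    val input = value.asInstanceOf[T]
    result = Some(result.fold(input)(r => num.max(r, input)))
  }

  def getAggregated: T = result.get
}
```

Using `Option` for the running maximum also sidesteps the need for a per-type initial value, at the cost of one allocation per update.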




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138937#comment-15138937
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52306203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import scala.reflect.runtime.universe._
+
+abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
+
+  var result: T = _
+
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+    val numericResult = implicitly[Numeric[T]]
+
+    result = numericResult.min(result, input)
+  }
+
+  override def getAggregated(): T = {
+    result
+  }
+
+}
+
+// Numeric doesn't have max value
+class TinyMinAggregate extends MinAggregate[Byte] {
+
+    override def initiateAggregate: Unit = {
--- End diff --

Indentation is off here (and in the following classes).




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138885#comment-15138885
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181848000
  
Thanks for reviewing @fhueske.

I will add more comments to my code.

Regarding the splitting of the `CodeGenerator`, I think it is not that easy 
because functions and expressions both access reusable code parts. I will think 
about it again...






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138934#comment-15138934
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52306088
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate returns Int as aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

The `aggregate` method was previously implemented as `avgValue += (current - avgValue) / count` to avoid overflow on `sum`.
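
For reference, the incremental form mentioned above keeps a running mean instead of a sum, so the accumulator never grows with the number or magnitude of inputs. A sketch assuming a `Double` accumulator (names are illustrative, not from the PR):

```scala
// Running mean: avg is updated in place, no total sum is ever materialized,
// so a long series of large values cannot overflow the accumulator.
class RunningAvg {
  private var avg: Double = 0.0
  private var count: Long = 0L

  def add(value: Double): Unit = {
    count += 1
    avg += (value - avg) / count  // incremental mean update
  }

  def result: Double = avg
}
```

The trade-off is that for integral types the division truncates at every step, so the PR's widened-sum approach (`Long` sum for `Byte` inputs) is the simpler way to stay exact.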




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138931#comment-15138931
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52305988
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import scala.reflect.runtime.universe._
+
+abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
+
+  var result: T = _
+
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+    val numericResult = implicitly[Numeric[T]]
--- End diff --

Can this be moved out of the `aggregate` method?
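
It can: with a context bound the `Numeric` evidence is already a constructor parameter, so it can be bound to a field once instead of being looked up on every record. A hedged sketch (class name and the `initial` parameter are illustrative, not the PR's API):

```scala
// The implicitly[...] lookup runs once at construction time,
// not on every aggregate() call.
class HoistedMinAggregate[T: Numeric](initial: T) {
  private val num: Numeric[T] = implicitly[Numeric[T]]  // hoisted lookup
  private var result: T = initial

  def aggregate(value: Any): Unit = {
    val input = value.asInstanceOf[T]
    result = num.min(result, input)
  }

  def getAggregated(): T = result
}
```

In practice the JIT would likely inline the per-call lookup anyway, but hoisting it also reads more clearly.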




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138881#comment-15138881
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1595#discussion_r52301780
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+object CodeGenUtils {
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case _ => "null"
+  }
+
+  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
+    case nti: 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15138922#comment-15138922
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52305236
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 ---
@@ -39,6 +46,10 @@ class DataSetJoinRule
     val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
     val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
 
+    val joinKeys = getJoinKeys(join)
--- End diff --

I would exclude the changes in this file from the PR.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139202#comment-15139202
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181955939
  
Thanks for the quick update @twalthr! Some tests are failing because the 
wrong type of exception is expected. I'll fix those and then merge this.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139332#comment-15139332
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52349128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate returns Int as aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

What about simply adding `0.5`?
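
The `0.5` suggestion refers to rounding an integer average: plain integer division truncates toward zero, while adding `0.5` to the real-valued quotient rounds half-up (a negative average would need the symmetric adjustment). A small illustration, not code from the PR:

```scala
// Integer average by truncation vs. half-up rounding.
def truncatedAvg(sum: Long, count: Int): Int =
  (sum / count).toInt                           // truncates: 7 / 2 == 3

def roundedAvg(sum: Long, count: Int): Int =
  math.floor(sum.toDouble / count + 0.5).toInt  // rounds half-up: 7 / 2 -> 4
```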




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137174#comment-15137174
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52190250
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate returns Int as aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate returns Int as aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate returns Int as aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate returns Long as aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate returns Float as aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate returns Double as aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}
--- End diff --

We could also replace it later using code generation.
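
Until code generation lands, the per-type boilerplate could also be collapsed into a single `Numeric`-based class. This is a sketch only, not the PR's implementation — note it keeps the input type as the accumulator, whereas the PR deliberately widens TinyInt/SmallInt sums to `Int` to avoid narrow-type overflow:

```scala
// One generic sum aggregate instead of six near-identical classes.
// The Numeric evidence supplies zero and plus for any numeric T.
class GenericSumAggregate[T](implicit num: Numeric[T]) {
  private var sumValue: T = num.zero

  def initiateAggregate(): Unit = sumValue = num.zero

  def aggregate(value: Any): Unit =
    sumValue = num.plus(sumValue, value.asInstanceOf[T])

  def getAggregated(): T = sumValue
}
```

Code generation goes further by eliminating the boxing and the `asInstanceOf` per record, which is why it was preferred in the discussion.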



[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137205#comment-15137205
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52192508
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 ---
@@ -58,19 +58,23 @@ class FlinkAggregate(
 )
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
-
-    val origCosts = super.computeSelfCost(planner)
-    val deltaCost = planner.getCostFactory.makeHugeCost()
-
-    // only prefer aggregations with transformed Avg
-    aggCalls.toList.foldLeft[RelOptCost](origCosts){
-      (c: RelOptCost, a: AggregateCall) =>
-        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-          c.plus(deltaCost)
-        } else {
-          c
-        }
-    }
-  }
+//
+// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
+//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.
+//
+//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+//
+//    val origCosts = super.computeSelfCost(planner)
+//    val deltaCost = planner.getCostFactory.makeHugeCost()
+//
+//    // only prefer aggregations with transformed Avg
+//    aggCalls.toList.foldLeft[RelOptCost](origCosts){
+//      (c: RelOptCost, a: AggregateCall) =>
+//        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
+//          c.plus(deltaCost)
+//        } else {
+//          c
+//        }
+//    }
+//  }
--- End diff --

Yes, we decided to disable `AggregateReduceFunctionsRule` temporarily, thus 
the note on top.
I could remove it completely if you think it'd be better.


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137200#comment-15137200
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52192307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class MaxAggregate[T] extends Aggregate[T]{
+
+}
+
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+  private var max = Byte.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Byte.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Byte]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Byte = {
+max
+  }
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+  private var max = Short.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Short.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Short]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Short = {
+max
+  }
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+  private var max = Int.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Int]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Int = {
+max
+  }
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+  private var max = Long.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Long]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Long = {
+max
+  }
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  private var max = Float.MinValue
--- End diff --

I missed correcting this, thanks!
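A self-contained sketch of why the mixed sentinel matters in Scala: `Float.MinValue` is the most negative finite `Float` (about -3.4e38), whereas `Int.MinValue` silently widens to `-2147483648.0f` when assigned to a `Float` accumulator, so a max aggregate initialised with it would drop smaller inputs.

```scala
object SentinelCheck extends App {
  // Int.MinValue widens to -2147483648.0f, far above Float.MinValue.
  val wrongSentinel: Float = Int.MinValue
  val correctSentinel: Float = Float.MinValue
  assert(wrongSentinel > correctSentinel)

  // An input below the widened Int.MinValue would be silently ignored by a
  // max aggregate initialised with the wrong sentinel:
  val input = -3.0e9f
  assert(input > correctSentinel && input < wrongSentinel)
}
```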



[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137210#comment-15137210
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52192844
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import scala.collection.JavaConversions._
+import org.apache.flink.api.table.Row
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF for aggregates. It takes the
+ * grouped data as input, feeds it to the aggregates, and collects the
+ * records with the aggregated values.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The grouped keys' indices in the input.
+ * @param groupingKeys The grouping keys' positions.
+ */
+class AggregateFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val fields: Array[Int],
+private val groupingKeys: Array[Int]) extends 
RichGroupReduceFunction[Row, Row] {
--- End diff --

Sounds reasonable. Together with the expression functions or separately, 
e.g. in `org.apache.flink.table.runtime.functions`?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137003#comment-15137003
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52175496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class MaxAggregate[T] extends Aggregate[T]{
+
+}
+
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+  private var max = Byte.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Byte.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Byte]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Byte = {
+max
+  }
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+  private var max = Short.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Short.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Short]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Short = {
+max
+  }
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+  private var max = Int.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Int]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Int = {
+max
+  }
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+  private var max = Long.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Long]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Long = {
+max
+  }
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  private var max = Float.MinValue
--- End diff --

Why `Float.MinValue` here and in `initiateAggregate` `Int.MinValue`?



[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137006#comment-15137006
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52175572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class MaxAggregate[T] extends Aggregate[T]{
+
+}
+
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+  private var max = Byte.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Byte.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Byte]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Byte = {
+max
+  }
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+  private var max = Short.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Short.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Short]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Short = {
+max
+  }
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+  private var max = Int.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Int]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Int = {
+max
+  }
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+  private var max = Long.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Long]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Long = {
+max
+  }
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  private var max = Float.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+val current = value.asInstanceOf[Float]
+if (current > max) {
+  max = current
+}
+  }
+
+  override def getAggregated(): Float = {
+max
+  }
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+  private var max = Double.MinValue
+
+  override def initiateAggregate: Unit = {
+max = Int.MaxValue
--- End diff --

Why `Int.MaxValue` here?



[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137018#comment-15137018
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52176834
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import scala.collection.JavaConversions._
+import org.apache.flink.api.table.Row
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF for aggregates. It takes the
+ * grouped data as input, feeds it to the aggregates, and collects the
+ * records with the aggregated values.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The grouped keys' indices in the input.
+ * @param groupingKeys The grouping keys' positions.
+ */
+class AggregateFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val fields: Array[Int],
+private val groupingKeys: Array[Int]) extends 
RichGroupReduceFunction[Row, Row] {
--- End diff --

I would move all runtime-related functions to
`org.apache.flink.table.runtime`. IMO `plan` is not the right place for
`functions`.




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136976#comment-15136976
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1600

[FLINK-3226] Translate logical aggregations to physical

This PR builds on #1567 and addresses @fhueske's comments on translating 
aggregations.
Join translation is not part of this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink LogicalToPhysical

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1600.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1600


commit 6676aab520bd648c360f70a2047196d004ce1d31
Author: chengxiang li 
Date:   2016-02-01T07:18:14Z

[Flink-3226] Translate logical plan FlinkRels into physical plan 
DataSetRels.

commit cf41b740d32768185ec692e93754056ff6a16b59
Author: vasia 
Date:   2016-02-04T14:53:52Z

[FLINK-3226] implement GroupReduce translation; enable tests for supported 
operations

- compute average as sum and count for byte, short and int type to avoid 
rounding errors
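A minimal sketch of the sum-and-count approach described in the commit message (illustrative values, not code from the PR): combining pre-computed integer averages rounds too early, while carrying `(sum, count)` pairs keeps the combine step exact and defers the single division to the very end.

```scala
object AvgViaSumCount extends App {
  val partitionA = Seq(1, 2)
  val partitionB = Seq(4)

  // Combining pre-computed integer averages rounds too early:
  val avgA = partitionA.sum / partitionA.size   // 3 / 2 == 1
  val avgB = partitionB.sum / partitionB.size   // 4 / 1 == 4
  val combinedWrong = (avgA + avgB) / 2         // 2, but the true average is 7/3

  // Carrying (sum, count) keeps the combine step exact; divide only once,
  // at the very end:
  val sumAB = partitionA.sum.toLong + partitionB.sum.toLong       // 7
  val cntAB = (partitionA.size + partitionB.size).toLong          // 3
  val avg = sumAB.toDouble / cntAB                                // 2.333...

  assert(combinedWrong == 2)
  assert(math.abs(avg - 7.0 / 3.0) < 1e-9)
}
```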






[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137011#comment-15137011
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52175960
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 ---
@@ -58,19 +58,23 @@ class FlinkAggregate(
 )
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
-
-val origCosts = super.computeSelfCost(planner)
-val deltaCost = planner.getCostFactory.makeHugeCost()
-
-// only prefer aggregations with transformed Avg
-aggCalls.toList.foldLeft[RelOptCost](origCosts){
-  (c: RelOptCost, a: AggregateCall) =>
-if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-  c.plus(deltaCost)
-} else {
-  c
-}
-}
-  }
+//
+// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
+//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.  
+//  
+//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+//
+//val origCosts = super.computeSelfCost(planner)
+//val deltaCost = planner.getCostFactory.makeHugeCost()
+//
+//// only prefer aggregations with transformed Avg
+//aggCalls.toList.foldLeft[RelOptCost](origCosts){
+//  (c: RelOptCost, a: AggregateCall) =>
+//if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
+//  c.plus(deltaCost)
+//} else {
+//  c
+//}
+//}
+//  }
--- End diff --

Commented-out code?




[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137065#comment-15137065
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52181382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate returns Int as aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate returns Int as aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate returns Int as aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate returns Long as aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate returns Float as aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate returns Double as aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}
--- End diff --

This is a lot of duplicated code. Could we make it a bit less redundant by 
doing something similar to the following?

```
class SAggregate[I: spire.math.Numeric, T: spire.math.Numeric] extends Aggregate[T] {

  var result: T = _

  /**
   * Initialize the aggregate state.
   */
  override def initiateAggregate: Unit = {
    result = implicitly[Numeric[T]].zero
  }

  /**
   * Feed the aggregate field value.
   *
   * @param value
   */
  override def aggregate(value: Any): Unit = {
    val input: I = value.asInstanceOf[I]

    val numericInput = implicitly[spire.math.Numeric[I]]
    val 
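
The suggestion above is cut off mid-expression. For illustration, here is a hedged, self-contained variant of the same idea using Scala's standard `Numeric` type class instead of Spire's (the name `NumericSumAggregate` is made up for this sketch, and the `Aggregate` trait is re-declared in simplified form):

```scala
// Simplified stand-in for the PR's Aggregate trait (an assumption for this sketch).
trait Aggregate[T] {
  def initiateAggregate(): Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

// One generic class replaces the per-type Sum* classes. Note that this
// collapses input and output to the same type T, so the widening cases
// (TinyInt/SmallInt summing into Int) would still need a second type
// parameter, as in the Spire-based suggestion above.
class NumericSumAggregate[T](implicit num: Numeric[T]) extends Aggregate[T] {

  private var sum: T = num.zero

  override def initiateAggregate(): Unit = {
    sum = num.zero
  }

  override def aggregate(value: Any): Unit = {
    // The caller guarantees the runtime type of value matches T.
    sum = num.plus(sum, value.asInstanceOf[T])
  }

  override def getAggregated(): T = sum
}
```

With this, `LongSumAggregate` would become `new NumericSumAggregate[Long]`, and so on for the other non-widening cases.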

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137691#comment-15137691
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52225586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate returns Int as aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate returns Int as aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate returns Int as aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate returns Long as aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate returns Float as aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate returns Double as aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}
--- End diff --

No objections. I was just thinking about future improvements.


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137685#comment-15137685
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52225277
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import scala.collection.JavaConversions._
+import org.apache.flink.api.table.Row
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped
+ * data as input, feeds it to the aggregates, and collects the record with the
+ * aggregated values.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The grouped keys' indices in the input.
+ * @param groupingKeys The grouping keys' positions.
+ */
+class AggregateFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val fields: Array[Int],
+    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] {
--- End diff --

I would move everything there that is needed during runtime: 
`AggregateFunction` in `org.apache.flink.table.runtime` and helper classes in 
sub-packages.
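
For reference, the shape of the quoted wrapper can be illustrated with a self-contained simplification (plain Scala types stand in for Flink's `RichGroupReduceFunction`, `Collector`, and `Row`; the `IntSumAggregate` and the trait below are simplified stand-ins, not the PR's actual classes):

```scala
// Rows are modeled as untyped arrays, as in Flink's Row at the time.
type Row = Array[Any]

// Simplified stand-in for the PR's Aggregate trait.
trait Aggregate[T] {
  def initiateAggregate(): Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

class IntSumAggregate extends Aggregate[Int] {
  private var sum = 0
  override def initiateAggregate(): Unit = { sum = 0 }
  override def aggregate(value: Any): Unit = { sum += value.asInstanceOf[Int] }
  override def getAggregated(): Int = sum
}

/**
 * @param aggregates   the aggregate functions to apply
 * @param fields       input index of the field each aggregate consumes
 * @param groupingKeys input indices of the grouping keys
 */
class AggregateFunction(
    aggregates: Array[Aggregate[_]],
    fields: Array[Int],
    groupingKeys: Array[Int]) {

  // One call per group; a grouped reduce always sees at least one record.
  def reduce(records: Iterable[Row]): Row = {
    aggregates.foreach(_.initiateAggregate())
    var last: Row = null
    for (record <- records) {
      last = record
      // Feed each aggregate the field it is responsible for.
      aggregates.zip(fields).foreach { case (agg, i) => agg.aggregate(record(i)) }
    }
    // Output row: grouping key values followed by the aggregated values.
    val out = new Array[Any](groupingKeys.length + aggregates.length)
    groupingKeys.zipWithIndex.foreach { case (k, i) => out(i) = last(k) }
    aggregates.zipWithIndex.foreach { case (agg, i) =>
      out(groupingKeys.length + i) = agg.getAggregated()
    }
    out
  }
}
```

In the real operator, `reduce` would be the `RichGroupReduceFunction.reduce` method and the result would go to a `Collector[Row]` instead of being returned.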


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137409#comment-15137409
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52204752
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate returns Int as aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate returns Int as aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate returns Int as aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate returns Long as aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate returns Float as aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate returns Double as aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}
--- End diff --

I like @tillrohrmann's suggestion. Any reason not to use Scala's `Numeric`, 
though?


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134451#comment-15134451
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1595

[FLINK-3226] Implement a CodeGenerator for an efficient translation to 
DataSet programs

This PR implements a CodeGenerator for Table API on Calcite.

It includes the following:
- FilterITCase, SelectITCase and ExpressionITCase are mostly working again
- Null values are now fully supported and tested with new 
`TableProgramsTestBase`
- Arithmetic `+-*/ +X -X`, logical operators `== != AND, OR, NOT, IS NULL, 
NOT NULL` are supported.
- Logical operators implement 3-valued logic.
- A new configuration parameter `efficientTypeUsage` allows generated 
DataSet programs to be as efficient as ordinary DataSet programs by avoiding 
unnecessary type conversions and using the most efficient type for every operator.

Limitations:
- String functions are not yet supported. They will be forwarded to 
Calcite's runtime functions once I have implemented the necessary interfaces for 
that.
- Expression type casting is missing yet
- Date Literal is not supported yet
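
The 3-valued logic mentioned above follows SQL semantics: NULL propagates unless the other operand already decides the result. A hedged sketch, with `Option[Boolean]` standing in for a nullable BOOLEAN (this is an illustration of the semantics, not the PR's generated code):

```scala
// SQL three-valued logic: None models NULL.
def and3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
  case (Some(false), _) | (_, Some(false)) => Some(false) // FALSE dominates AND
  case (Some(true), Some(true))            => Some(true)
  case _                                   => None        // any remaining NULL is unknown
}

def or3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
  case (Some(true), _) | (_, Some(true)) => Some(true)    // TRUE dominates OR
  case (Some(false), Some(false))        => Some(false)
  case _                                 => None
}

def not3(a: Option[Boolean]): Option[Boolean] = a.map(!_)  // NOT NULL is NULL
```

For example, `FALSE AND NULL` is FALSE, but `TRUE AND NULL` is NULL, which is why generated code must track a null flag alongside each boolean value.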

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink CodeGen

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1595


commit c1a63287de0014f0b5bd23d1e0bc31e679f02fbe
Author: twalthr 
Date:   2016-02-05T16:40:54Z

[FLINK-3226] Implement a CodeGenerator for an efficient translation to 
DataSet programs




> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process





[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132156#comment-15132156
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1579#issuecomment-179777395
  
I'll merge this PR to the `tableOnCalcite` branch.


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process





[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15129042#comment-15129042
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/1579

[FLINK-3226] Add DataSet scan and  conversion to DataSet[Row]



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink dataSetTrans

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1579


commit c0073a13fcbbe4a730c2d967561838a28d574c2d
Author: Fabian Hueske 
Date:   2016-02-02T16:15:28Z

[FLINK-3226] Add DataSet scan and  conversion to DataSet[Row]




> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process





[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-27 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15119050#comment-15119050
 ] 

Chengxiang Li commented on FLINK-3226:
--

[~fhueske] and [~twalthr], this task will be blocked on me for a while, 
sorry about that. The task is more complicated and involves more work than I 
expected: it actually contains the physical plan generation part of Timo's 
prototype plus the whole previous Table API translation part. To move this 
forward faster, as Fabian suggested, I agree that we can split it into multiple 
sub-tasks, such as expression code generation, and even by the Calcite RelNodes; 
for example, we can work in parallel on RelNodes with no dependencies, such as 
Sort, Join and Aggregate. I would make this task very small, so that it only 
contains the Project translator without expression code generation, and have it 
ready during this week.
Besides, while trying to test my code, I found that {{PlannerImpl}} checks its 
state during each step of the query planning process, and there is no way to set 
it manually. For the Table API we actually skip several steps, such as parse and 
validate, so it fails when we go directly to the optimize step due to the 
unmatched state. Do you guys have any idea about this?

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process




