[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-19 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974471#comment-15974471
 ] 

Kurt Young commented on FLINK-6149:
---

Hi [~dshkvyra], I tried to reproduce the problem without this change, and 
it raises the same exception. 
I believe the root cause is that the {{LogicalJoin(condition=[<($1, $0)], 
joinType=[right])}} doesn't have an equality condition and the join type is not 
inner. The {{DataSetJoinRule}} has explicitly forbidden this kind of join since 
the change in https://issues.apache.org/jira/browse/FLINK-5520
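
For readers following along, the restriction can be sketched roughly as below. This is a simplified, self-contained model and not the actual rule code: {{JoinSpec}}, its fields and the {{reported}} value are hypothetical stand-ins for Calcite's {{LogicalJoin}} and its analyzed condition, and the bodies of {{hasEqualityPredicates}} and {{isSingleRowInnerJoin}} are assumptions about what the checks of the same name in the PR's join converter do.

```scala
// Simplified model of the join restriction. All types here are hypothetical
// stand-ins, not Calcite or Flink classes.
object JoinRestrictionSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // equiPairs: number of `left.x = right.y` conjuncts in the join condition.
  // nonEquiPredicates: number of remaining conjuncts such as `<($1, $0)`.
  // singleRowInput: true if one input is treated as producing a single row
  // (for example a global aggregate without grouping keys).
  final case class JoinSpec(
      joinType: JoinType,
      equiPairs: Int,
      nonEquiPredicates: Int,
      singleRowInput: Boolean)

  // Assumption: at least one equality pair is required, and additional
  // non-equi predicates are only tolerated on inner joins.
  def hasEqualityPredicates(join: JoinSpec): Boolean =
    join.equiPairs > 0 && (join.nonEquiPredicates == 0 || join.joinType == Inner)

  // Assumption: a join against a single-row input is only handled when inner.
  def isSingleRowInnerJoin(join: JoinSpec): Boolean =
    join.joinType == Inner && join.singleRowInput

  def isTranslatable(join: JoinSpec): Boolean =
    hasEqualityPredicates(join) || isSingleRowInnerJoin(join)

  // The join from the reported plan: condition `<($1, $0)`, joinType right.
  // Treating the aggregate side as single-row is an assumption of this sketch.
  val reported: JoinSpec =
    JoinSpec(RightOuter, equiPairs = 0, nonEquiPredicates = 1, singleRowInput = true)

  def main(args: Array[String]): Unit = {
    // Right outer join without an equality predicate: rejected.
    println(isTranslatable(reported))
    // The same condition on an inner join passes the single-row check.
    println(isTranslatable(reported.copy(joinType = Inner)))
  }
}
```

Under these assumptions the reported join fails both checks, which matches the exception occurring with or without this change.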


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: Kurt Young
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-19 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974460#comment-15974460
 ] 

Kurt Young commented on FLINK-6149:
---

[~dshkvyra] Thanks for reporting, I will look into it.



[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974354#comment-15974354
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user DmytroShkvyra commented on the issue:

https://github.com/apache/flink/pull/3594
  
@KurtYoung, @fhueske This PR broke queries that work with null nodes:
`
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

LogicalProject(a=[$1], cnt=[$0])
  LogicalJoin(condition=[<($1, $0)], joinType=[right])
    LogicalProject(cnt=[$0])
      LogicalFilter(condition=[<($0, 0)])
        LogicalAggregate(group=[{}], cnt=[COUNT()])
          LogicalProject($f0=[0])
            LogicalTableScan(table=[[B]])
    LogicalTableScan(table=[[A]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:267)
at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:235)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:265)
at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)`




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974000#comment-15974000
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3594




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973910#comment-15973910
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Rebased to master and will merge after build check




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972745#comment-15972745
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3594
  
Hi @KurtYoung, thanks for the update!
I think this PR is good to merge.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972665#comment-15972665
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111953023
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

+1




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972547#comment-15972547
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
@fhueske I have renamed the package and updated the union description.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972525#comment-15972525
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111931445
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

+1, will change.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972519#comment-15972519
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111930527
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

OK, will rename the packages.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972481#comment-15972481
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3594
  
Thanks for the update @KurtYoung. I'll have a detailed look at the changes 
later today.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972469#comment-15972469
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111919363
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Yes, you are right. DataStream can only do `UNION ALL`. 
We should prevent the translation of `UNION` and also change the 
explainString to `union all`, IMO.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972464#comment-15972464
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918822
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

+1




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972466#comment-15972466
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918921
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

I see, +1




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972462#comment-15972462
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918681
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

I'd like to keep the package name at `org.apache.flink.table.plan.nodes` to 
avoid another refactoring, but `logical` sounds better than `flinklogical`, IMO.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972454#comment-15972454
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111917910
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

I see, thanks for the explanation. I agree, we need to keep the restriction 
here, to push the logical plan in the right direction. 

We might need different sets of logical optimization rules for batch and 
streaming at some point.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972040#comment-15972040
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Hi @fhueske, sorry for taking so long to update this PR. 
It looks like it has many conflicts with master now; once you are OK with 
all the changes, I will rebase it onto master.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972034#comment-15972034
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861591
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

I will remove this simple comment to be consistent with the other converter rules.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972028#comment-15972028
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861037
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+BuiltInMethod.ROW_COUNT.method,
+this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

I'm ok with both, will change to `Calc`.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972024#comment-15972024
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860657
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Actually, the old test was wrong to use `UNION` instead of `UNION ALL`. If I 
understand correctly, `UNION` performs a global distinct over all fields, while 
`UNION ALL` just concatenates two datasets or datastreams. I think the behavior of 
`DataStream.union` is `UNION ALL` rather than `UNION`.
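
To make the difference concrete, here is a minimal sketch on plain Scala collections. It does not use the Table API: `UnionSemanticsSketch`, `Row` and the sample rows are made up for illustration, and the `distinct` call stands in for the global de-duplication that SQL `UNION` performs.

```scala
// Illustration only: models the two SQL set operations on in-memory sequences.
object UnionSemanticsSketch {
  type Row = (Long, Int, String)

  // UNION ALL: plain concatenation, duplicates are kept.
  def unionAll(left: Seq[Row], right: Seq[Row]): Seq[Row] = left ++ right

  // UNION: concatenation followed by a distinct over all fields.
  def union(left: Seq[Row], right: Seq[Row]): Seq[Row] = (left ++ right).distinct

  // Hypothetical sample data; the row (2, 2, "b") appears in both inputs.
  val tableA: Seq[Row] = Seq((1L, 1, "a"), (2L, 2, "b"))
  val tableB: Seq[Row] = Seq((2L, 2, "b"), (3L, 3, "c"))

  def main(args: Array[String]): Unit = {
    println(unionAll(tableA, tableB).size) // 4: the shared row is kept twice
    println(union(tableA, tableB).size)    // 3: the shared row is de-duplicated
  }
}
```

On an unbounded stream the distinct step would presumably need state over every row seen so far, so only the `UNION ALL` form maps directly onto a plain concatenation of two streams.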




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972018#comment-15972018
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

We need to override `explainTerms` and `toString` so that this RelNode's 
digest is distinguishable from that of other `TableSourceScan` nodes. The 
default implementation only compares the table name, which is not enough once 
we push filters or projections into the `TableSource`. 

There was actually a bug in `toString`, which was fixed in 
https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a.
I will backport that fix here.
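
A small sketch of the problem, under the assumption (as far as I understand Calcite) that the planner treats nodes with equal digests as the same expression. `Scan`, `nameOnlyDigest` and `fullDigest` are hypothetical stand-ins, not Flink or Calcite classes.

```scala
// Illustrative only: a toy scan node with two ways of building its digest.
object ScanDigest {
  final case class Scan(
      tableName: String,
      projectedFields: Seq[String],
      pushedFilter: Option[String]) {

    // Digest built from the table name only: scans that differ in their
    // pushed-down projection or filter become indistinguishable.
    def nameOnlyDigest: String = s"Scan(table=$tableName)"

    // Digest that also records what was pushed into the source.
    def fullDigest: String = {
      val fields = projectedFields.mkString(", ")
      val filter = pushedFilter.getOrElse("none")
      s"Scan(table=$tableName, fields=[$fields], filter=$filter)"
    }
  }
}
```

Two scans of the same table, one with all fields and one pruned to a single field, share the same `nameOnlyDigest` but have different `fullDigest` values. The second form is what the overridden `explainTerms` needs to achieve.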




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972015#comment-15972015
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111859756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

Yes, these can be removed.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971990#comment-15971990
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

How about renaming `org.apache.flink.table.plan.node` to 
`org.apache.flink.table.plan.rel` and having three sub-packages: `logical`, 
`dataset` and `datastream`?




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971988#comment-15971988
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Currently the answer is yes, because the physical join only supports 
equality conditions on simple keys. 
If the join key is an expression such as `a + 1 = b - 2` and the logical layer 
does not enforce this restriction, the join condition stays in that form and we 
cannot translate it to a physical join. There are two possible solutions:
1. Keep it this way.
2. Re-introduce rules that extract the expressions out of the join condition 
and add an additional `Calc` node so that the join keys remain simple fields. 
I prefer 1 for now, and will keep 2 in mind. Admittedly, it is not very nice 
to let the logical layer know about all these restrictions. What do you think?
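
Option 2 could look roughly like the sketch below, written with plain Scala collections rather than Calcite rules. Everything in it (`JoinKeyExtraction`, `L`, `R`, `withLeftKey`, `withRightKey`, `equiJoin`) is hypothetical and only illustrates the idea of computing the key expressions in a projection first, so the join itself sees simple key fields.

```scala
// Illustrative only: rewrites the condition `a + 1 = b - 2` into a
// pre-projection (the "Calc" step) followed by an equi-join on a plain key.
object JoinKeyExtraction {
  final case class L(a: Int, payload: String)
  final case class R(b: Int, payload: String)

  // Step 1: compute each side's key expression once and carry it as a
  // simple field next to the original row.
  def withLeftKey(rows: Seq[L]): Seq[(Int, L)] = rows.map(r => (r.a + 1, r))
  def withRightKey(rows: Seq[R]): Seq[(Int, R)] = rows.map(r => (r.b - 2, r))

  // Step 2: a hash join that only ever compares the precomputed key fields.
  def equiJoin(left: Seq[(Int, L)], right: Seq[(Int, R)]): Seq[(L, R)] = {
    val index: Map[Int, Seq[R]] =
      right.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2) }
    left.flatMap { case (key, l) =>
      index.getOrElse(key, Seq.empty).map(r => (l, r))
    }
  }
}
```

The result is the same set of pairs a nested-loop join with the predicate `a + 1 == b - 2` would produce, but the join operator no longer needs to evaluate expressions.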




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971969#comment-15971969
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111856357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Yes. I will first test how `HepPlanner` behaves in some test and production 
environments, and then decide whether and how to switch to the rule-based 
planner. So this will be my follow-up task.
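
For readers unfamiliar with the distinction: a rule-based planner applies rewrite rules until the plan stops changing, without comparing alternatives by cost. The toy below is a rough sketch of that idea only. It is not Calcite's `HepPlanner`, and all names in it (`RuleBasedRewrite`, `Plan`, `mergeFilters`, `optimize`) are invented for illustration.

```scala
// Illustrative only: a tiny fixpoint rewriter over a toy plan tree.
object RuleBasedRewrite {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(condition: String, input: Plan) extends Plan

  type Rule = PartialFunction[Plan, Plan]

  // Merge two adjacent filters into a single conjunctive filter.
  val mergeFilters: Rule = {
    case Filter(outer, Filter(inner, in)) => Filter(s"($inner) AND ($outer)", in)
  }

  // One bottom-up pass: rewrite the input first, then try each rule
  // on the current node.
  def rewriteOnce(plan: Plan, rules: Seq[Rule]): Plan = {
    val withNewInput = plan match {
      case Filter(cond, in) => Filter(cond, rewriteOnce(in, rules))
      case scan: Scan       => scan
    }
    rules.foldLeft(withNewInput)((p, rule) => rule.applyOrElse(p, identity[Plan]))
  }

  // Repeat passes until a pass leaves the plan unchanged (a fixpoint).
  @scala.annotation.tailrec
  def optimize(plan: Plan, rules: Seq[Rule]): Plan = {
    val next = rewriteOnce(plan, rules)
    if (next == plan) plan else optimize(next, rules)
  }
}
```

A cost-based planner such as `VolcanoPlanner` would instead keep equivalent alternatives around and pick the cheapest one, which is more powerful but also more expensive for purely logical rewrites.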




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953351#comment-15953351
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109253071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Isn't the plan to use the rule-based `HepPlanner` instead of the 
`VolcanoPlanner` for logical optimization, or should the switch be a follow-up 
step?




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953349#comment-15953349
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

Some of the other methods could be removed as well.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953345#comment-15953345
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262692
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Why did you change `UNION` to `UNION ALL`?




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953352#comment-15953352
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262077
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

Should we move the `FlinkLogicalRels` to 
`org.apache.flink.table.plan.nodes.flinklogical`?




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953347#comment-15953347
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109261556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Do we need these checks here? Whether the join can be translated also 
depends on whether the target is stream or batch. So we could just create a 
join and let the physical optimization decide whether it can be translated.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953342#comment-15953342
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109261751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

Should we apply translation restrictions at the logical level? DataSet and 
DataStream might differ in which types of operations they support. It might be 
better to let the physical optimization decide what is supported and what is 
not.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953348#comment-15953348
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109257615
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

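Please update this comment; the rule's imports now reference 
`FlinkLogicalCorrelate` rather than `LogicalCorrelate`.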


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953343#comment-15953343
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

The other `FlinkLogical` nodes do not implement `explainTerms()`.


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953344#comment-15953344
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109395373
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+BuiltInMethod.ROW_COUNT.method,
+this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

Can we change this to `def getRowCount(rel: Calc, mq: RelMetadataQuery): 
Double` to cover `DataSetCalc` and `FlinkLogicalCalc` or would that be too 
generic?
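
To make the question concrete, here is a small sketch in plain Scala, using hypothetical stand-in classes instead of the real Calcite and Flink ones. A handler declared on the common base type serves both subclasses, but it also serves any future subclass, which is the "too generic" risk. Whether Calcite's reflective metadata dispatch would pick up such a handler in the same way is an assumption that would need to be checked against the PR.

```scala
// Hypothetical stand-ins for Calc, DataSetCalc and FlinkLogicalCalc.
abstract class CalcLike {
  def estimateRowCount: Double
}

final class DataSetCalcLike(inputRows: Double) extends CalcLike {
  // Illustrative estimate only: assume the calc keeps half of its input.
  def estimateRowCount: Double = inputRows * 0.5
}

final class LogicalCalcLike(inputRows: Double) extends CalcLike {
  // Same illustrative estimate for the logical node.
  def estimateRowCount: Double = inputRows * 0.5
}

object RowCountSketch {
  // One handler on the base type covers every subclass, present and future.
  def getRowCount(rel: CalcLike): Double = rel.estimateRowCount
}
```

For example, `RowCountSketch.getRowCount(new DataSetCalcLike(100.0))` and `RowCountSketch.getRowCount(new LogicalCalcLike(100.0))` both go through the same handler.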


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953350#comment-15953350
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262780
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
 ---
@@ -85,8 +85,10 @@ class QueryDecorrelationTest extends TableTestBase {
   term("joinType", "InnerJoin")
 ),
 term("select", "empno0", "salary")
-  ),
-  term("groupBy", "empno0"),
+  )
+  ,
+  term("groupBy", "empno0")
--- End diff --

revert these changes?


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953346#comment-15953346
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262207
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

The other `FlinkLogical` nodes do not implement `explainTerms()`.


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>



