[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302218#comment-15302218 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2025


> Add support for UNION (with duplicate elimination)
> --
>
> Key: FLINK-3941
> URL: https://issues.apache.org/jira/browse/FLINK-3941
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Yijie Shen
> Priority: Minor
>
> Currently, only UNION ALL is supported by the Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a
> {{DataSet.distinct()}} after the union on all fields. This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extending the Table API with a union() method.
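
The translation described in the issue can be modelled with plain Scala collections: UNION ALL is concatenation, and UNION is concatenation followed by duplicate elimination on all fields. This is only a sketch; `UnionSemantics` is a hypothetical helper, and `Seq` stands in for Flink's `DataSet`.

```scala
// Collection-level model of the two set operations; not Flink's DataSet API.
object UnionSemantics {
  // UNION ALL: concatenate both inputs and keep every duplicate.
  def unionAll[T](left: Seq[T], right: Seq[T]): Seq[T] = left ++ right

  // UNION: concatenate, then drop records that are equal on all fields,
  // analogous to calling distinct() after union() on a DataSet.
  def union[T](left: Seq[T], right: Seq[T]): Seq[T] = unionAll(left, right).distinct
}
```

Unlike a distributed `distinct()`, `Seq.distinct` keeps the first occurrence of each record in encounter order, so the output order here says nothing about the Flink operator.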



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301905#comment-15301905 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221832119
  
Merging




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301672#comment-15301672 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221796795
  
Perfect! Good to merge. Thanks @yjshen!




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301332#comment-15301332 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221762766
  
Hi @fhueske, I've updated the `expectedType` fix and reverted the changes in
`DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and
simple. What do you think?




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299817#comment-15299817 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64549056
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+if (all) {
+  leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+} else {
+  leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
--- End diff --

This method should be called by the optimizer when a new `DataSetUnion` node
is created, to estimate the cost of the subplan (the new node plus all of its
recursive input nodes). If there is a cheaper plan that does the same thing,
the more expensive plan is discarded.

So you have a plan with a non-all union operator, and the optimizer knows the
cost of this plan. Then the `UnionToDistinctRule` is called and a new union
operator plus a distinct operator are created. For both plans, the
`computeSelfCost` method is called to estimate their cost, and the cheaper of
the two plans is kept. (This is a bit simplified, because the optimization
rules are applied to `LogicalRel` nodes, but the cost estimation happens on
the `DataSetRel` nodes.)
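
To make the cost comparison concrete, here is a toy version of the cost formula from the diff together with the "keep the cheaper plan" step. `Cost`, `UnionCost`, and ranking plans by a summed `total` are assumptions for illustration only; Calcite's `RelOptCost` and the planner's real comparison are more involved.

```scala
// Hypothetical cost triple; Calcite's RelOptCost is considerably richer.
final case class Cost(rows: Double, cpu: Double, io: Double) {
  def +(other: Cost): Cost = Cost(rows + other.rows, cpu + other.cpu, io + other.io)

  // Toy scalar used only to rank alternatives in this sketch.
  def total: Double = rows + cpu + io
}

object UnionCost {
  // Same shape as the diff: UNION ALL only accounts for its output rows,
  // while UNION also charges CPU and I/O equal to its input row count.
  def unionSelfCost(inputRowCounts: Seq[Double], all: Boolean): Cost = {
    val rowCnt = inputRowCounts.foldLeft(0.0)(_ + _)
    Cost(rowCnt, if (all) 0.0 else rowCnt, if (all) 0.0 else rowCnt)
  }

  // Of two equivalent alternatives, keep the one with the lower toy total.
  def cheaper(a: (String, Cost), b: (String, Cost)): (String, Cost) =
    if (a._2.total <= b._2.total) a else b
}
```

Which alternative survives depends entirely on the numbers each node reports; the sketch only shows that both alternatives are costed and the lower one is kept.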




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299795#comment-15299795 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64547780
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+if (all) {
+  leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+} else {
+  leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
--- End diff --

I have a question about `computeSelfCost`: when is this method called? Is it
possible that the union's `computeSelfCost` is used first, before
`UnionToDistinctRule` is applied? I tried to figure this out last night but
couldn't come up with an answer.




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299788#comment-15299788 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64546723
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
--- End diff --

Ah yes, I didn't notice that, will do :)




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299752#comment-15299752 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221523124
  
Hi, thanks for the update and the `expectedType` fix!
Can you extend it as I described in my comment and pick one of the two
options regarding the `UnionToDistinctRule`?

Thanks!




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299751#comment-15299751 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64543161
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
--- End diff --

With this fix, we should be able to remove all guards for efficient type 
usage in the Union ITCases.
There are a few like the following:
```
if (tEnv.getConfig.getEfficientTypeUsage) {
  return
}
```




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299744#comment-15299744 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64542323
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
--- End diff --

Actually, this fix should be extended. If `expectedType == None`, we must
ensure that both inputs provide the same type, i.e., request the left input's
type from the right input (or vice versa).
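
A hypothetical model of the suggested extension: when no type is requested from above, the union translates its left input first and then requests that same type from its right input. `PlanNode`, `Leaf`, `UnionNode`, and representing physical types as strings are illustrative assumptions, not the Table API's classes.

```scala
// Hypothetical, simplified model of a plan node that can emit its records
// in a requested physical type; all names here are illustrative only.
final case class Translated(physicalType: String, rows: Seq[Any])

trait PlanNode {
  def translate(expectedType: Option[String]): Translated
}

// A leaf that emits its default type unless a specific type is requested.
final case class Leaf(defaultType: String, rows: Seq[Any]) extends PlanNode {
  def translate(expectedType: Option[String]): Translated =
    Translated(expectedType.getOrElse(defaultType), rows)
}

final case class UnionNode(left: PlanNode, right: PlanNode) extends PlanNode {
  def translate(expectedType: Option[String]): Translated = {
    val l = left.translate(expectedType)
    // With no expected type, request the left input's type from the right
    // input so that both sides of the union agree.
    val r = right.translate(expectedType.orElse(Some(l.physicalType)))
    require(l.physicalType == r.physicalType, "union inputs must have the same type")
    Translated(l.physicalType, l.rows ++ r.rows)
  }
}
```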




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299736#comment-15299736 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64541694
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+if (all) {
+  leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+} else {
+  leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
--- End diff --

Oh, yes. I completely forgot about that rule...
So we already support the non-all union in SQL; only the Table API was
missing the `union()` method.
I think there are two ways to continue:
- remove the `UnionToDistinctRule` from `FlinkRuleSets`
- revert the changes to `DataSetUnion` (except for pushing down the
`expectedType`) and `DataSetUnionRule`.

Either way is fine with me.
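
As a rough illustration of what the rule does, the sketch below rewrites a duplicate-eliminating union into a distinct over a union-all on a toy plan. Calcite's `UnionToDistinctRule` expresses the distinct as an aggregate that groups on all fields; the `Plan` ADT and `Rewrites.unionToDistinct` here are hypothetical and only mirror the shape of that rewrite.

```scala
// Toy logical plan; hypothetical, not Calcite's RelNode API.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Union(inputs: Seq[Plan], all: Boolean) extends Plan
final case class Distinct(input: Plan) extends Plan

object Rewrites {
  // A UNION (all = false) becomes DISTINCT over a UNION ALL;
  // UNION ALL nodes are kept, and children are rewritten recursively.
  def unionToDistinct(plan: Plan): Plan = plan match {
    case Union(inputs, all) =>
      val rewrittenInputs = inputs.map(unionToDistinct)
      if (all) Union(rewrittenInputs, all = true)
      else Distinct(Union(rewrittenInputs, all = true))
    case Distinct(input) => Distinct(unionToDistinct(input))
    case scan: Scan      => scan
  }
}
```

With such a rule active, a non-all union never needs a dedicated physical operator, which is why SQL already supported it before this PR.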




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299723#comment-15299723 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64540766
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
--- End diff --

Good catch!




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299684#comment-15299684 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221508839
  
@fhueske would you please take another look? Thanks.




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298114#comment-15298114 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64383299
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
@@ -69,16 +73,23 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(
+  rowCnt,
+  if (all) 0 else rowCnt,
+  if (all) 0 else rowCnt)
   }
 
   override def translateToPlan(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
--- End diff --

`expectedType` is now passed down to `Union`'s children, which enables the
conversion to `Row` that `Aggregate` enforces.




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298073#comment-15298073 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64377174
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
@@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
 
   override def map(value: IN): OUT = {
 
-val input = value.asInstanceOf[Row]
+val input = value.asInstanceOf[Product]
--- End diff --

Ah, I think I see why this happens; will fix it.




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298057#comment-15298057 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64371728
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
@@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
 
   override def map(value: IN): OUT = {
 
-val input = value.asInstanceOf[Row]
+val input = value.asInstanceOf[Product]
--- End diff --

With `Execution mode = COLLECTION, Table config = EFFICIENT` in `testUnion`,
the `value` is of type `scala.Tuple3`, so the cast does not work as expected?




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298039#comment-15298039 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221232111
  
Changes look good. Can you also please update the supported feature set in 
the docs (`docs/apis/table.md`)?

Should be good to merge once that is done. 
Thanks, Fabian




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298031#comment-15298031 ]

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64367480
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
@@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
 
   override def map(value: IN): OUT = {
 
-val input = value.asInstanceOf[Row]
+val input = value.asInstanceOf[Product]
--- End diff --

Currently, aggregates only support `Row`, because the aggregation code is
not generated yet.
`DataSetAggregate` enforces `Row` as input type (see `DataSetAggregate` 
line 99), so `value.asInstanceOf[Row]` should be safe. 

Did you observe a problem with this cast?
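
The cast issue can be reproduced without Flink: a `scala.Tuple3` is a `Product` but not a `Row`, so loosening the cast to `Product` hides the mismatch rather than fixing it. In this sketch `Row` is a hypothetical stand-in modelled as a `Product`, and `CastCheck.toRow` illustrates converting records to `Row` upstream, which is what passing `expectedType` down is meant to achieve in this PR.

```scala
// Hypothetical stand-in for the Table API's Row type, modelled as a Product.
final class Row(private val fields: Array[Any]) extends Product {
  override def productArity: Int = fields.length
  override def productElement(n: Int): Any = fields(n)
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Row]
}

object CastCheck {
  // `value.asInstanceOf[Row]` succeeds only when this returns true.
  def castsToRow(value: Any): Boolean = value.isInstanceOf[Row]

  // `value.asInstanceOf[Product]` also succeeds for tuples and case classes.
  def castsToProduct(value: Any): Boolean = value.isInstanceOf[Product]

  // Convert a tuple (or any Product) into a Row before it reaches the aggregate.
  def toRow(p: Product): Row = new Row(p.productIterator.toArray)
}
```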




[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297823#comment-15297823
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221186678
  
Thanks for the PR, @yjshen. Looks good except for a few minor comments.
Can you also add one test method to the Scala SQL UnionITCase, such that 
the SQL side is also covered?

Union on streams cannot be supported right now, because deduplicating rows 
on an unbounded stream would require much of the windowing logic.

Thanks, Fabian
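Why deduplication is hard on streams can be illustrated with a Flink-free sketch (an illustration under stated assumptions, not how Flink implements it): a global distinct over an unbounded stream would have to remember every row seen so far, whereas scoping the distinct to tumbling windows bounds that state. `WindowedDistinctSketch`, `Event`, and the window size parameter are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Flink-free sketch (assumption, not Flink's API): each row carries a timestamp,
// and duplicates are eliminated only within tumbling windows of a fixed size.
object WindowedDistinctSketch {
  final case class Event[T](timestamp: Long, row: T)

  // Assigns a timestamp to the tumbling window [k * size, (k + 1) * size).
  def windowOf(timestamp: Long, size: Long): Long = Math.floorDiv(timestamp, size)

  // UNION ALL of both inputs, then a distinct per window, processed in
  // timestamp order as a stream would be. Returns (window, row) pairs.
  def unionDistinctPerWindow[T](left: Seq[Event[T]], right: Seq[Event[T]], size: Long): Seq[(Long, T)] = {
    val out = ArrayBuffer.empty[(Long, T)]
    var currentWindow = Long.MinValue
    var seen = Set.empty[T]
    for (e <- (left ++ right).sortBy(_.timestamp)) {
      val w = windowOf(e.timestamp, size)
      if (w != currentWindow) {
        // The previous window is closed: its dedup state can be discarded.
        currentWindow = w
        seen = Set.empty[T]
      }
      if (!seen.contains(e.row)) {
        seen += e.row
        out += ((w, e.row))
      }
    }
    out.toSeq
  }
}
```

For example, with a window size of 10, the events (1, "a"), (2, "b"), (3, "a") and (12, "a") yield (0, "a"), (0, "b") and (1, "a"). Row "a" is emitted again in window 1, so windowed deduplication only approximates the batch semantics of UNION.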


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297818#comment-15297818
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64339091
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
 ---
@@ -54,7 +54,22 @@ class UnionITCase(
   }
 
   @Test
-  def testTernaryUnion(): Unit = {
+  def testUnion(): Unit = {
--- End diff --

Lately, there have been some concerns about the build time of the `flink-table` 
module. In the past, we added quite a few integration tests without thinking 
about the trade-off between added coverage and build time.

I propose replacing the `testUnionWithFilter()` and `testUnionWithJoin()` 
tests with your new methods, because, IMO, those existing tests do not add to 
the test coverage.


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297815#comment-15297815
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64338444
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
 ---
@@ -43,7 +43,7 @@ public UnionITCase(TestExecutionMode mode) {
}
 
@Test
-   public void testUnion() throws Exception {
+   public void testUnionAll() throws Exception {
--- End diff --

Actually, I would remove the Java `UnionITCase.java` file entirely. 
It tests exactly the same methods as the Scala tests (no expression parsing 
involved).


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297806#comment-15297806
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2025#discussion_r64338066
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
 ---
@@ -69,7 +73,7 @@ class DataSetUnion(
   rows + metadata.getRowCount(child)
 }
 
-planner.getCostFactory.makeCost(rowCnt, 0, 0)
+planner.getCostFactory.makeCost(if (all) rowCnt else rowCnt * 0.1, 0, 0)
--- End diff --

The cost for union should be higher than for union all. Also, `rowCnt` is 
the number of rows processed, not the result size.
How about this?
```
planner.getCostFactory.makeCost(
  rowCnt,
  if (all) 0 else rowCnt,
  if (all) 0 else rowCnt)
```
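A worked version of the proposed cost formula, assuming a simplified (rows, cpu, io) triple. `SketchCost` and `UnionCostSketch` are hypothetical stand-ins, not Calcite's cost classes; only the arithmetic follows the suggestion above.

```scala
// Hypothetical stand-in for a (rows, cpu, io) cost triple; not Calcite's API.
final case class SketchCost(rows: Double, cpu: Double, io: Double)

object UnionCostSketch {
  // rowCnt is the number of rows processed: the sum of the children's row counts.
  def unionCost(childRowCounts: Seq[Double], all: Boolean): SketchCost = {
    val rowCnt = childRowCounts.sum
    // UNION ALL only forwards rows; UNION pays extra CPU and I/O for the distinct.
    SketchCost(rowCnt, if (all) 0.0 else rowCnt, if (all) 0.0 else rowCnt)
  }
}
```

With child row counts of 100 and 50, UNION ALL costs (150, 0, 0) and UNION costs (150, 150, 150), so UNION is the more expensive operator. Under the diff's original `rowCnt * 0.1` version, UNION would have been costed at 15 rows and looked cheaper than UNION ALL.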


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297679#comment-15297679
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

GitHub user yjshen opened a pull request:

https://github.com/apache/flink/pull/2025

[FLINK-3941][TableAPI]Add support for UNION (with duplicate elimination)

This PR aims at adding `UNION` support to the Table API and SQL by:
- Extending the Table API with a new `union()` method
- Relaxing `DataSetUnionRule` to enable union conversion
- Adding a `distinct` after the `union` in the Flink execution plan to 
eliminate duplicate rows

Note: I believe `Union` (with duplicate elimination) currently has no 
counterpart in DataStream, so it is left unsupported. If that is not the 
case, I'd be happy to adapt this PR.
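The plan translation listed above can be sketched with a toy plan ADT. `Plan`, `Scan`, `Union`, `Distinct` and `UnionTranslationSketch.translate` are hypothetical names, not Flink's or Calcite's classes; the real change relaxes `DataSetUnionRule` and extends `DataSetUnion`.

```scala
// Toy plan nodes (hypothetical, for illustration only).
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Union(left: Plan, right: Plan, all: Boolean) extends Plan
final case class Distinct(input: Plan) extends Plan

object UnionTranslationSketch {
  // Recursively rewrites every UNION (all = false) into DISTINCT(UNION ALL).
  def translate(plan: Plan): Plan = plan match {
    case Union(l, r, true)  => Union(translate(l), translate(r), all = true)
    case Union(l, r, false) => Distinct(Union(translate(l), translate(r), all = true))
    case Distinct(in)       => Distinct(translate(in))
    case s: Scan            => s
  }
}
```

For example, `translate(Union(Scan("a"), Scan("b"), all = false))` yields `Distinct(Union(Scan("a"), Scan("b"), all = true))`, while a UNION ALL passes through unchanged. With this shape, the execution side only has to handle UNION ALL plus an ordinary distinct.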

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yjshen/flink union

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2025


commit fb8b61b5638f0b52f5857341c7acc95e8985b2d4
Author: Yijie Shen 
Date:   2016-05-24T04:46:21Z

add union support



