[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217408#comment-16217408
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4858


> Null values are not correctly handled by batch inner and outer joins
> 
>
> Key: FLINK-7755
> URL: https://issues.apache.org/jira/browse/FLINK-7755
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Join predicates of batch joins are not correctly evaluated according to 
> three-value logic.
> This affects inner as well as outer joins.
> The problem is that some equality predicates are only evaluated by the 
> internal join algorithms of Flink which are based on {{TypeComparator}}. The 
> field {{TypeComparator}}s for {{Row}} are implemented such that {{null == 
> null}} results in {{TRUE}} to ensure correct ordering and grouping. However, 
> three-value logic requires that {{null == null}} results in {{UNKNOWN}} (or 
> null). The code generator implements this logic correctly, but for equality 
> predicates, no code is generated.
> For outer joins, the problem is a bit trickier because these do not support 
> code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a 
> solution for this issue.
> We also need to extend several of the existing tests and add null values to 
> ensure that the join logic is correctly implemented. 
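
The mismatch described in the issue can be illustrated with a minimal sketch in plain Scala. This is not Flink code: the names `comparatorEquals`, `sqlEquals`, and `innerJoin` are hypothetical, a nullable value is modelled as `Option[Int]`, and UNKNOWN is modelled as `None`.

```scala
object ThreeValuedEquality {
  // Comparator-style equality, as needed for ordering and grouping:
  // null is treated as equal to null.
  def comparatorEquals(a: Option[Int], b: Option[Int]): Boolean = a == b

  // SQL three-valued equality: the result is UNKNOWN (None) as soon as
  // either operand is null.
  def sqlEquals(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // An inner join keeps a pair only if the predicate is TRUE;
  // FALSE and UNKNOWN are both dropped.
  def innerJoin(
      left: Seq[Option[Int]],
      right: Seq[Option[Int]]): Seq[(Option[Int], Option[Int])] =
    for {
      l <- left
      r <- right
      if sqlEquals(l, r).contains(true)
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    val left = Seq(Some(1), None)
    val right = Seq(Some(1), None)
    println(comparatorEquals(None, None)) // true
    println(sqlEquals(None, None))        // None
    println(innerJoin(left, right))       // List((Some(1),Some(1)))
  }
}
```

A join that relies only on `comparatorEquals` would also pair the two null keys, which is the incorrect result this issue reports.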



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217073#comment-16217073
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146591835
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -486,11 +486,6 @@ case class Join(
 s"Invalid join condition: $expression. At least one equi-join predicate is " +
   s"required.")
 }
-if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --

Good point. Will remove that. Thanks @lincoln-lil!




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216983#comment-16216983
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146504940
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -486,11 +486,6 @@ case class Join(
 s"Invalid join condition: $expression. At least one equi-join predicate is " +
   s"required.")
 }
-if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --

The local variables `nonEquiJoinPredicateFound` and `localPredicateFound` can be 
removed here, and the `checkIfFilterCondition` method is no longer needed.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216789#comment-16216789
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858
  
Thanks for the review @twalthr. 
The docs don't need to be touched. The previous restrictions were not 
documented...

Will merge this PR.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216522#comment-16216522
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4858
  
I scanned through the code quickly and could not find any major issues. 
Well documented, well tested. +1 to merge this. Please don't forget to update 
the docs.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215811#comment-16215811
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858
  
I'll merge the PR in the next few days.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213326#comment-16213326
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858
  
Thanks for the review @xccui! 
I've updated the PR.
Cheers, Fabian




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213322#comment-16213322
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146076115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
 ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
 val joinInfo = join.analyzeCondition
 
 // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-// and disable outer joins with non-equality predicates(see FLINK-5520)
-!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+!joinInfo.pairs().isEmpty
--- End diff --

Yes, that is true, but the rules are also applied in different contexts. 
`FlinkLogicalJoin` is used for the initial translation of batch and stream 
programs and `DataSetJoinRule` only for batch. I think it's OK to have these 
checks as a safety net.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212508#comment-16212508
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145938669
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -156,65 +163,394 @@ class DataSetJoin(
 val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-val (joinOperator, nullCheck) = joinType match {
-  case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
-  case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
-  case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
-  case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+joinType match {
+  case JoinRelType.INNER =>
+addInnerJoin(
+  leftDataSet,
+  rightDataSet,
+  leftKeys.toArray,
+  rightKeys.toArray,
+  returnType,
+  config)
+  case JoinRelType.LEFT =>
+addLeftOuterJoin(
+  leftDataSet,
+  rightDataSet,
+  leftKeys.toArray,
+  rightKeys.toArray,
+  returnType,
+  config)
+  case JoinRelType.RIGHT =>
+addRightOuterJoin(
+  leftDataSet,
+  rightDataSet,
+  leftKeys.toArray,
+  rightKeys.toArray,
+  returnType,
+  config)
+  case JoinRelType.FULL =>
+addFullOuterJoin(
+  leftDataSet,
+  rightDataSet,
+  leftKeys.toArray,
+  rightKeys.toArray,
+  returnType,
+  config)
 }
+  }
 
-if (nullCheck && !config.getNullCheck) {
-  throw TableException("Null check in TableConfig must be enabled for outer joins.")
-}
+  private def addInnerJoin(
+  left: DataSet[Row],
+  right: DataSet[Row],
+  leftKeys: Array[Int],
+  rightKeys: Array[Int],
+  resultType: TypeInformation[Row],
+  config: TableConfig): DataSet[Row] = {
 
 val generator = new FunctionCodeGenerator(
   config,
-  nullCheck,
-  leftDataSet.getType,
-  Some(rightDataSet.getType))
+  false,
+  left.getType,
+  Some(right.getType))
 val conversion = generator.generateConverterResultExpression(
-  returnType,
+  resultType,
   joinRowType.getFieldNames)
 
-var body = ""
+val condition = generator.generateExpression(joinCondition)
+val body =
+  s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ |  ${conversion.code}
+ |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
 
-if (joinInfo.isEqui) {
-  // only equality condition
-  body = s"""
-   |${conversion.code}
-   |${generator.collectorTerm}.collect(${conversion.resultTerm});
-   |""".stripMargin
-}
-else {
-  val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
-  val condition = generator.generateExpression(nonEquiPredicates)
-  body = s"""
-   |${condition.code}
-   |if (${condition.resultTerm}) {
-   |  ${conversion.code}
-   |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-   |}
-   |""".stripMargin
-}
 val genFunction = generator.generateFunction(
   ruleDescription,
   classOf[FlatJoinFunction[Row, Row, Row]],
   body,
-  returnType)
+  resultType)
 
 val joinFun = new FlatJoinRunner[Row, Row, Row](
   genFunction.name,
   genFunction.code,
   genFunction.returnType)
 
-val joinOpName =
-  s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
-s"join: (${joinSelectionToString(joinRowType)})"
+left.join(right)
+  .where(leftKeys: _*)
+  .equalTo(rightKeys: _*)
+  .`with`(joinFun)
+  .name(getJoinOpName)
+  }
+
+  private def addLeftOuterJoin(
+  left: DataSet[Row],
+  right: DataSet[Row],
+  leftKeys: Array[Int],
+  rightKeys: Array[Int],
+  resultType: TypeInformation[Row],
+  config: 

[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212511#comment-16212511
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145863086
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
 ---
@@ -97,8 +97,7 @@ private class FlinkLogicalJoinConverter
 
   private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
--- End diff --

The parameter `join` seems to be useless now.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212510#comment-16212510
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145863708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
 ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
 val joinInfo = join.analyzeCondition
 
 // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-// and disable outer joins with non-equality predicates(see FLINK-5520)
-!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+!joinInfo.pairs().isEmpty
--- End diff --

The condition is checked three times in `FlinkLogicalJoin`, 
`DataSetJoinRule`, and `DataSetJoin`. It brings extra maintenance work every 
time we change the validation rule. 




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212509#comment-16212509
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145938918
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---

[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210847#comment-16210847
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4858

[FLINK-7755] [table] Fix NULL handling in batch joins.

## What is the purpose of the change

This PR fixes a couple of issues with Table API / SQL batch joins:
- Proper support for joining null values for inner and outer joins
- Support for non-equi join predicates in outer joins (at least one 
equi-join predicate is required)
- Support for local predicates on the outer input of outer joins (at least 
one equi-join predicate is required)
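
As a reference for the semantics these items describe, here is a minimal nested-loop sketch in plain Scala (not the Flink implementation). The names `Row`, `on`, and `leftOuterJoin` and the example predicate are assumptions made for illustration. The `ON` clause combines an equi-join predicate with a local predicate on the outer (left) input; a left row for which the clause is FALSE or UNKNOWN for every right row is still emitted, padded with null (`None`).

```scala
object OuterJoinOnClause {
  type Row = (Option[Int], String) // (nullable join key, payload)

  // Hypothetical ON clause: l.key = r.key AND l.key < 2.
  // An UNKNOWN result (a null operand) counts as "no match".
  def on(l: Row, r: Row): Boolean =
    (for (a <- l._1; b <- r._1) yield a == b && a < 2).getOrElse(false)

  // Reference semantics of a left outer join: every left row appears at
  // least once; rows without any match are padded with None.
  def leftOuterJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] =
    left.flatMap { l =>
      val matches: Seq[Option[Row]] = right.filter(r => on(l, r)).map(r => Some(r))
      val padded: Seq[Option[Row]] =
        if (matches.isEmpty) Seq(Option.empty[Row]) else matches
      padded.map(r => (l, r))
    }

  def main(args: Array[String]): Unit = {
    val left: Seq[Row] = Seq((Some(1), "l1"), (Some(2), "l2"), (None, "l3"))
    val right: Seq[Row] = Seq((Some(1), "r1"), (Some(2), "r2"), (None, "r3"))
    // l1 joins r1; l2 fails the local predicate and l3 has a null key,
    // so both are emitted null-padded.
    leftOuterJoin(left, right).foreach(println)
  }
}
```

Note that the local predicate does not filter left rows out of the result; it only decides whether they find a join partner.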

## Brief change log

- Inner & Outer Joins: Evaluate all join predicates in a code-gen'd 
function (also equi-join predicates) for correct handling of three-value logic
- Outer joins: translate outer joins into a sequence of GroupReduce -> 
OuterJoin -> GroupReduce. 
  - The first GroupReduce groups on the full input row and deduplicates the 
outer side(s) of the join. A count for the number of deduplicated rows is kept.
  - The OuterJoin evaluates the join predicate and computes possible join 
pairs of left and right rows. The non-outer element of the pair can be null if 
the join predicate does not match.
  - The second GroupReduce groups again on the full input row and computes 
for each outer row the join result. If it was not matched with any inner row, it 
produces a null-padded result.
  - The plan for left and right outer joins requires only a single initial 
partitioning and sort of each input. All operators can reuse the initial 
sort and produce a sorted result again. A full outer join requires an 
additional partitioning and sorting step.
- Checks for outer join translation are removed to allow outer joins with 
non-equi and local predicates.
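
The GroupReduce -> OuterJoin -> GroupReduce sequence for a left outer join can be sketched on plain Scala collections. This is only an approximation of the steps listed above, not the code in the PR: `dedup`, `joinStep`, `finishStep`, the `Row` type, and the example predicate are hypothetical, and partitioning and sorting are ignored.

```scala
object LeftOuterJoinPipeline {
  type Row = (Option[Int], String) // (nullable join key, payload)

  // Step 1: group on the full row, deduplicate, keep a count per distinct row.
  def dedup(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.groupBy(identity).toSeq.map { case (row, dups) => (row, dups.size) }

  // Step 2: pair rows whose keys are equal in the comparator sense
  // (None == None), then evaluate the full predicate with three-valued
  // logic. If it is not TRUE, the inner element of the pair is None.
  def joinStep(
      left: Seq[(Row, Int)],
      right: Seq[Row],
      pred: (Row, Row) => Option[Boolean]): Seq[(Row, Int, Option[Row])] =
    left.flatMap { case (l, cnt) =>
      val partners = right.filter(r => r._1 == l._1)
      if (partners.isEmpty) Seq((l, cnt, Option.empty[Row]))
      else partners.map { r =>
        (l, cnt, if (pred(l, r).contains(true)) Option(r) else Option.empty[Row])
      }
    }

  // Step 3: group on the outer row again. Emit the matched pairs, or one
  // null-padded row if nothing matched, repeated once per original duplicate.
  def finishStep(pairs: Seq[(Row, Int, Option[Row])]): Seq[(Row, Option[Row])] =
    pairs.groupBy(_._1).toSeq.flatMap { case (l, group) =>
      val cnt = group.head._2
      val matched = group.collect { case (_, _, Some(r)) => r }
      val results: Seq[Option[Row]] =
        if (matched.isEmpty) Seq(Option.empty[Row]) else matched.map(r => Some(r))
      results.flatMap(r => Seq.fill(cnt)((l, r)))
    }

  def leftOuterJoin(
      left: Seq[Row],
      right: Seq[Row],
      pred: (Row, Row) => Option[Boolean]): Seq[(Row, Option[Row])] =
    finishStep(joinStep(dedup(left), right, pred))

  def main(args: Array[String]): Unit = {
    val left: Seq[Row] = Seq((Some(1), "a"), (Some(1), "a"), (None, "b"), (Some(2), "c"))
    val right: Seq[Row] = Seq((Some(1), "x"), (None, "y"), (Some(2), "z"))
    // Hypothetical predicate: l.key = r.key AND r.key < 2
    val pred: (Row, Row) => Option[Boolean] =
      (l, r) => for (a <- l._1; b <- r._1) yield a == b && b < 2
    leftOuterJoin(left, right, pred).sortBy(_.toString).foreach(println)
  }
}
```

The count kept in the first step is what allows the last step to restore duplicate outer rows after grouping on the full row.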

## Verifying this change

- added ITCases for the new outer join features to `JoinITCase`
- added plan tests for Table API and SQL for the new outer join features
- updated validation tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **yes**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

The documentation does not need to be adjusted because the outer join 
limitations were not documented. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableBatchNullJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4858.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4858


commit 1434d8d7debe207e4b8350199eded4e678885571
Author: Fabian Hueske 
Date:   2017-10-15T15:55:23Z

[FLINK-7755] [table] Fix NULL handling in batch joins.

Fixes [FLINK-5498] (Add support for non-equi join and local predicates to 
outer joins) as well.





[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209189#comment-16209189
 ] 

Fabian Hueske commented on FLINK-7755:
--

I think we should treat this kind of bug as a blocker. 
If something fails or doesn't work, it is bad, but if it returns wrong results 
without users noticing, it's much worse, IMO.




[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-18 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209183#comment-16209183
 ] 

Till Rohrmann commented on FLINK-7755:
--

But this bug has been present since Flink 1.3, right? I wouldn't really block 
1.4 on this issue, even though I think we should push hard to get it in 
for 1.4.



[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208992#comment-16208992
 ] 

Fabian Hueske commented on FLINK-7755:
--

We compute wrong results. This qualifies as a blocker, IMO.



[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-18 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208982#comment-16208982
 ] 

Till Rohrmann commented on FLINK-7755:
--

Is this truly a blocker issue? I would assess it as critical since it might 
already have been included in 1.3.
