[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217408#comment-16217408 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4858

> Null values are not correctly handled by batch inner and outer joins
>
>
> Key: FLINK-7755
> URL: https://issues.apache.org/jira/browse/FLINK-7755
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Join predicates of batch joins are not correctly evaluated according to
> three-valued logic.
> This affects inner as well as outer joins.
> The problem is that some equality predicates are only evaluated by Flink's
> internal join algorithms, which are based on {{TypeComparator}}. The field
> {{TypeComparator}}s for {{Row}} are implemented such that
> {{null == null}} results in {{TRUE}} to ensure correct ordering and grouping.
> However, three-valued logic requires that {{null == null}} evaluates to
> {{UNKNOWN}} (or null). The code generator implements this logic correctly,
> but no code is generated for equality predicates.
> For outer joins, the problem is a bit trickier because these do not support
> code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a
> solution for this issue.
> We also need to extend several of the existing tests and add null values to
> ensure that the join logic is correctly implemented.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
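The semantic gap described in the issue can be illustrated with a small Scala sketch. This is not Flink code. `ThreeValuedEquality`, `comparatorEquals`, `sqlEquals`, and `accepts` are hypothetical names. A nullable SQL `INT` is modeled as `Option[Int]`, where `None` stands for null in the inputs and for UNKNOWN in the boolean result.

```scala
// Hypothetical illustration (not Flink's TypeComparator or code generator):
// nullable INT values are modeled as Option[Int], and None means null.
object ThreeValuedEquality {

  // SQL truth values: Some(true) = TRUE, Some(false) = FALSE, None = UNKNOWN.
  type SqlBool = Option[Boolean]

  // Comparator-style equality, as used for sorting and grouping:
  // two nulls compare as equal so that they end up in the same group.
  def comparatorEquals(a: Option[Int], b: Option[Int]): Boolean = a == b

  // SQL equality under three-valued logic: if either operand is null,
  // the comparison is UNKNOWN.
  def sqlEquals(a: Option[Int], b: Option[Int]): SqlBool =
    for (x <- a; y <- b) yield x == y

  // A join or WHERE condition keeps a row (pair) only if it is TRUE;
  // FALSE and UNKNOWN both reject it.
  def accepts(cond: SqlBool): Boolean = cond.contains(true)
}
```

Under comparator equality, two rows whose join keys are both null would be paired. Under three-valued logic, the same predicate is UNKNOWN, and the join rejects it just as it would a FALSE predicate.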
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217073#comment-16217073 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146591835

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
@@ -486,11 +486,6 @@ case class Join(
           s"Invalid join condition: $expression. At least one equi-join predicate is " +
             s"required.")
       }
-      if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --

Good point. Will remove that. Thanks @lincoln-lil!
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216983#comment-16216983 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146504940

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
@@ -486,11 +486,6 @@ case class Join(
           s"Invalid join condition: $expression. At least one equi-join predicate is " +
             s"required.")
       }
-      if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --

The local variables `nonEquiJoinPredicateFound` and `localPredicateFound` can be removed here, and the `checkIfFilterCondition` method is no longer needed.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216789#comment-16216789 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858

Thanks for the review @twalthr.
The docs don't need to be touched. The previous restrictions were not documented...

Will merge this PR.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216522#comment-16216522 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4858

I scanned through the code quickly and could not find any major issues. Well documented, well tested. +1 to merge this. Please don't forget to update the docs.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215811#comment-16215811 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858

I'll merge the PR in the next few days.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213326#comment-16213326 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858

Thanks for the review @xccui!
I've updated the PR.

Cheers, Fabian
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213322#comment-16213322 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146076115

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
     val joinInfo = join.analyzeCondition
     // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    // and disable outer joins with non-equality predicates(see FLINK-5520)
-    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+    !joinInfo.pairs().isEmpty
--- End diff --

Yes, that is true, but the rules are also applied in different contexts. `FlinkLogicalJoin` is used for the initial translation of both batch and stream programs, and `DataSetJoinRule` only for batch programs. I think it's OK to keep these checks as a safety net.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212508#comment-16212508 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145938669

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
@@ -156,65 +163,394 @@ class DataSetJoin(
     val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val (joinOperator, nullCheck) = joinType match {
-      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
-      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
-      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
-      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+    joinType match {
+      case JoinRelType.INNER =>
+        addInnerJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.LEFT =>
+        addLeftOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.RIGHT =>
+        addRightOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.FULL =>
+        addFullOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
     }
+  }
-    if (nullCheck && !config.getNullCheck) {
-      throw TableException("Null check in TableConfig must be enabled for outer joins.")
-    }
+  private def addInnerJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
     val generator = new FunctionCodeGenerator(
       config,
-      nullCheck,
-      leftDataSet.getType,
-      Some(rightDataSet.getType))
+      false,
+      left.getType,
+      Some(right.getType))
     val conversion = generator.generateConverterResultExpression(
-      returnType,
+      resultType,
       joinRowType.getFieldNames)
-    var body = ""
+    val condition = generator.generateExpression(joinCondition)
+    val body =
+      s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
-    if (joinInfo.isEqui) {
-      // only equality condition
-      body = s"""
-           |${conversion.code}
-           |${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |""".stripMargin
-    }
-    else {
-      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
-      val condition = generator.generateExpression(nonEquiPredicates)
-      body = s"""
-           |${condition.code}
-           |if (${condition.resultTerm}) {
-           |  ${conversion.code}
-           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |}
-           |""".stripMargin
-    }
     val genFunction = generator.generateFunction(
       ruleDescription,
       classOf[FlatJoinFunction[Row, Row, Row]],
       body,
-      returnType)
+      resultType)
     val joinFun = new FlatJoinRunner[Row, Row, Row](
       genFunction.name,
       genFunction.code,
       genFunction.returnType)
-    val joinOpName =
-      s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
-        s"join: (${joinSelectionToString(joinRowType)})"
+    left.join(right)
+      .where(leftKeys: _*)
+      .equalTo(rightKeys: _*)
+      .`with`(joinFun)
+      .name(getJoinOpName)
+  }
+
+  private def addLeftOuterJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config:
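In the inner-join part of this diff, the code-generated function now evaluates the complete `joinCondition`, including the equi-join predicates that the runtime already uses as keys. Previously it evaluated only the remaining non-equi predicates, and nothing at all for pure equi-joins. The Scala sketch below models the effect with plain collections. `InnerJoinSketch`, `condition`, `innerJoin`, and the `evalCondition` flag are hypothetical. The nested loop only stands in for Flink's sort- or hash-based join, which matches keys with comparator semantics.

```scala
// Hypothetical model of the inner-join fix (not Flink's DataSetJoin).
object InnerJoinSketch {
  // Hypothetical row layout: (nullable join key, payload); None means null.
  type Row = (Option[Int], String)

  // SQL equality under three-valued logic; a None result means UNKNOWN.
  def sqlEquals(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Stand-in for the generated function: the full condition l.key = r.key.
  def condition(l: Row, r: Row): Option[Boolean] = sqlEquals(l._1, r._1)

  // Nested-loop stand-in for the runtime join. Keys are matched with
  // comparator semantics, so None == None is true. When evalCondition is
  // true, the full condition is evaluated afterwards and only pairs for
  // which it is TRUE are emitted.
  def innerJoin(left: Seq[Row], right: Seq[Row], evalCondition: Boolean): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l._1 == r._1
      if !evalCondition || condition(l, r).contains(true)
    } yield (l, r)
}
```

Calling `innerJoin` with `evalCondition = false` roughly mirrors the removed `joinInfo.isEqui` branch, which emitted every key-matched pair unconditionally. In that mode, two rows with null keys are joined. With `evalCondition = true`, the UNKNOWN result filters that pair out.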
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212511#comment-16212511 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145863086

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala ---
@@ -97,8 +97,7 @@ private class FlinkLogicalJoinConverter
   private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
--- End diff --

The parameter `join` seems to be unused now.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212510#comment-16212510 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145863708

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
     val joinInfo = join.analyzeCondition
     // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    // and disable outer joins with non-equality predicates(see FLINK-5520)
-    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+    !joinInfo.pairs().isEmpty
--- End diff --

The condition is checked three times: in `FlinkLogicalJoin`, `DataSetJoinRule`, and `DataSetJoin`. This adds maintenance work every time we change the validation rule.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212509#comment-16212509 ]

ASF GitHub Bot commented on FLINK-7755:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r145938918

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
@@ -156,65 +163,394 @@ class DataSetJoin(
     val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val (joinOperator, nullCheck) = joinType match {
-      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
-      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
-      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
-      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+    joinType match {
+      case JoinRelType.INNER =>
+        addInnerJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.LEFT =>
+        addLeftOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.RIGHT =>
+        addRightOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.FULL =>
+        addFullOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
     }
+  }
-    if (nullCheck && !config.getNullCheck) {
-      throw TableException("Null check in TableConfig must be enabled for outer joins.")
-    }
+  private def addInnerJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
     val generator = new FunctionCodeGenerator(
       config,
-      nullCheck,
-      leftDataSet.getType,
-      Some(rightDataSet.getType))
+      false,
+      left.getType,
+      Some(right.getType))
     val conversion = generator.generateConverterResultExpression(
-      returnType,
+      resultType,
       joinRowType.getFieldNames)
-    var body = ""
+    val condition = generator.generateExpression(joinCondition)
+    val body =
+      s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
-    if (joinInfo.isEqui) {
-      // only equality condition
-      body = s"""
-           |${conversion.code}
-           |${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |""".stripMargin
-    }
-    else {
-      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
-      val condition = generator.generateExpression(nonEquiPredicates)
-      body = s"""
-           |${condition.code}
-           |if (${condition.resultTerm}) {
-           |  ${conversion.code}
-           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |}
-           |""".stripMargin
-    }
     val genFunction = generator.generateFunction(
       ruleDescription,
       classOf[FlatJoinFunction[Row, Row, Row]],
       body,
-      returnType)
+      resultType)
     val joinFun = new FlatJoinRunner[Row, Row, Row](
       genFunction.name,
       genFunction.code,
       genFunction.returnType)
-    val joinOpName =
-      s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
-        s"join: (${joinSelectionToString(joinRowType)})"
+    left.join(right)
+      .where(leftKeys: _*)
+      .equalTo(rightKeys: _*)
+      .`with`(joinFun)
+      .name(getJoinOpName)
+  }
+
+  private def addLeftOuterJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config:
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210847#comment-16210847 ]

ASF GitHub Bot commented on FLINK-7755:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4858

[FLINK-7755] [table] Fix NULL handling in batch joins.

## What is the purpose of the change

This PR fixes a couple of issues with Table API / SQL batch joins:

- Proper support for joining null values for inner and outer joins
- Support for non-equi join predicates in outer joins (at least one equi-join predicate is required)
- Support for local predicates on the outer input of outer joins (at least one equi-join predicate is required)

## Brief change log

- Inner & outer joins: evaluate all join predicates, including equi-join predicates, in a code-gen'd function for correct handling of three-valued logic
- Outer joins: translate outer joins into a sequence of GroupReduce -> OuterJoin -> GroupReduce.
  - The first GroupReduce groups on the full input row and deduplicates the outer side(s) of the join. It keeps a count of the deduplicated rows.
  - The OuterJoin evaluates the join predicate and computes possible join pairs of left and right rows. The non-outer element of a pair can be null if the join predicate does not match.
  - The second GroupReduce groups again on the full input row and computes the join result for each outer row. If the outer row was not matched with any inner row, it produces a null-padded result.
  - The plan for left and right outer joins requires only a single initial partitioning and sort of each input. All subsequent operators can reuse the initial sort and produce a sorted result again. A full outer join requires an additional partitioning and sorting step.
- Checks for outer join translation are removed to allow outer joins with non-equi and local predicates.

## Verifying this change

- added ITCases for the new outer join features to `JoinITCase`
- added plan tests for Table API and SQL for the new outer join features
- updated validation tests

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

The documentation does not need to be adjusted because the outer join limitations were not documented.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableBatchNullJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4858.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4858

commit 1434d8d7debe207e4b8350199eded4e678885571
Author: Fabian Hueske
Date: 2017-10-15T15:55:23Z

[FLINK-7755] [table] Fix NULL handling in batch joins.

Fixes [FLINK-5498] (Add support for non-equi join and local predicates to outer joins) as well.
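The three-step outer-join plan in the change log can be traced on in-memory collections. The Scala sketch below covers only a left outer join. `LeftOuterJoinSketch` and its methods are hypothetical names, and rows are `(key, value)` pairs with a nullable key modeled as `Option[Int]`. Scala's `groupBy` and a nested loop stand in for Flink's GroupReduce and OuterJoin operators. Partitioning, sorting, and sort reuse are not modeled.

```scala
// Hypothetical model of GroupReduce -> OuterJoin -> GroupReduce for a
// LEFT OUTER JOIN (not Flink's actual operators).
object LeftOuterJoinSketch {
  // Hypothetical row layout: (nullable join key, payload).
  type Row = (Option[Int], Int)
  // A deduplicated left row with its count, paired with an optional right row.
  type Pair = ((Row, Int), Option[Row])

  // SQL equality under three-valued logic (None = UNKNOWN).
  def sqlEquals(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Step 1 (first GroupReduce): deduplicate the outer input, counting copies.
  def dedupWithCount(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.groupBy(identity).toSeq.map { case (row, copies) => (row, copies.size) }

  // Step 2 (OuterJoin): match on the key with comparator semantics
  // (None == None), then evaluate the full predicate. If the predicate is not
  // TRUE, the inner side of the pair stays empty (null).
  def outerJoin(
      left: Seq[(Row, Int)],
      right: Seq[Row],
      pred: (Row, Row) => Option[Boolean]): Seq[Pair] =
    left.flatMap { l =>
      val sameKey = right.filter(r => r._1 == l._1._1)
      if (sameKey.isEmpty) Seq[Pair]((l, None))
      else sameKey.map { r =>
        val inner: Option[Row] = if (pred(l._1, r).contains(true)) Some(r) else None
        (l, inner)
      }
    }

  // Step 3 (second GroupReduce): group by the outer row again. Emit the real
  // matches, or a single null-padded row if there were none, and repeat the
  // result `count` times to restore the original multiplicity.
  def padAndExpand(pairs: Seq[Pair]): Seq[(Row, Option[Row])] =
    pairs.groupBy(_._1).toSeq.flatMap { case ((l, count), group) =>
      val matches = group.flatMap(_._2)
      val results: Seq[(Row, Option[Row])] =
        if (matches.isEmpty) Seq((l, None)) else matches.map(r => (l, Some(r)))
      Seq.fill(count)(results).flatten
    }

  def leftOuterJoin(
      left: Seq[Row],
      right: Seq[Row],
      pred: (Row, Row) => Option[Boolean]): Seq[(Row, Option[Row])] =
    padAndExpand(outerJoin(dedupWithCount(left), right, pred))
}
```

Take the predicate `l.key = r.key AND l.value < r.value` as an example. A left row with a null key reaches step 2 paired with a null-key right row (comparator semantics). The predicate is UNKNOWN, so step 3 emits that left row null-padded instead of joined. Output order is not preserved, because `groupBy` returns an unordered map.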
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209189#comment-16209189 ] Fabian Hueske commented on FLINK-7755:

I think we should treat this kind of bug as a blocker. If something fails or doesn't work, that is bad, but if it returns wrong results without users noticing, it's much worse, IMO.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209183#comment-16209183 ] Till Rohrmann commented on FLINK-7755:

But this bug has already been present since Flink 1.3, right? I wouldn't really block 1.4 on this issue, even though I think we should push hard to get it into 1.4.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208992#comment-16208992 ] Fabian Hueske commented on FLINK-7755:

We compute wrong results. This qualifies as a blocker, IMO.
[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208982#comment-16208982 ] Till Rohrmann commented on FLINK-7755:

Is this truly a blocker issue? I would assess it as critical, since the bug might already have been present in 1.3.