[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714311#comment-15714311 ]

Alexander Shoshin commented on FLINK-5159:
------------------------------------------

[~fhueske], thank you a lot for your help with this issue!

> Improve perfomance of inner joins with a single row input
> ---------------------------------------------------------
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Alexander Shoshin
> Assignee: Alexander Shoshin
> Priority: Minor
> Fix For: 1.2.0
>
>
> All inner joins (including a cross join) can be implemented as a
> {{MapFunction}} if one of their inputs is a single row. This row can be
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current
> strategies.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
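The idea in the issue description (hand the single-row input to a map-style function as a broadcast set and evaluate the join condition per record) can be sketched roughly as below. This is only an illustration, not the code from the pull request: it assumes a Flink 1.x release that still ships the Scala DataSet API, and the names `SingleRowJoinSketch`, `JoinWithSingleRow` and the broadcast name `"singleRow"` are hypothetical.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical names throughout; a sketch of the approach, not the PR's implementation.
object SingleRowJoinSketch {

  // Joins every record of the multi-row input with the one broadcast row.
  class JoinWithSingleRow
    extends RichFlatMapFunction[(Int, String), (Int, String, Int)] {

    private var singleRow: Int = _

    override def open(parameters: Configuration): Unit = {
      // The broadcast set arrives as a java.util.List; it is assumed to hold exactly one row.
      singleRow = getRuntimeContext.getBroadcastVariable[Int]("singleRow").get(0)
    }

    override def flatMap(in: (Int, String), out: Collector[(Int, String, Int)]): Unit = {
      // Inner-join condition; emitting unconditionally would give a cross join.
      if (in._1 == singleRow) out.collect((in._1, in._2, singleRow))
    }
  }

  def run(): Seq[(Int, String, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val multi = env.fromElements((1, "a"), (2, "b"), (3, "c"))
    val single = env.fromElements(2) // the single-row input
    multi
      .flatMap(new JoinWithSingleRow)
      .withBroadcastSet(single, "singleRow")
      .collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

No shuffle or sort of the multi-row input is needed here: each parallel task reads the broadcast row once in `open` and then filters its own partition, which is presumably why the issue calls this approach more lightweight.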
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712896#comment-15712896 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2811
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712099#comment-15712099 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2811

    Merging
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711967#comment-15711967 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2811

    Thanks for the update @AlexanderShoshin.
    PR is good to merge!
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711203#comment-15711203 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user AlexanderShoshin commented on the issue:

    https://github.com/apache/flink/pull/2811

    Hi, @fhueske. I've made the corrections that you asked.
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705628#comment-15705628 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user AlexanderShoshin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r90034073

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.calcite.util.mapping.IntPair
    +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
    +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
    +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Flink RelNode that executes a Join where one of inputs is a single row.
    +  */
    +class DataSetSingleRowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    leftIsSingle: Boolean,
    +    rowRelDataType: RelDataType,
    +    joinCondition: RexNode,
    +    joinRowType: RelDataType,
    +    keyPairs: List[IntPair],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +  with DataSetRel {
    +
    +  override def deriveRowType() = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetSingleRowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      leftIsSingle,
    +      getRowType,
    +      joinCondition,
    +      joinRowType,
    +      keyPairs,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("where", joinConditionToString)
    +      .item("join", joinSelectionToString)
    +      .item("joinType", joinTypeToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +    val children = this.getInputs
    +    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
    +      if (leftIsSingle && child.equals(right) ||
    +          !leftIsSingle && child.equals(left)) {
    +        val rowCnt = metadata.getRowCount(child)
    +        val rowSize = this.estimateRowSize(child.getRowType)
    +        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    +      } else {
    +        cost
    +      }
    +    }
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    `DataSetJoin` has the same keyPairs checking code. Different key types (String and Int, for example) in a `WHERE` expression will be caught by it if we have 'a3 = b1'. But we will receive a `NumberFormatException` from the genera
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704696#comment-15704696 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r89962072

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    Oh, it does not? That does not sound right to me. My feeling would be that either we are not using it correctly or there is a bug in Calcite. @twalthr, what do you think about this?
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704664#comment-15704664 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user AlexanderShoshin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r89960391

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    It looks like the Calcite Validator doesn't check all the cases at the moment... But I agree that my `isConditionTypesCompatible` check is very poor and it shouldn't be here. Thanks :)
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704642#comment-15704642 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r89958857

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    Yes, but I think that would be checked by Calcite's SQL Validator before the optimization, no?
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704624#comment-15704624 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user AlexanderShoshin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r8995

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    This will catch a case when we compare different types in `WHERE` expression:
    ```
    SELECT * FROM A, (SELECT min(b1) AS b1 FROM B) WHERE a3 = b1
    ```
    Should this `SELECT` throw an exception if a3 is a `String` a
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703157#comment-15703157 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r89884981

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.runtime
    +
    +import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.slf4j.LoggerFactory
    +
    +abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
    +    name: String,
    +    code: String,
    +    @transient returnType: TypeInformation[OUT],
    +    broadcastSetName: String)
    +  extends RichFlatMapFunction[MULTI_IN, OUT]
    +    with ResultTypeQueryable[OUT]
    +    with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  protected var function: FlatJoinFunction[IN1, IN2, OUT] = null
    --- End diff --

    `= null` -> `= _`
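The `= null` -> `= _` suggestion refers to Scala 2's default initializer for member `var`s: `= _` assigns the type's default value (`null` for reference types, `0` for `Int`, `false` for `Boolean`) without spelling it out. A minimal sketch, with a hypothetical class `LazyRunner` standing in for the runner in the diff:

```scala
// Scala 2 sketch; LazyRunner and its members are hypothetical illustration names.
class LazyRunner {
  // `= _` gives each field its type's default: null here ...
  private var function: String => Int = _
  // ... and 0 here.
  private var calls: Int = _

  // Mirrors the open()-style late initialization used by rich functions.
  def open(): Unit = { function = _.length }

  def run(s: String): Int = { calls += 1; function(s) }
  def callCount: Int = calls
  def isOpen: Boolean = function != null
}
```

The default initializer is only allowed on class members, not on local variables, and Scala 3 deprecates it in favour of `scala.compiletime.uninitialized`; for the Scala 2 versions Flink targeted at the time, `= _` is the usual idiom for fields that are assigned later in `open`.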
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703162#comment-15703162 ]

ASF GitHub Bot commented on FLINK-5159:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2811#discussion_r89883549

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -0,0 +1,229 @@
    +    if (isConditionTypesCompatible(left.getRowType.getFieldList,
    --- End diff --

    This check can be removed.
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703161#comment-15703161 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89883678 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.calcite.util.mapping.IntPair +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode that executes a Join where one of inputs is a single row. 
+ */ +class DataSetSingleRowJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinCondition: RexNode, +joinRowType: RelDataType, +keyPairs: List[IntPair], +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinCondition, + joinRowType, + keyPairs, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => + if (leftIsSingle && child.equals(right) || + !leftIsSingle && child.equals(left)) { +val rowCnt = metadata.getRowCount(child) +val rowSize = this.estimateRowSize(child.getRowType) +cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) + } else { +cost + } +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +if (isConditionTypesCompatible(left.getRowType.getFieldList, + right.getRowType.getFieldList, + keyPairs)) { + createPlan(tableEnv, expectedType) --- End diff -- We can move the logic of `createPlan` here. > Improve perfomance of in
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703159#comment-15703159 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89884314 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin} + +import scala.collection.JavaConversions._ --- End diff -- Please remove import. 
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703158#comment-15703158 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89884351 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin} + +import scala.collection.JavaConversions._ + +class DataSetSingleRowJoinRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isInnerJoin(join)) { + isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) || + isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) +} else { + false +} + } + + private def isInnerJoin(join: LogicalJoin) = { +join.getJoinType == JoinRelType.INNER + } + + private def isGlobalAggregation(node: RelNode) = { +node.isInstanceOf[LogicalAggregate] && + isSingleLine(node.asInstanceOf[LogicalAggregate]) + } + + private def isSingleLine(agg: LogicalAggregate) = { +agg.getGroupSets == null || + (agg.getGroupSets.size() == 1 && + agg.getGroupSets.get(0).isEmpty && + agg.getGroupSet.isEmpty) + } + + override def convert(rel: RelNode): RelNode = { +val join = rel.asInstanceOf[LogicalJoin] +val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) +val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE) +val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE) +val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) + +new DataSetSingleRowJoin( + rel.getCluster, + traitSet, + dataSetLeftNode, + dataSetRightNode, + leftIsSingle, + 
rel.getRowType, + join.getCondition, + join.getRowType, + join.analyzeCondition.pairs.toList,
--- End diff --
This parameter can be removed. `joinCondition` already includes the complete join predicate that we need to evaluate.
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703155#comment-15703155 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89883453 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.calcite.util.mapping.IntPair +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode that executes a Join where one of inputs is a single row. 
+ */ +class DataSetSingleRowJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinCondition: RexNode, +joinRowType: RelDataType, +keyPairs: List[IntPair], +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinCondition, + joinRowType, + keyPairs, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => + if (leftIsSingle && child.equals(right) || --- End diff -- We can access left and right input more easily with `this.getLeft()` and `this.getRight()`. No need to use `foldLeft` to aggregate the stats of the left and right input. > Improve perfomance of inner joins with a single row input > - > > Key: FLINK-5159 > URL: https://issues.apache.org/jira/browse/FLINK-5159 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Alexander Shoshin >Assignee: Alexander Shoshin >Priority: Minor > > All inner joins (including a cross join) can be implemented as a > {{MapFunction}} if one of their inputs is a single row. This row can be > passed to a {{MapFunction}} as a {{BroadcastSet}}.
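The suggestion above (use the two inputs directly instead of folding over `getInputs`) can be illustrated with a minimal, self-contained sketch. `Input`, `Cost`, and `SingleRowJoinCost` are hypothetical stand-ins introduced only for this illustration; they are not Calcite or Flink classes, and the real method would keep using the planner's cost factory and `RelMetadataQuery`. The sketch only mirrors the arithmetic of the diff: the cost is derived from the multi-row input alone.

```scala
// Hypothetical stand-ins for a planner input and a planner cost (not the Calcite API).
final case class Input(rowCount: Double, rowSize: Double)
final case class Cost(rows: Double, cpu: Double, io: Double)

object SingleRowJoinCost {
  // Pick the multi-row side directly instead of folding over all inputs:
  // if the left input is the single row, the right input drives the cost,
  // and vice versa.
  def selfCost(left: Input, right: Input, leftIsSingle: Boolean): Cost = {
    val multiRow = if (leftIsSingle) right else left
    Cost(multiRow.rowCount, multiRow.rowCount, multiRow.rowCount * multiRow.rowSize)
  }
}
```

With this shape there is no accumulator and no equality check against `left` or `right`; the `leftIsSingle` flag alone selects the input whose statistics matter.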
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703160#comment-15703160 ]
ASF GitHub Bot commented on FLINK-5159:
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2811#discussion_r89811862
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala ---
@@ -41,3 +41,5 @@ object GeneratedExpression {
 }
 case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
+
+case class GeneratedField(fieldName: String, fieldType: String)
--- End diff --
This is not used anymore, right? I think we should remove it.
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703156#comment-15703156 ]
ASF GitHub Bot commented on FLINK-5159:
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2811#discussion_r89882087
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala ---
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
--- End diff --
Remove import
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703154#comment-15703154 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89884157 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.calcite.util.mapping.IntPair +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode that executes a Join where one of inputs is a single row. 
+ */ +class DataSetSingleRowJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinCondition: RexNode, +joinRowType: RelDataType, +keyPairs: List[IntPair], +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinCondition, + joinRowType, + keyPairs, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => + if (leftIsSingle && child.equals(right) || + !leftIsSingle && child.equals(left)) { +val rowCnt = metadata.getRowCount(child) +val rowSize = this.estimateRowSize(child.getRowType) +cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) + } else { +cost + } +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +if (isConditionTypesCompatible(left.getRowType.getFieldList, + right.getRowType.getFieldList, + keyPairs)) { + createPlan(tableEnv, expectedType) +} else { + throw TableException( +"Join predicate on incompatible types
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703163#comment-15703163 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89883134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.calcite.util.mapping.IntPair +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode that executes a Join where one of inputs is a single row. + */ +class DataSetSingleRowJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinCondition: RexNode, +joinRowType: RelDataType, +keyPairs: List[IntPair], --- End diff -- We do not need the `keyPairs` parameter. These are included in the `joinCondition` and have been validated before. > Improve perfomance of inner joins with a single row input > - > > Key: FLINK-5159 > URL: https://issues.apache.org/jira/browse/FLINK-5159 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Alexander Shoshin >Assignee: Alexander Shoshin >Priority: Minor > > All inner joins (including a cross join) can be implemented as a > {{MapFunction}} if one of their inputs is a single row. This row can be > passed to a {{MapFunction}} as a {{BroadcastSet}}. > This approach is going to be more lightweight than the other current > strategies. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
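To make the point about `keyPairs` concrete, here is a minimal sketch of the idea from the issue description: when one input is a single row, the join reduces to a map over the other input in which the complete join predicate is evaluated once per row. It uses plain Scala collections instead of Flink's `DataSet` API and broadcast sets, and `SingleRowJoinSketch`, `joinWithSingleRow`, `condition`, and `combine` are hypothetical names chosen for illustration only.

```scala
object SingleRowJoinSketch {
  // Joins every row of the multi-row input with the one (broadcast) row.
  // `condition` plays the role of the full join predicate, so no separate
  // list of key pairs is needed; `combine` builds the output row.
  def joinWithSingleRow[L, R, O](multi: Seq[L], single: R)(
      condition: (L, R) => Boolean)(
      combine: (L, R) => O): Seq[O] =
    multi.collect { case row if condition(row, single) => combine(row, single) }
}
```

A cross join is the special case where `condition` always returns `true`. In the actual pull request the predicate is produced by code generation from `joinCondition`; this sketch only shows why the predicate alone is enough to evaluate the join.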
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696167#comment-15696167 ] ASF GitHub Bot commented on FLINK-5159: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89635696 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( +name: String, +code: String, +@transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[RichMapFunction[IN, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: RichMapFunction[IN, OUT] = null + + override def open(parameters: Configuration): Unit = {
--- End diff --
You are right. When I wrote the comment, I had not thought about wrapping a `FlatJoinFunction` yet but was still assuming a wrapped `RichFlatMapFunction` which has `open()`. So we do not need to forward `close()` and `open()` to the `FlatJoinFunction`.
[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input
[ https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696048#comment-15696048 ] ASF GitHub Bot commented on FLINK-5159: --- Github user AlexanderShoshin commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89628629 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( +name: String, +code: String, +@transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[RichMapFunction[IN, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: RichMapFunction[IN, OUT] = null + + override def open(parameters: Configuration): Unit = {
--- End diff --
Do we really need it? The {{FlatJoinFunction}} that we are going to use doesn't have a close() method. So if we add a close() method here, it will look like this: {{override def close(): Unit = super.close()}}, and {{super.close()}} is also empty.