[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-12-01 Thread Alexander Shoshin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714311#comment-15714311
 ] 

Alexander Shoshin commented on FLINK-5159:
--

[~fhueske], thank you very much for your help with this issue!

> Improve perfomance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
> Fix For: 1.2.0
>
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.
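
To illustrate the idea in the description above, here is a minimal sketch in plain Scala collections. It is not the Flink implementation: the single-row input plays the role of the broadcast set, and the join reduces to one pass over the other input. The names `SingleRowJoinSketch`, `Row`, `singleRowJoin`, and the sample data are hypothetical.

```scala
// Sketch only: plain Scala collections stand in for Flink DataSets.
// All names here are hypothetical and chosen for illustration.
object SingleRowJoinSketch {
  type Row = Vector[Any]

  // Joins every row of `multi` with the one row in `single`.
  // `single` plays the role of the broadcast set; `condition` is the join
  // predicate (always true for a cross join). `multi` is only scanned once,
  // so no shuffle or sort of the large input is needed.
  def singleRowJoin(
      multi: Seq[Row],
      single: Row,
      condition: (Row, Row) => Boolean): Seq[Row] =
    multi.flatMap { row =>
      if (condition(row, single)) Some(row ++ single) else None
    }

  def main(args: Array[String]): Unit = {
    val a: Seq[Row] = Seq(Vector(1, "x"), Vector(5, "y"), Vector(9, "z"))
    val minB: Row = Vector(5) // e.g. the result of SELECT min(b1) FROM B

    // Inner join on a1 = b1
    println(singleRowJoin(a, minB, (l, r) => l(0) == r(0)))

    // Cross join: every row of `a` is paired with the single row
    println(singleRowJoin(a, minB, (_, _) => true).size)
  }
}
```

In Flink itself the single row would presumably be read from the broadcast set inside a rich function rather than passed as an argument; the sketch only shows why a map-style operator is sufficient.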



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712896#comment-15712896
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2811




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712099#comment-15712099
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2811
  
Merging




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711967#comment-15711967
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2811
  
Thanks for the update @AlexanderShoshin.
PR is good to merge!




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711203#comment-15711203
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi, @fhueske.
I've made the corrections that you asked for.




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705628#comment-15705628
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r90034073
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

`DataSetJoin` has the same `keyPairs` checking code, and it will catch 
different key types (String and Int, for example) in a `WHERE` expression such 
as `a3 = b1`. But we will receive a `NumberFormatException` from the 
genera

[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704696#comment-15704696
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89962072
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

Oh, it does not? That does not sound right to me. My feeling would be that 
either we are not using it correctly or there is a bug in Calcite. @twalthr, 
what do you think about this?



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704664#comment-15704664
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89960391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

It looks like the Calcite validator doesn't check all the cases at the moment...
But I agree that my `isConditionTypesCompatible` check is very poor and 
shouldn't be here.
Thanks :)
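
As a rough illustration of the kind of key-type check being discussed (this is neither the PR's `isConditionTypesCompatible` nor the code in `DataSetJoin`), the sketch below compares the field types at each pair of join-key positions. Field types are modeled as plain strings, and `KeyPair` is a hypothetical stand-in for Calcite's `IntPair` with its `source` and `target` indices.

```scala
// Sketch only: hypothetical names, field types modeled as plain strings.
object KeyTypeCheckSketch {
  // Stand-in for an equi-join key pair:
  // (field index in the left row, field index in the right row).
  final case class KeyPair(source: Int, target: Int)

  // Returns the key pairs whose left and right field types differ.
  def incompatibleKeys(
      leftTypes: IndexedSeq[String],
      rightTypes: IndexedSeq[String],
      keyPairs: Seq[KeyPair]): Seq[KeyPair] =
    keyPairs.filter(p => leftTypes(p.source) != rightTypes(p.target))

  def main(args: Array[String]): Unit = {
    val left = Vector("INT", "LONG", "STRING") // a1, a2, a3
    val right = Vector("INT")                  // b1

    // a3 = b1 compares STRING with INT, so the pair is reported
    println(incompatibleKeys(left, right, Seq(KeyPair(2, 0))))
    // a1 = b1 compares INT with INT, so nothing is reported
    println(incompatibleKeys(left, right, Seq(KeyPair(0, 0))))
  }
}
```

A check of this shape only covers equality predicates that were extracted as key pairs; other comparisons in the condition would still depend on validation elsewhere.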



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704642#comment-15704642
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89958857
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

Yes, but I think that would be checked by Calcite's SQL Validator before 
the optimization, no?



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15704624#comment-15704624
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r8995
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

This will catch the case where we compare different types in a `WHERE` 
expression:
```
SELECT *
FROM A, (SELECT min(b1) AS b1 FROM B)
WHERE a3 = b1
```
Should this `SELECT` throw an exception if a3 is a `String` a

[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703157#comment-15703157
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
 ---
@@ -0,0 +1,50 @@
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+    with ResultTypeQueryable[OUT]
+    with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = null
--- End diff --

`= null` -> `= _`
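
For context on this suggestion: in Scala 2, `= _` on a `var` field initializes it to the default value of its type (`null` for reference types, `0` for numeric types), so `null` does not have to be spelled out. It is only allowed for class members, not for local variables. A minimal sketch with a hypothetical `Holder` class:

```scala
// Hypothetical class for illustration; only the `= _` initializer is the point.
class Holder[T <: AnyRef] {
  // Default initializer: null for reference types.
  var function: T = _
  // For numeric fields the default value is 0.
  var count: Int = _
}

object DefaultInitSketch {
  def main(args: Array[String]): Unit = {
    val h = new Holder[String]
    println(h.function == null)
    println(h.count)
  }
}
```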




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703162#comment-15703162
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883549
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

This check can be removed.



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703161#comment-15703161
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
--- End diff --

We can move the logic of `createPlan` here.



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703159#comment-15703159
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884314
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
--- End diff --

Please remove import.




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703158#comment-15703158
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
+
+class DataSetSingleRowJoinRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isInnerJoin(join)) {
+  isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+  isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isInnerJoin(join: LogicalJoin) = {
+join.getJoinType == JoinRelType.INNER
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets == null ||
+  (agg.getGroupSets.size() == 1 &&
+   agg.getGroupSets.get(0).isEmpty &&
+   agg.getGroupSet.isEmpty)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join = rel.asInstanceOf[LogicalJoin]
+val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
+val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
+val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+new DataSetSingleRowJoin(
+  rel.getCluster,
+  traitSet,
+  dataSetLeftNode,
+  dataSetRightNode,
+  leftIsSingle,
+  rel.getRowType,
+  join.getCondition,
+  join.getRowType,
+  join.analyzeCondition.pairs.toList,
--- End diff --

This parameter can be removed. `joinCondition` includes the complete join 
predicate that we need to evaluate.
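
To illustrate why the key pairs are redundant, here is a plain-Scala sketch of a single-row inner join that evaluates only one complete predicate per candidate row. It is not the PR's implementation: rows are modelled as `Seq[Any]`, and `SingleRowJoinSketch`, `join`, and their parameters are hypothetical names. In Flink the single row would arrive through a broadcast set rather than as a method argument.

```scala
// Hypothetical plain-Scala model; a real implementation would use Flink rows and a broadcast set.
object SingleRowJoinSketch {
  type Row = Seq[Any]

  // Concatenates every row of the multi-row input with the single row and keeps
  // only the combinations for which the complete join predicate holds.
  // Equality conditions are part of `predicate`, so no separate key pairs are needed.
  def join(
      multiRows: Seq[Row],
      singleRow: Row,
      singleIsLeft: Boolean,
      predicate: Row => Boolean): Seq[Row] =
    multiRows
      .map(row => if (singleIsLeft) singleRow ++ row else row ++ singleRow)
      .filter(predicate)
}
```

A cross join is the special case where `predicate` always returns `true`.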




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703155#comment-15703155
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883453
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+  if (leftIsSingle && child.equals(right) ||
--- End diff --

We can access left and right input more easily with `this.getLeft()` and 
`this.getRight()`. No need to use `foldLeft` to aggregate the stats of the left 
and right input.
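
A rough sketch of the simplification, assuming the intent is to charge only the multi-row input. `Input` and `Cost` are hypothetical stand-ins for Calcite's `RelNode` statistics and `RelOptCost`, so only the shape of the logic carries over:

```scala
// Hypothetical stand-ins for Calcite types; not the real API.
final case class Input(rowCount: Double, rowSize: Double)
final case class Cost(rows: Double, cpu: Double, io: Double)

object SelfCostSketch {
  // Selects the multi-row input directly instead of folding over all inputs
  // and skipping the single-row one.
  def selfCost(left: Input, right: Input, leftIsSingle: Boolean): Cost = {
    val multiRowInput = if (leftIsSingle) right else left
    val rowCnt = multiRowInput.rowCount
    Cost(rowCnt, rowCnt, rowCnt * multiRowInput.rowSize)
  }
}
```

This produces the same cost as the `foldLeft` version, because the fold only ever adds a cost for the one child that is not the single-row side.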



[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703160#comment-15703160
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89811862
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
 ---
@@ -41,3 +41,5 @@ object GeneratedExpression {
 }
 
 case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
+
+case class GeneratedField(fieldName: String, fieldType: String)
--- End diff --

This is not used anymore, right?
I think we should remove it.




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703156#comment-15703156
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89882087
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
 ---
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
--- End diff --

Remove import




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703154#comment-15703154
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884157
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
+} else {
+  throw TableException(
+"Join predicate on incompatible types

[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703163#comment-15703163
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
--- End diff --

We do not need the `keyPairs` parameter. These are included in the 
`joinCondition` and have been validated before.




[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696167#comment-15696167
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89635696
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

You are right. When I wrote the comment, I had not yet considered wrapping a 
`FlatJoinFunction`; I was still assuming a wrapped `RichFlatMapFunction`, 
which has `open()` and `close()`.

A `FlatJoinFunction` has neither method, so we do not need to forward 
`open()` and `close()` to it.
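
To make the point concrete, here is a minimal sketch in plain Scala. It does not use Flink's classes: `SimpleCollector`, `SimpleFlatJoin`, `SimpleRichFlatMap`, `SingleRowJoinRunner`, and the `loadSingleRow` callback are all hypothetical stand-ins that only mirror the shapes discussed here (a join function with no lifecycle, wrapped by a rich function whose lifecycle hooks are empty by default). In the real runner the single row would presumably come from the broadcast set rather than from a callback.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins, NOT Flink's real interfaces.
trait SimpleCollector[T] { def collect(record: T): Unit }

// Shaped like a FlatJoinFunction: one join method, no open()/close().
trait SimpleFlatJoin[L, R, O] {
  def join(left: L, right: R, out: SimpleCollector[O]): Unit
}

// Shaped like a rich function: lifecycle hooks exist, defaults do nothing.
abstract class SimpleRichFlatMap[I, O] {
  def open(): Unit = ()
  def close(): Unit = ()
  def flatMap(value: I, out: SimpleCollector[O]): Unit
}

// Wraps the join function. open() only fetches the single row; nothing is
// forwarded to the wrapped function because it has no lifecycle methods.
class SingleRowJoinRunner[I, S, O](
    joiner: SimpleFlatJoin[I, S, O],
    loadSingleRow: () => S) // assumption: stands in for reading the broadcast set
  extends SimpleRichFlatMap[I, O] {

  private var singleRow: Option[S] = None

  override def open(): Unit = {
    singleRow = Some(loadSingleRow())
  }

  // No close() override: it could only call super.close(), which is empty.

  override def flatMap(value: I, out: SimpleCollector[O]): Unit =
    singleRow.foreach(row => joiner.join(value, row, out))
}

object SingleRowJoinDemo {
  def run(): List[(Int, String)] = {
    val results = ArrayBuffer.empty[(Int, String)]
    val out = new SimpleCollector[(Int, String)] {
      def collect(record: (Int, String)): Unit = { results += record; () }
    }
    // Join condition: keep only left values greater than 1.
    val joiner = new SimpleFlatJoin[Int, String, (Int, String)] {
      def join(left: Int, right: String, o: SimpleCollector[(Int, String)]): Unit =
        if (left > 1) o.collect((left, right))
    }
    val runner =
      new SingleRowJoinRunner[Int, String, (Int, String)](joiner, () => "row")
    runner.open()
    Seq(1, 2, 3).foreach(v => runner.flatMap(v, out))
    runner.close()
    results.toList
  }
}
```

The join condition can drop records, which is why the sketch uses a flat-map shape with a collector instead of a one-to-one map.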







[jira] [Commented] (FLINK-5159) Improve perfomance of inner joins with a single row input

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696048#comment-15696048
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89628629
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

Do we really need it?
The FlatJoinFunction that we are going to use doesn't have a close() method, 
so if we add a close() method here it will look like this:
{{override def close(): Unit = super.close()}}
And {{super.close()}} is itself empty.




