[GitHub] flink pull request #2661: [FLINK-4283] ExecutionGraphRestartTest fails
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/2661

[FLINK-4283] ExecutionGraphRestartTest fails

Tests were failing with a timeout on machines with fewer than 3 CPU cores. Several graph *RestartStrategies* used by the tests blocked threads of the *ExecutionContext* thread pool by calling '*sleep(Long.MAX_VALUE)*' during an asynchronous graph restart. As a result, some tests had to wait for a free thread and were terminated by the timeout.

I replaced these *RestartStrategies* with a new testing class (*InfiniteDelayRestartStrategy*) that has the same functionality: it promises to restart the execution graph after a very long delay. However, it does not take any thread from the thread pool, so it cannot block the execution of other tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AlexanderShoshin/flink FLINK-4283_infinite_restart_strategy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2661.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2661

commit 92fcba525cec6330ddd14643edb675e27afcbdcc
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date: 2016-10-18T10:21:51Z

    [FLINK-4283] Use new InfiniteDelayRestartStrategy instead of FixedDelayRestartStrategy to avoid blocking threads

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
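The starvation effect described above can be reproduced in miniature. The Scala sketch below is only an illustration under stated assumptions: `RestartStrategySketch`, `BlockingDelayStrategy`, `InfiniteDelayStrategy`, and `RestartStarvation` are hypothetical names (not Flink's `RestartStrategy` interface and not the PR's actual *InfiniteDelayRestartStrategy*), and a small fixed-size pool stands in for the test's *ExecutionContext*. It checks whether an unrelated task can still run after two restarts have been requested.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical, simplified restart strategy; NOT Flink's RestartStrategy API.
trait RestartStrategySketch {
  def restart(doRestart: () => Unit)(implicit ec: ExecutionContext): Unit
}

// Anti-pattern from the PR description: the "delay" occupies a pool thread.
object BlockingDelayStrategy extends RestartStrategySketch {
  def restart(doRestart: () => Unit)(implicit ec: ExecutionContext): Unit = {
    Future {
      try {
        Thread.sleep(Long.MaxValue) // holds the thread until interrupted
        doRestart()
      } catch {
        case _: InterruptedException => () // the pool was shut down
      }
    }
    ()
  }
}

// Promises a restart after an "infinite" delay by never scheduling it,
// so no pool thread is consumed.
object InfiniteDelayStrategy extends RestartStrategySketch {
  def restart(doRestart: () => Unit)(implicit ec: ExecutionContext): Unit = ()
}

object RestartStarvation {
  /** Requests `restarts` restarts on a pool with `poolSize` threads and
    * reports whether an unrelated task still finishes within one second. */
  def otherTaskCompletes(strategy: RestartStrategySketch, poolSize: Int, restarts: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      (1 to restarts).foreach(_ => strategy.restart(() => ()))
      Try(Await.result(Future(42), 1.second)).isSuccess
    } finally {
      pool.shutdownNow() // interrupts sleeping threads so they can exit
    }
  }
}
```

With two pool threads and two blocking restarts, the unrelated task never gets a thread, which mirrors the timeouts seen on machines with few cores; the non-blocking variant leaves the pool untouched.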
[GitHub] flink pull request #2811: [Flink-4541] Support for SQL NOT IN operator
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/2811

[Flink-4541] Support for SQL NOT IN operator

NOT IN did not work with nested queries because a DataSet cross join rule was missing. I added DataSetSingleRowCrossRule, which converts a cross join into a DataSetSingleRowCross only if one of the inputs is a global aggregation (so it contains only one row). This is enough to make NOT IN work, and it won't enable queries with classical cross joins.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AlexanderShoshin/flink FLINK-4541

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2811.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2811

commit ca24a3a55fb2c55280386eecaaaf4d640350f62a
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date: 2016-11-09T08:27:57Z

    Add unit tests for a cross join

commit 193eedd378df87391a41345a0f9a9cb6d5d35232
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date: 2016-11-03T16:24:25Z

    [FLINK-4541] Add support for DataSet cross join operation
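The applicability condition of such a rule can be written as a predicate over a toy plan. This is a hypothetical sketch, not Calcite's `RelNode`/`RelOptRule` API and not the code of DataSetSingleRowCrossRule; it assumes that a "global aggregation" is an aggregate without grouping keys, which in SQL always produces exactly one row.

```scala
// Hypothetical, heavily simplified logical plan; NOT Calcite's RelNode API.
sealed trait LogicalPlan
final case class TableScan(table: String) extends LogicalPlan
// groupKeys.isEmpty means a global aggregation, which yields exactly one row.
final case class Aggregate(groupKeys: Seq[Int], input: LogicalPlan) extends LogicalPlan
final case class CrossJoin(left: LogicalPlan, right: LogicalPlan) extends LogicalPlan

object SingleRowCrossRuleSketch {
  def isGlobalAggregate(plan: LogicalPlan): Boolean = plan match {
    case Aggregate(groupKeys, _) => groupKeys.isEmpty
    case _                       => false
  }

  /** Fires only if one side is known to contain a single row, so plain
    * (potentially huge) cross joins remain unsupported. */
  def matches(join: CrossJoin): Boolean =
    isGlobalAggregate(join.left) || isGlobalAggregate(join.right)
}
```

For example, `CrossJoin(TableScan("t1"), Aggregate(Seq.empty, TableScan("t2")))` matches on either side, while a cross join of two scans, or of a scan and a grouped aggregate, does not.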
[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89628629

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

Do we really need it? The FlatJoinFunction that we are going to use doesn't have a close() method. So if we create a close() method here, it will look like this: {{override def close(): Unit = super.close()}}. And {{super.close()}} is empty as well.
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811

Hi, Fabian. I've made all the corrections. One thing I didn't understand: are we going to allow the case of !joinCondition.isEqui in DataSetSingleRowCrossRule? I also added several tests to verify that the cross join works for both a left and a right single-row input.
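Why a single-row input makes both orientations straightforward can be shown with a small sketch. `SingleRowJoinSketch.join` below is hypothetical (rows are plain `Seq`s and the condition is a Scala function rather than a `RexNode`); it keeps the left fields first in the output no matter which side is single, and it accepts any predicate, equi or not, because every row of the multi-row side is compared with the same single row. It does not decide whether the rule *should* allow non-equi conditions.

```scala
object SingleRowJoinSketch {
  /** Inner join of `multi` with a single-row input. `leftIsSingle` tells which
    * side the single row came from; output rows always put left fields first.
    * Any predicate works, not only equality. */
  def join[A](
      multi: Seq[Seq[A]],
      single: Seq[Seq[A]], // expected to hold at most one row
      leftIsSingle: Boolean,
      condition: (Seq[A], Seq[A]) => Boolean // (leftRow, rightRow)
  ): Seq[Seq[A]] =
    single.headOption match {
      case None => Seq.empty // an empty single-row side gives an empty inner join
      case Some(s) =>
        multi.flatMap { m =>
          val (l, r) = if (leftIsSingle) (s, m) else (m, s)
          if (condition(l, r)) Some(l ++ r) else None
        }
    }
}
```

With `multi = Seq(Seq(1), Seq(5), Seq(9))`, `single = Seq(Seq(4))`, and the non-equi condition `left(0) < right(0)`, a single right input yields `Seq(Seq(1, 4))`, while a single left input yields `Seq(Seq(4, 5), Seq(4, 9))`.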
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811

I think that calling a JoinFunction inside a RichMapRunner makes sense. I would also prefer not to touch the code generation if possible. But shouldn't we separate the support of all inner joins with a single-row input from this "NOT IN" pull request? We might create a new JIRA issue and do this in another pull request. What do you think?
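Calling an existing join function from inside a map-style runner might look roughly like this. `CollectorSketch`, `FlatJoinFnSketch`, and `MapJoinRightSketch` are hypothetical stand-ins, not Flink's `Collector`, `FlatJoinFunction`, or runtime runner classes, and the sketch assumes the single row is already available to the runner (for example, via a broadcast).

```scala
// Hypothetical stand-ins; NOT Flink's Collector or FlatJoinFunction.
trait CollectorSketch[T] {
  def collect(record: T): Unit
}

trait FlatJoinFnSketch[L, R, O] {
  def join(left: L, right: R, out: CollectorSketch[O]): Unit
}

/** Map-side runner for a join whose right input is a single row. It reuses an
  * existing join function instead of generating new code: for every record of
  * the multi-row input it calls the join function with the held single row.
  * The wrapped function has no lifecycle methods, so there is nothing to
  * forward from a close() method. */
final class MapJoinRightSketch[L, R, O](singleRow: R, joinFn: FlatJoinFnSketch[L, R, O]) {
  def flatMap(record: L, out: CollectorSketch[O]): Unit =
    joinFn.join(record, singleRow, out)
}
```

The design keeps the join logic in one function (which could come from existing code generation) and only changes how it is driven: one call per record of the large input, instead of a cross product.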
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811

Sure, I will do it tomorrow :)
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811

Thanks, Fabian. It was my mistake to use a cross function. I will try to do the same with a map function.
[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89960391

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * Flink RelNode that executes a Join where one of inputs is a single row.
+ */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

It looks like the Calcite validator doesn't check all the cases at the moment... But I agree that my `isConditionTypesCompatible` check is very poor and shouldn't be here.

Thanks :)
[GitHub] flink pull request #3009: [FLINK-5255] Improve single row check in DataSetSi...
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/3009

[FLINK-5255] Improve single row check in DataSetSingleRowJoinRule

DataSetSingleRowJoinRule now accepts as a single-row input not only a `LogicalAggregate`, but also a `LogicalCalc`, `LogicalProject`, or `LogicalFilter` applied on top of a `LogicalAggregate`. If the `LogicalFilter` returns an empty set, `DataSetSingleRowJoin` also returns an empty set.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AlexanderShoshin/flink FLINK-5255

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3009.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3009

commit f822d7c0320860a632ff6879f16ffec2c9f14350
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date: 2016-12-14T10:59:08Z

    added tests for LogicalCalc support in DataSetSingleRowJoinRule

commit d954455c86532c4362c88d578f26e8bbe8ffb060
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date: 2016-12-14T11:04:49Z

    [FLINK-5255] Enable single row LogicalCalc as input in DataSetSingleRowJoinRule
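The extended check can be sketched as a recursive predicate. The node classes below are hypothetical simplifications of `LogicalAggregate`, `LogicalProject`, `LogicalFilter`, and `LogicalCalc`, not Calcite's API, and the sketch assumes that any stack of these nodes over a global aggregate qualifies; the actual rule may be stricter. Because a filter can drop the only row, the check really guarantees "at most one row", which is why an empty single-row side has to produce an empty join result.

```scala
// Hypothetical simplified plan nodes; NOT Calcite's LogicalAggregate,
// LogicalProject, LogicalFilter, or LogicalCalc.
sealed trait Rel
final case class Scan(table: String) extends Rel
final case class Agg(groupKeys: Seq[Int], input: Rel) extends Rel
final case class Project(exprs: Seq[String], input: Rel) extends Rel
final case class Filter(condition: String, input: Rel) extends Rel
final case class Calc(exprs: Seq[String], condition: Option[String], input: Rel) extends Rel

object SingleRowCheckSketch {
  /** True if `rel` yields at most one row: a global aggregate, possibly
    * wrapped in projections, filters, or calcs, none of which add rows.
    * A filter may remove the row, so the join side can also be empty. */
  def isSingleRow(rel: Rel): Boolean = rel match {
    case Agg(groupKeys, _) => groupKeys.isEmpty
    case Project(_, input) => isSingleRow(input)
    case Filter(_, input)  => isSingleRow(input)
    case Calc(_, _, input) => isSingleRow(input)
    case Scan(_)           => false
  }
}
```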
[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r90034073

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
@@ -0,0 +1,229 @@
[... same diff context as in discussion_r89960391 ...]
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

`DataSetJoin` has the same keyPairs checking code, and it catches different key types (String and Int, for example) in a `WHERE` expression such as `a3 = b1`. But if the `WHERE` expression looks like `a3 < b1`, we will receive a `NumberFormatException` from the generated join function.
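The gap can be illustrated with a sketch that validates only equi-join key pairs. Everything below is hypothetical: the field types, `keyPairsCompatible`, and `lessThan` are not Flink's code, and modeling the failure as a string-to-number conversion is an assumption about how such a `NumberFormatException` can arise, not a description of the generated join function. The field layout (`a1`, `a2`, `a3` on the left, `b1` on the right) is also made up for the example.

```scala
import scala.util.Try

object ConditionTypeCheckSketch {
  sealed trait FieldType
  case object IntType extends FieldType
  case object StringType extends FieldType

  /** Validates only equi-join key pairs (leftIndex, rightIndex). A non-equi
    * predicate such as `a3 < b1` produces no key pair, so its operand types
    * are never compared here. */
  def keyPairsCompatible(
      left: IndexedSeq[FieldType],
      right: IndexedSeq[FieldType],
      keyPairs: Seq[(Int, Int)]): Boolean =
    keyPairs.forall { case (l, r) => left(l) == right(r) }

  /** Illustrative runtime comparison that converts the string operand to a
    * number first; a non-numeric string throws NumberFormatException. */
  def lessThan(s: String, i: Int): Boolean = s.toInt < i

  def tryLessThan(s: String, i: Int): Try[Boolean] = Try(lessThan(s, i))
}
```

An equality between a String and an Int column is rejected by the key-pair check, while the same operands under `<` pass it and only fail once a value is converted at runtime.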
[GitHub] flink issue #2811: [FLINK-5159] Improve performance of inner joins with a sin...
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811

Hi, @fhueske. I've made the corrections you asked for.