[GitHub] flink pull request #2661: [FLINK-4283] ExecutionGraphRestartTest fails

2016-10-19 Thread AlexanderShoshin
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/2661

[FLINK-4283] ExecutionGraphRestartTest fails

Tests were failing by timeout on machines with fewer than 3 CPU cores. Several 
graph *RestartStrategies* blocked threads from the *ExecutionContext* thread 
pool by calling '*sleep(Long.MAX_VALUE)*' during asynchronous graph restarting. 
This caused some tests to wait for free threads and to terminate by timeout.

I replaced these *RestartStrategies* with a new testing class 
(*InfiniteDelayRestartStrategy*) that has the same functionality: it promises 
to restart the execution graph after a very long delay. But it doesn't use 
threads from the thread pool, so it can't block the execution of other tests.
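
To illustrate the starvation described above, here is a minimal Java sketch. It is not Flink code: `PoolStarvationSketch` and `shortTaskCompletes` are hypothetical names, and a plain fixed-size `ExecutorService` stands in for the *ExecutionContext* thread pool. The only assumption is that a task calling `sleep(Long.MAX_VALUE)` keeps its pool thread, so a task submitted later has to wait in the queue.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolStarvationSketch {

    // Hypothetical helper: submits `sleepers` tasks that sleep "forever",
    // then one short task, and reports whether the short task finished
    // within `timeoutMillis` on a pool of `poolSize` threads.
    static boolean shortTaskCompletes(int poolSize, int sleepers, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < sleepers; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE); // occupies one pool thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            Future<String> result = pool.submit(() -> "done");
            try {
                return "done".equals(result.get(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                return false; // no free thread picked the task up in time
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdownNow(); // interrupts the sleeping tasks so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("2 threads, 2 sleepers: " + shortTaskCompletes(2, 2, 300));
        System.out.println("2 threads, 0 sleepers: " + shortTaskCompletes(2, 0, 2000));
    }
}
```

With as many sleeping tasks as pool threads, the short task times out. A strategy that never takes a pool thread leaves the short task free to run.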

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderShoshin/flink FLINK-4283_infinite_restart_strategy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2661.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2661


commit 92fcba525cec6330ddd14643edb675e27afcbdcc
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date:   2016-10-18T10:21:51Z

[FLINK-4283] Use new InfiniteDelayRestartStrategy instead of FixedDelayRestartStrategy to avoid blocking threads




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2811: [Flink-4541] Support for SQL NOT IN operator

2016-11-15 Thread AlexanderShoshin
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/2811

[Flink-4541] Support for SQL NOT IN operator

NOT IN was not working with nested queries because a DataSet cross join rule 
was missing.

I added DataSetSingleRowCrossRule, which converts a cross join into a 
DataSetSingleRowCross only if one of the inputs is a global aggregation (so it 
contains only one row). This is enough to make NOT IN work, and it won't enable 
queries with classical cross joins.
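
A rough sketch of why a single-row input keeps the cross join cheap, written in plain Java rather than against the Flink or Calcite APIs. `SingleRowCrossSketch`, `globalCount` and `crossWithSingleRow` are hypothetical names. The only assumption is that a global aggregation yields exactly one row, so the cross join reduces to a map over the other input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SingleRowCrossSketch {

    // A global aggregation: always produces exactly one value,
    // even when the input is empty.
    static long globalCount(List<Integer> rows) {
        return rows.size();
    }

    // Cross join against a single row, executed as a map over the other
    // input: every row is paired with the same single value.
    static List<long[]> crossWithSingleRow(List<Integer> rows, long singleRow) {
        List<long[]> out = new ArrayList<>();
        for (int row : rows) {
            out.add(new long[] {row, singleRow});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3);
        List<Integer> b = Arrays.asList(10, 20);
        for (long[] pair : crossWithSingleRow(a, globalCount(b))) {
            System.out.println(pair[0] + ", " + pair[1]);
        }
    }
}
```

The output has exactly as many rows as the larger input, which is why this case can be allowed without enabling general cross joins.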

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderShoshin/flink FLINK-4541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2811


commit ca24a3a55fb2c55280386eecaaaf4d640350f62a
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date:   2016-11-09T08:27:57Z

Add unit tests for a cross join

commit 193eedd378df87391a41345a0f9a9cb6d5d35232
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date:   2016-11-03T16:24:25Z

[FLINK-4541] Add support for DataSet cross join operation






[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-25 Thread AlexanderShoshin
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89628629
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

Do we really need it?
The FlatJoinFunction that we are going to use doesn't have a close() method, 
so if we create a close() method here it will look like this:
`override def close(): Unit = super.close()`
And `super.close()` is also empty.




[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-23 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi, Fabian.

I've made all the corrections. One thing I didn't understand - are we going 
to allow the case of !joinCondition.isEqui in DataSetSingleRowCrossRule?

I also added several tests to verify that CrossJoin works for both left and 
right single row input.




[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
I think that calling a JoinFunction inside a RichMapRunner makes sense. I 
would also prefer not to touch the code generation if possible.
But shouldn't we separate the support of all inner joins with a single row 
input from this "NOT IN" pull request? We might create a new JIRA issue and do 
this in another pull request.
What do you think?




[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Sure,
I will do it tomorrow :)




[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-18 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Thanks, Fabian.
It was my mistake to use a cross function. I will try to do the same 
with a map function.




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-29 Thread AlexanderShoshin
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89960391
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    keyPairs: List[IntPair],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      keyPairs,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) =>
+      if (leftIsSingle && child.equals(right) ||
+          !leftIsSingle && child.equals(left)) {
+        val rowCnt = metadata.getRowCount(child)
+        val rowSize = this.estimateRowSize(child.getRowType)
+        cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+      } else {
+        cost
+      }
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

It looks like the Calcite validator doesn't check all the cases at the moment...
But I agree that my `isConditionTypesCompatible` check is very poor and 
shouldn't be here.
Thanks :)



[GitHub] flink pull request #3009: [FLINK-5255] Improve single row check in DataSetSi...

2016-12-15 Thread AlexanderShoshin
GitHub user AlexanderShoshin opened a pull request:

https://github.com/apache/flink/pull/3009

[FLINK-5255] Improve single row check in DataSetSingleRowJoinRule

DataSetSingleRowJoinRule now supports not only `LogicalAggregate` as a single 
row input, but also `LogicalCalc`, `LogicalProject` and `LogicalFilter` 
followed by `LogicalAggregate`.
If `LogicalFilter` returns an empty set, `DataSetSingleRowJoin` will also 
return an empty set.
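
The empty-set behaviour can be sketched in plain Java as follows. This is an illustration, not the code of `DataSetSingleRowJoin`: `SingleRowJoinSketch` and `joinWithSingleRow` are hypothetical names, and an `Optional` stands in for a single-row input that a filter may have emptied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

class SingleRowJoinSketch {

    // Inner join of `rows` with an input that holds at most one row.
    // If the single-row side is empty, the join result is empty as well.
    static List<String> joinWithSingleRow(
            List<Integer> rows,
            Optional<Integer> singleRow,
            BiPredicate<Integer, Integer> condition) {
        List<String> out = new ArrayList<>();
        if (!singleRow.isPresent()) {
            return out; // nothing to join against
        }
        int single = singleRow.get();
        for (Integer row : rows) {
            if (condition.test(row, single)) {
                out.add(row + "," + single);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 5, 9);
        System.out.println(joinWithSingleRow(rows, Optional.of(4), (r, s) -> r > s));
        System.out.println(joinWithSingleRow(rows, Optional.empty(), (r, s) -> r > s));
    }
}
```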


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderShoshin/flink FLINK-5255

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3009.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3009


commit f822d7c0320860a632ff6879f16ffec2c9f14350
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date:   2016-12-14T10:59:08Z

added tests for LogicalCalc support in DataSetSingleRowJoinRule

commit d954455c86532c4362c88d578f26e8bbe8ffb060
Author: Alexander Shoshin <alexander_shos...@epam.com>
Date:   2016-12-14T11:04:49Z

[FLINK-5255] Enable single row LogicalCalc as input in DataSetSingleRowJoinRule






[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-29 Thread AlexanderShoshin
Github user AlexanderShoshin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r90034073
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
@@ -0,0 +1,229 @@
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

`DataSetJoin` has the same keyPairs checking code, and it will catch different 
key types (String and Int, for example) in a `WHERE` expression such as 
`a3 = b1`. But we will receive a `NumberFormatException` from the generated 
join function if the `WHERE` expression looks like this: `a3 < b1`.



[GitHub] flink issue #2811: [FLINK-5159] Improve performance of inner joins with a sin...

2016-11-30 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi, @fhueske.
I've made the corrections that you asked for.

