[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972040#comment-15972040
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Hi @fhueske, sorry for taking so long to update this PR. 
It looks like it now has many conflicts with master; once you are OK with 
all the changes, I will rebase it onto master.


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Hi @fhueske, sorry for taking so long to update this PR. 
It looks like it now has many conflicts with master; once you are OK with 
all the changes, I will rebase it onto master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972034#comment-15972034
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861591
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

I will remove this simple comment to be consistent with the other convert rules.


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861591
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

I will remove this simple comment to be consistent with the other convert rules.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972028#comment-15972028
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861037
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+    BuiltInMethod.ROW_COUNT.method,
+    this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

I'm OK with both; I will change it to `Calc`.
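
A hedged sketch of that change (an assumption, not the PR's final code): register 
the handler against Calcite's core `Calc` class instead of the Flink-specific 
subclass, so it covers any `Calc` implementation:

    import org.apache.calcite.rel.core.Calc
    import org.apache.calcite.rel.metadata.RelMetadataQuery

    // matches every Calc subclass, including FlinkLogicalCalc
    def getRowCount(rel: Calc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)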


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861037
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+    BuiltInMethod.ROW_COUNT.method,
+    this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

I'm OK with both; I will change it to `Calc`.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972024#comment-15972024
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860657
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+  s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Actually, the old test was wrong in using `UNION` instead of `UNION ALL`. If I 
understand correctly, `UNION` performs a global distinct over all fields, while 
`UNION ALL` simply concatenates two datasets or datastreams. I think the behavior 
of `DataStream.union` corresponds to `UNION ALL` rather than `UNION`.
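
A minimal sketch of the distinction, using plain Scala collections rather than 
the Table API (the data is hypothetical):

    val a = Seq((1, "x"), (1, "x"))
    val b = Seq((1, "x"))

    // UNION ALL: plain concatenation, duplicates kept -> 3 rows
    val unionAll = a ++ b
    // UNION: concatenation followed by a global distinct -> 1 row
    val union = (a ++ b).distinct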


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860657
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+  s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Actually, the old test was wrong in using `UNION` instead of `UNION ALL`. If I 
understand correctly, `UNION` performs a global distinct over all fields, while 
`UNION ALL` simply concatenates two datasets or datastreams. I think the behavior 
of `DataStream.union` corresponds to `UNION ALL` rather than `UNION`.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972018#comment-15972018
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

We need to override `explainTerms` and `toString` to make this RelNode's 
digest distinguishable from that of other `TableSourceScan`s. The default 
implementation only compares the table name, which is not enough once we push 
filters or projections into the `TableSource`. 

Actually, there was a bug in `toString` that was fixed in 
https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a;
I will backport it here.
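
A hedged sketch of such an override (an assumption for illustration, not the 
PR's final code): include the scan's output fields in the digest, so two scans 
of the same table with different pushed-down projections no longer compare equal:

    override def explainTerms(pw: RelWriter): RelWriter = {
      // extend the default digest (table name only) with the produced fields
      super.explainTerms(pw)
        .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    }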


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

We need to override `explainTerms` and `toString` to make this RelNode's 
digest distinguishable from that of other `TableSourceScan`s. The default 
implementation only compares the table name, which is not enough once we push 
filters or projections into the `TableSource`. 

Actually, there was a bug in `toString` that was fixed in 
https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a;
I will backport it here.




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972015#comment-15972015
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111859756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    child: RelNode,
+    collation: RelCollation,
+    offset: RexNode,
+    fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+      traitSet: RelTraitSet,
+      newInput: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+
+    new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val inputRowCnt = metadata.getRowCount(this.getInput)
+    if (inputRowCnt == null) {
+      inputRowCnt
+    } else {
+      val rowCount = (inputRowCnt - limitStart).max(1.0)
+      if (fetch != null) {
+        val limit = RexLiteral.intValue(fetch)
+        rowCount.min(limit)
+      } else {
+        rowCount
+      }
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    // by default, assume cost is proportional to number of rows
+    val rowCount: Double = mq.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

Yes, these can be removed.


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111859756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    child: RelNode,
+    collation: RelCollation,
+    offset: RexNode,
+    fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+      traitSet: RelTraitSet,
+      newInput: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+
+    new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val inputRowCnt = metadata.getRowCount(this.getInput)
+    if (inputRowCnt == null) {
+      inputRowCnt
+    } else {
+      val rowCount = (inputRowCnt - limitStart).max(1.0)
+      if (fetch != null) {
+        val limit = RexLiteral.intValue(fetch)
+        rowCount.min(limit)
+      } else {
+        rowCount
+      }
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    // by default, assume cost is proportional to number of rows
+    val rowCount: Double = mq.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

Yes, these can be removed.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111858923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    indicator: Boolean,
+    groupSet: ImmutableBitSet,
+    groupSets: JList[ImmutableBitSet],
+    aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: JList[ImmutableBitSet],
+      aggCalls: JList[AggregateCall]): Aggregate = {
+    new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    val aggCnt = this.aggCalls.size
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+    !agg.containsDistinctCall()
--- End diff --

This is similar to `FlinkLogicalJoin`: we need other logical rules to rewrite 
distinct aggregates first, and then convert the result into a "clean" 
`FlinkLogicalAggregate`. 
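
A hedged sketch of that approach, assuming we reuse Calcite's existing rewrite 
rule (the rule-set name below is hypothetical): expand distinct aggregates in a 
rewrite phase that runs before conversion, so `containsDistinctCall()` is 
already false when `FlinkLogicalAggregateConverter.matches` fires:

    import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule
    import org.apache.calcite.tools.{RuleSet, RuleSets}

    // to be applied before the logical conversion rules
    val distinctRewriteRules: RuleSet = RuleSets.ofList(
      AggregateExpandDistinctAggregatesRule.INSTANCE)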




[GitHub] flink pull request #3730: [FLINK-5943] [YARN] Fix unprotected access to haSe...

2017-04-17 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3730

[FLINK-5943] [YARN] Fix unprotected access to haServices in YarnFlink…

…ApplicationMasterRunner#shutdown().

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-5943

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3730


commit f31052dbe8a7b8150504757c15b3e731cc6f87c2
Author: zhangminglei 
Date:   2017-04-18T02:00:09Z

[FLINK-5943] [YARN] Fix unprotected access to haServices in 
YarnFlinkApplicationMasterRunner#shutdown().






[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971996#comment-15971996
 ] 

ASF GitHub Bot commented on FLINK-5943:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3730

[FLINK-5943] [YARN] Fix unprotected access to haServices in YarnFlink…

…ApplicationMasterRunner#shutdown().

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-5943

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3730


commit f31052dbe8a7b8150504757c15b3e731cc6f87c2
Author: zhangminglei 
Date:   2017-04-18T02:00:09Z

[FLINK-5943] [YARN] Fix unprotected access to haServices in 
YarnFlinkApplicationMasterRunner#shutdown().




> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
>     // Need to clear the job state in the HA services before shutdown
>     try {
>       haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
>     }
> {code}
> The access to haServices is not protected by a lock;
> haServices may already have been closed.
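
A hedged, illustrative sketch of the lock-protected access the ticket asks for 
(plain Scala with hypothetical names, not the actual YARN runner code): take 
the lock that guards the shared reference and re-check it before use:

    class Runner {
      private val lock = new Object
      private var services: AutoCloseable = _  // stands in for haServices

      def shutdown(): Unit = lock.synchronized {
        if (services != null) {
          services.close()  // cannot race with a concurrent close/reassignment
          services = null
        }
      }
    }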





[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971990#comment-15971990
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

How about renaming `org.apache.flink.table.plan.node` to 
`org.apache.flink.table.plan.rel` and having three sub-packages: `logical`, 
`dataset`, and `datastream`?


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

How about renaming `org.apache.flink.table.plan.node` to 
`org.apache.flink.table.plan.rel` and having three sub-packages: `logical`, 
`dataset`, and `datastream`?




[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971988#comment-15971988
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+
+    new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+    classOf[LogicalJoin],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Currently the answer is yes, because the physical join can only support 
equality conditions on simple keys. 
If a join key is an expression like `a + 1 = b - 2` and we don't enforce this 
restriction in the logical layer, the join condition stays in that form and we 
can't translate it to a physical join. There are two possible solutions:
1. Keep it this way.
2. Re-introduce rules that extract expressions out of the join condition and 
add an additional "Calc" node so that each join key remains a simple field. 
I prefer 1 for now, and will keep 2 in mind. After all, it's not very nice to 
make the logical layer aware of all these restrictions. What do you think?
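
For context, a hedged sketch of what the `hasEqualityPredicates` check used in 
`matches` above could look like, based on Calcite's `JoinInfo` (an assumption, 
not necessarily the PR's exact code):

    import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    import org.apache.calcite.rel.logical.LogicalJoin

    private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
      // require at least one equi-join key pair on plain input fields;
      // a non-equi remainder is only acceptable for inner joins
      !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
    }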


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+
+    new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+    classOf[LogicalJoin],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Currently the answer is yes, because the physical join can only support 
equality conditions on simple keys. 
If a join key is an expression like `a + 1 = b - 2` and we don't enforce this 
restriction in the logical layer, the join condition stays in that form and we 
can't translate it to a physical join. There are two possible solutions:
1. Keep it this way.
2. Re-introduce rules that extract expressions out of the join condition and 
add an additional "Calc" node so that each join key remains a simple field. 
I prefer 1 for now, and will keep 2 in mind. After all, it's not very nice to 
make the logical layer aware of all these restrictions. What do you think?




[jira] [Commented] (FLINK-5550) NotFoundException: Could not find job with id

2017-04-17 Thread jiwengang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971984#comment-15971984
 ] 

jiwengang commented on FLINK-5550:
--

yep

> NotFoundException: Could not find job with id
> -
>
> Key: FLINK-5550
> URL: https://issues.apache.org/jira/browse/FLINK-5550
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.0.3
> Environment: centos
>Reporter: jiwengang
>Priority: Minor
>  Labels: newbie
>
> Job is canceled, but still report the following exception:
> 2017-01-18 10:35:18,677 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
> handling request
> org.apache.flink.runtime.webmonitor.NotFoundException: Could not find job 
> with id 3b98e734c868cc2b992743cfe8911ad0
> at 
> org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleRequest(AbstractExecutionGraphRequestHandler.java:58)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:104)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)





[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971969#comment-15971969
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111856357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Yes, I will first test how HepPlanner works in some test and production 
environments, and then decide whether (or how) to change it to a rule-based 
planner. So this will be my follow-up task.
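
For reference, a minimal sketch of driving a Calcite HepPlanner pass over a 
RelNode; the rule used is just a placeholder, not a statement about which rules 
the follow-up task would actually run:

    import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.rules.FilterMergeRule

    def runHepPass(input: RelNode): RelNode = {
      val program = new HepProgramBuilder()
        .addRuleInstance(FilterMergeRule.INSTANCE)  // placeholder rule
        .build()
      val planner = new HepPlanner(program)
      planner.setRoot(input)   // register the plan to rewrite
      planner.findBestExp()    // apply the program deterministically
    }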


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>






[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111856357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Yes, I will first test how HepPlanner works in some test and production 
environments, and then decide whether (or how) to change it to a rule-based 
planner. So this will be my follow-up task.




[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971966#comment-15971966
 ] 

mingleizhang commented on FLINK-5943:
-

[~tedyu] Thanks, I appreciate it. I will open a PR soon.

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
>     // Need to clear the job state in the HA services before shutdown
>     try {
>       haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
>     }
> {code}
> The access to haServices is not protected by a lock;
> haServices may already have been closed.





[jira] [Closed] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-6090.
--
Resolution: Fixed

> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement the optimizer for retraction:
>   1. Add RetractionRule at the stage of decoration, which can derive the 
> replace table/append table and the NeedRetraction property.
>   2. Match the NeedRetraction property against the replace table, and mark 
> the accumulating mode.
>  
> When this task is finished, we can turn on retraction for different operators 
> according to the accumulating mode.





[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971958#comment-15971958
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user hequn8128 closed the pull request at:

https://github.com/apache/flink/pull/3696


> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1. Add RetractionRule at the stage of decoration, which can derive the 
> replace table/append table and the NeedRetraction property.
>   2. Match NeedRetraction with the replace table and mark the accumulating mode.
>  
> When this task is finished, we can turn on retraction for different operators 
> according to the accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

2017-04-17 Thread hequn8128
Github user hequn8128 closed the pull request at:

https://github.com/apache/flink/pull/3696


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6313) Some words was spelled wrong and incorrect LOG.error without print

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971944#comment-15971944
 ] 

ASF GitHub Bot commented on FLINK-6313:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3728
  
Hi @tzulitai, could you please review this code? Thanks, much appreciated ~


> Some words was spelled wrong and incorrect LOG.error without print
> --
>
> Key: FLINK-6313
> URL: https://issues.apache.org/jira/browse/FLINK-6313
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Trivial
>
> I found some words that are spelled wrong, and LOG.error calls that do not print any information.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3728: [FLINK-6313] [flink-runtime] Fix Some words was spelled w...

2017-04-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3728
  
Hi @tzulitai, could you please review this code? Thanks, much appreciated ~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6286) hbase command not found error

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971920#comment-15971920
 ] 

ASF GitHub Bot commented on FLINK-6286:
---

Github user lingjinjiang commented on the issue:

https://github.com/apache/flink/pull/3711
  
@greghogan thanks for your review. I have another question: how can I 
assign the issue to myself?


> hbase command not found error
> -
>
> Key: FLINK-6286
> URL: https://issues.apache.org/jira/browse/FLINK-6286
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Startup Shell 
> Scripts
>Reporter: Jinjiang Ling
>Priority: Minor
> Attachments: FLINK-6286-0.patch, FLINK-6286-1.patch, 
> FLINK-6286-2.patch, FLINK-6286-3.patch
>
>
> As I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase 
> installed, I get the error message below.
> {quote}
> *bin/config.sh: line 303: hbase: command not found*
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3711: [FLINK-6286] [script] Fix the hbase command not found err...

2017-04-17 Thread lingjinjiang
Github user lingjinjiang commented on the issue:

https://github.com/apache/flink/pull/3711
  
@greghogan thanks for your review. I have another question: how can I 
assign the issue to myself?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6316) Kinesis connector docs should mention that FlinkKinesisConsumer does not provide strong ordering guarantees on resharding

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6316:
--

 Summary: Kinesis connector docs should mention that 
FlinkKinesisConsumer does not provide strong ordering guarantees on resharding
 Key: FLINK-6316
 URL: https://issues.apache.org/jira/browse/FLINK-6316
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai


Since the {{FlinkKinesisConsumer}} depends only on local information to 
determine whether a new shard created by resharding should be subscribed to by 
a subtask, there is no coordination of the parent-child shard relationship 
across subtasks.

Therefore, the {{FlinkKinesisConsumer}} does not provide any strong processing 
ordering guarantees.

For example, if initially the assignment is:
Subtask #1 --> Shard A
Subtask #2 --> Shard B

Assume A and B are merged to create shard C, and subtask #1 locally determines 
that it should be assigned shard C.

Since Flink generally does not provide coordination facilities between subtask 
instances, there is no means of ensuring that shard C is consumed only 
after shard B is also fully consumed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6240.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with 9be5cc42c02c258d3843c373b7350240c9570523

> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971754#comment-15971754
 ] 

ASF GitHub Bot commented on FLINK-6240:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3694


> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...

2017-04-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3694


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6292) Travis: transfer.sh not accepting uploads via http:// anymore

2017-04-17 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971637#comment-15971637
 ] 

Nico Kruber commented on FLINK-6292:


does it make sense to cherry-pick this commit into the previous versions' 
branches as well?

> Travis: transfer.sh not accepting uploads via http:// anymore
> -
>
> Key: FLINK-6292
> URL: https://issues.apache.org/jira/browse/FLINK-6292
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.2.0, 1.3.0, 1.1.5
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.3.0
>
>
> The {{travis_mvn_watchdog.sh}} script tries to upload the logs to transfer.sh 
> but it seems like they do not accept uploads to {{http://transfer.sh}} 
> anymore and only accept {{https}} nowadays.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971632#comment-15971632
 ] 

ASF GitHub Bot commented on FLINK-5090:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3348
  
yes, that's exactly why


> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.
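
A hedged sketch of how such a queue-length metric could be exposed through the existing metrics API (the group and queue names below are illustrative, not from the PR):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class QueueMetrics {

	// register a gauge that reports the current input queue length
	public static void register(MetricGroup group, final java.util.Queue<?> inputQueue) {
		group.gauge("inputQueueLength", new Gauge<Integer>() {
			@Override
			public Integer getValue() {
				return inputQueue.size();
			}
		});
	}
}
{code}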



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3348: [FLINK-5090] [network] Add metrics for details about inbo...

2017-04-17 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3348
  
yes, that's exactly why


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971623#comment-15971623
 ] 

ASF GitHub Bot commented on FLINK-6240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111814395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -396,13 +396,68 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateCreateOutputRow(outputArity: Int): String = {
+    def genCreateOutputRow(outputArity: Int): String = {
       j"""
          |  public org.apache.flink.types.Row createOutputRow() {
          |    return new org.apache.flink.types.Row($outputArity);
          |  }""".stripMargin
     }
 
+    def genMergeAccumulatorsPair(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+
+      val sig: String =
+        j"""
+           |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+           |    org.apache.flink.types.Row a,
+           |    org.apache.flink.types.Row b)
+           """.stripMargin
+      val merge: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
+             |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
+             |accList$i.set(0, aAcc$i);
+             |accList$i.set(1, bAcc$i);
+             |a.setField(
+             |  $i,
+             |  ${aggs(i)}.merge(accList$i));
+             """.stripMargin
+      }.mkString("\n")
+      val ret: String =
+        j"""
+           |  return a;
+           """.stripMargin
+
+      j"""$sig {
+         |$merge
+         |$ret
+         |  }""".stripMargin
+    }
+
+    def genMergeList(accTypes: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |java.util.ArrayList<${accTypes(i)}> accList$i;
+             """.stripMargin
+      }.mkString("\n")
+    }
+
+    def initMergeList(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Create with initial capacity 2: `new java.util.ArrayList<${accTypes(i)}>(2)`?


> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971624#comment-15971624
 ] 

ASF GitHub Bot commented on FLINK-6240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111816123
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -792,6 +792,20 @@ object AggregateUtil {
 inputType,
 needRetraction = false)
 
+    val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
--- End diff --

This field is no longer used and can be removed


> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...

2017-04-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111814395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -396,13 +396,68 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateCreateOutputRow(outputArity: Int): String = {
+    def genCreateOutputRow(outputArity: Int): String = {
       j"""
          |  public org.apache.flink.types.Row createOutputRow() {
          |    return new org.apache.flink.types.Row($outputArity);
          |  }""".stripMargin
     }
 
+    def genMergeAccumulatorsPair(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+
+      val sig: String =
+        j"""
+           |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+           |    org.apache.flink.types.Row a,
+           |    org.apache.flink.types.Row b)
+           """.stripMargin
+      val merge: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
+             |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
+             |accList$i.set(0, aAcc$i);
+             |accList$i.set(1, bAcc$i);
+             |a.setField(
+             |  $i,
+             |  ${aggs(i)}.merge(accList$i));
+             """.stripMargin
+      }.mkString("\n")
+      val ret: String =
+        j"""
+           |  return a;
+           """.stripMargin
+
+      j"""$sig {
+         |$merge
+         |$ret
+         |  }""".stripMargin
+    }
+
+    def genMergeList(accTypes: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |java.util.ArrayList<${accTypes(i)}> accList$i;
+             """.stripMargin
+      }.mkString("\n")
+    }
+
+    def initMergeList(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Create with initial capacity 2: `new java.util.ArrayList<${accTypes(i)}>(2)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...

2017-04-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111816123
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -792,6 +792,20 @@ object AggregateUtil {
 inputType,
 needRetraction = false)
 
+    val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
--- End diff --

This field is no longer used and can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971601#comment-15971601
 ] 

ASF GitHub Bot commented on FLINK-6240:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111813245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -792,6 +792,20 @@ object AggregateUtil {
 inputType,
 needRetraction = false)
 
+    val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
--- End diff --

Thanks @fhueske , I have addressed your comments, please take a look.


> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971602#comment-15971602
 ] 

ASF GitHub Bot commented on FLINK-6240:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3694
  
Thanks @fhueske , I have addressed your comments, please take a look.


> codeGen dataStream aggregates that use AggregateAggFunction
> ---
>
> Key: FLINK-6240
> URL: https://issues.apache.org/jira/browse/FLINK-6240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3694: [FLINK-6240] [table] codeGen dataStream aggregates that u...

2017-04-17 Thread shaoxuan-wang
Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3694
  
Thanks @fhueske , I have addressed your comments, please take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...

2017-04-17 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3694#discussion_r111813245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -792,6 +792,20 @@ object AggregateUtil {
 inputType,
 needRetraction = false)
 
+    val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
--- End diff --

Thanks @fhueske , I have addressed your comments, please take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971577#comment-15971577
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3696
  
Hi @hequn8128 and @shaoxuan-wang, I merged this PR to the 
`table-retraction` branch.
Could you close it? Thanks!


> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1. Add RetractionRule at the stage of decoration, which can derive the 
> replace table/append table and the NeedRetraction property.
>   2. Match NeedRetraction with the replace table and mark the accumulating mode.
>  
> When this task is finished, we can turn on retraction for different operators 
> according to the accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3696: [FLINK-6090] [table] Add RetractionRule at the stage of d...

2017-04-17 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3696
  
Hi @hequn8128 and @shaoxuan-wang, I merged this PR to the 
`table-retraction` branch.
Could you close it? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971543#comment-15971543
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111804596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-inputNode: RelNode,
--- End diff --

Ah, OK. Thanks for the explanation!
That makes sense :-)


> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1. Add RetractionRule at the stage of decoration, which can derive the 
> replace table/append table and the NeedRetraction property.
>   2. Match NeedRetraction with the replace table and mark the accumulating mode.
>  
> When this task is finished, we can turn on retraction for different operators 
> according to the accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

2017-04-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111804596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-inputNode: RelNode,
--- End diff --

Ah, OK. Thanks for the explanation!
That makes sense :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971520#comment-15971520
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, the feature was contributed in a follow-up PR. 
Could you close this PR? Thanks!


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-04-17 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, the feature was contributed in a follow-up PR. 
Could you close this PR? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4831) Implement a log4j metric reporter

2017-04-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971428#comment-15971428
 ] 

Chesnay Schepler commented on FLINK-4831:
-

I have some work-in-progress here: https://github.com/zentol/log4jreporter

Feel free to re-use whatever you can.

> Implement a log4j metric reporter
> -
>
> Key: FLINK-4831
> URL: https://issues.apache.org/jira/browse/FLINK-4831
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Priority: Minor
>  Labels: easyfix, starter
>
> For debugging purposes it would be very useful to have a log4j metric 
> reporter. If you don't want to set up a metrics backend you currently have to 
> rely on JMX, which a) works a bit differently than other reporters (for 
> example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to 
> analyze results, as metrics are cleaned up once a job finishes.
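
A minimal sketch of what such a reporter could look like, assuming the {{AbstractReporter}} / {{Scheduled}} interfaces from flink-metrics-core (this is an illustration, not the work-in-progress linked above):

{code:java}
import java.util.Map;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Log4jReporter extends AbstractReporter implements Scheduled {

	private static final Logger LOG = LoggerFactory.getLogger(Log4jReporter.class);

	@Override
	public void open(MetricConfig config) {}

	@Override
	public void close() {}

	@Override
	public String filterCharacters(String input) {
		return input;
	}

	// periodically dump all registered metrics into the log
	@Override
	public void report() {
		synchronized (this) {
			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
				LOG.info("{}: {}", entry.getValue(), entry.getKey().getValue());
			}
			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
				LOG.info("{}: {}", entry.getValue(), entry.getKey().getCount());
			}
		}
	}
}
{code}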



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971404#comment-15971404
 ] 

ASF GitHub Bot commented on FLINK-6315:
---

GitHub user sjwiesman opened a pull request:

https://github.com/apache/flink/pull/3729

[FLINK-6315] Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315

A common use case when writing a custom operator that outputs data to some 
third-party location is to partially output on checkpoint and then commit on 
notifyCheckpointComplete. If that external system does not gracefully handle 
rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
that data needs to be handled by the next checkpoint.
The idea is to add a new interface, similar to CheckpointListener, that 
provides a callback when the CheckpointCoordinator times out a checkpoint.

This is required for the eventually consistent sink coming in FLINK-6306 to 
be able to differentiate between concurrent checkpoints and timed-out 
checkpoints. 
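
To make the intended usage concrete, here is a rough sketch of a sink combining the existing CheckpointListener with the proposed interface (the staging and bookkeeping below are illustrative, not part of the PR; CheckpointTimeoutListener is the interface proposed above):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class EventuallyConsistentSink<T> extends RichSinkFunction<T>
		implements CheckpointListener, CheckpointTimeoutListener {

	// records buffered since the last checkpoint barrier (a real sink
	// would keep this on local disk / in managed state)
	private List<T> buffer = new ArrayList<>();
	// data staged per in-flight checkpoint, keyed by checkpoint id
	private final Map<Long, List<T>> pending = new HashMap<>();

	@Override
	public void invoke(T value) {
		buffer.add(value);
	}

	// called from the snapshot path (omitted here): stage the buffer
	void stage(long checkpointId) {
		pending.put(checkpointId, buffer);
		buffer = new ArrayList<>();
	}

	@Override
	public void notifyCheckpointComplete(long checkpointId) {
		List<T> staged = pending.remove(checkpointId);
		if (staged != null) {
			commit(staged); // single consistent write to the external system
		}
	}

	@Override
	public void notifyCheckpointTimeout(long checkpointId) {
		// the coordinator gave up on this checkpoint: do not commit,
		// roll the staged data into the next checkpoint instead
		List<T> staged = pending.remove(checkpointId);
		if (staged != null) {
			staged.addAll(buffer);
			buffer = staged;
		}
	}

	private void commit(List<T> records) {
		// write the records to the third-party system here
	}
}
{code}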

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sjwiesman/flink FLINK-6315

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3729


commit 323851929772b4c57e65b7146e96af6687d3a2eb
Author: Seth Wiesman 
Date:   2017-04-15T21:13:20Z

FLINK-6315 Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315




> Notify on checkpoint timeout 
> -
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third-party location is to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface, similar to CheckpointListener, that 
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to receive
>  * a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>   /**
>* This method is called as a notification if a distributed checkpoint 
> has been timed out.
>*
>* @param checkpointId The ID of the checkpoint that has been timed out.
>* @throws Exception
>*/
>   void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3729: [FLINK-6315] Notify on checkpoint timeout

2017-04-17 Thread sjwiesman
GitHub user sjwiesman opened a pull request:

https://github.com/apache/flink/pull/3729

[FLINK-6315] Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315

A common use case when writing a custom operator that outputs data to some 
third-party location is to partially output on checkpoint and then commit on 
notifyCheckpointComplete. If that external system does not gracefully handle 
rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
that data needs to be handled by the next checkpoint.
The idea is to add a new interface, similar to CheckpointListener, that 
provides a callback when the CheckpointCoordinator times out a checkpoint.

This is required for the eventually consistent sink coming in FLINK-6306 to 
be able to differentiate between concurrent checkpoints and timed-out 
checkpoints. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sjwiesman/flink FLINK-6315

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3729


commit 323851929772b4c57e65b7146e96af6687d3a2eb
Author: Seth Wiesman 
Date:   2017-04-15T21:13:20Z

FLINK-6315 Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4831) Implement a log4j metric reporter

2017-04-17 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971400#comment-15971400
 ] 

Neelesh Srinivas Salian commented on FLINK-4831:


Shall I work on this if no one has begun already?

> Implement a log4j metric reporter
> -
>
> Key: FLINK-4831
> URL: https://issues.apache.org/jira/browse/FLINK-4831
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Priority: Minor
>  Labels: easyfix, starter
>
> For debugging purposes it would be very useful to have a log4j metric 
> reporter. If you don't want to set up a metrics backend you currently have to 
> rely on JMX, which a) works a bit differently than other reporters (for 
> example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to 
> analyze results, as metrics are cleaned up once a job finishes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2017-04-17 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971397#comment-15971397
 ] 

Seth Wiesman commented on FLINK-6306:
-

The idea is that data is buffered on local disk until a checkpoint, when a single 
put operation is performed to copy the data. You can think of it as bucketing 
by checkpoint. The key is that, because bad data from failed checkpoints cannot 
be deleted, buckets containing files from successful checkpoints get an 
empty flag file marking them as consistent. It is then up to consuming 
processes to read only from buckets with flag files when they require exactly-once 
semantics; otherwise they can read every file and get at-least-once. 
Attached is a poorly drawn diagram explaining how this works. 

I am actually already running this in production right now to write out to S3, 
but it first requires FLINK-6315 so that I can differentiate between concurrent 
checkpoints and timed-out checkpoints. Otherwise, I would only be able to 
guarantee exactly-once if maxConcurrentCheckpoints were set to 1. 
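
A minimal sketch of the commit step described above, assuming the AWS SDK v1 client and an illustrative bucket/prefix layout (none of this is taken from the attached design):

{code:java}
import java.io.File;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class BucketCommitSketch {

	private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

	/**
	 * PUT the locally buffered part files for one checkpoint, then PUT an
	 * empty flag object. Exactly-once readers only consume prefixes that
	 * carry the flag; at-least-once readers may read everything.
	 */
	public void commit(String bucket, long checkpointId, List<File> parts) {
		String prefix = "cp-" + checkpointId;
		for (File part : parts) {
			s3.putObject(bucket, prefix + "/" + part.getName(), part);
		}
		// a single consistent PUT marks the bucket as complete
		s3.putObject(bucket, prefix + "/_COMPLETE", "");
	}
}
{code}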

> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3 where most operations (i.e. rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly once writes 
> to a file system where only PUT operations are considered consistent. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6306) Sink for eventually consistent file systems

2017-04-17 Thread Seth Wiesman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-6306:

Attachment: eventually-consistent-sink

> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3 where most operations (i.e. rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly once writes 
> to a file system where only PUT operations are considered consistent. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-04-17 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971390#comment-15971390
 ] 

Neelesh Srinivas Salian commented on FLINK-5506:


Hi, [~mcoimbra] is this still an issue?

> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4
>Reporter: Miguel E. Coimbra
>  Labels: easyfix, newbie
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> #id1   id2   score
> 0      1     0
> 0      2     0
> 0      3     0
> This is just a central vertex with 3 neighbors (disconnected between 
> themselves).
> I am loading the dataset and executing the algorithm with the following code:
> ---
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // fields are separated by tabs
>     .ignoreComments("#")  // comments start with "#"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, 
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new 
> org.apache.flink.graph.library.CommunityDetection(iterationCount, 
> hopAttenuationDelta)).getVertices();
> vs.print();
> ---
> Running this code throws the following exception (check the bold line):
> ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at 
> org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at 
> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at 
> org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)​
> After a further look, I set a breakpoint (Eclipse IDE debugging) at the line 
> in bold:
> org.apache.flink.graph.library.CommunityDetection.java (source code 

[jira] [Commented] (FLINK-5550) NotFoundException: Could not find job with id

2017-04-17 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971389#comment-15971389
 ] 

Neelesh Srinivas Salian commented on FLINK-5550:


Hi [~jeffreyji666], is this still an issue?

> NotFoundException: Could not find job with id
> -
>
> Key: FLINK-5550
> URL: https://issues.apache.org/jira/browse/FLINK-5550
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.0.3
> Environment: centos
>Reporter: jiwengang
>Priority: Minor
>  Labels: newbie
>
> Job is canceled, but the following exception is still reported:
> 2017-01-18 10:35:18,677 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
> handling request
> org.apache.flink.runtime.webmonitor.NotFoundException: Could not find job 
> with id 3b98e734c868cc2b992743cfe8911ad0
> at 
> org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleRequest(AbstractExecutionGraphRequestHandler.java:58)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:104)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6315) Notify on checkpoint timeout

2017-04-17 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-6315:
---

 Summary: Notify on checkpoint timeout 
 Key: FLINK-6315
 URL: https://issues.apache.org/jira/browse/FLINK-6315
 Project: Flink
  Issue Type: New Feature
  Components: Core
Reporter: Seth Wiesman
Assignee: Seth Wiesman


A common use case when writing a custom operator that outputs data to some 
third-party location is to partially output on checkpoint and then commit on 
notifyCheckpointComplete. If that external system does not gracefully handle 
rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
that data needs to be handled by the next checkpoint. 

The idea is to add a new interface, similar to CheckpointListener, that provides 
a callback when the CheckpointCoordinator times out a checkpoint:

{code:java}
/**
 * This interface must be implemented by functions/operations that want to receive
 * a notification if a checkpoint has been timed out by the
 * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
 */
public interface CheckpointTimeoutListener {

/**
 * This method is called as a notification if a distributed checkpoint 
has been timed out.
 *
 * @param checkpointId The ID of the checkpoint that has been timed out.
 * @throws Exception
 */
void notifyCheckpointTimeout(long checkpointId) throws Exception;
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6306) Sink for eventually consistent file systems

2017-04-17 Thread Seth Wiesman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-6306:

Description: Currently Flink provides the BucketingSink as an exactly once 
method for writing out to a file system. It provides these guarantees by moving 
files through several stages and deleting or truncating files that get into a 
bad state. While this is a powerful abstraction, it causes issues with 
eventually consistent file systems such as Amazon's S3 where must operations 
(ie rename, delete, truncate) are not guaranteed to become consistent within a 
reasonable amount of time. Flink should provide a sink that provides exactly 
once writes to a file system where only PUT operations are considered 
consistent.   (was: Currently Flink provides the BucketingSink as an exactly 
once method for writing out to a file system. It provides there guarantees by 
moving files through several stages and deleting or truncating files that get 
into a bad state. While this is a powerful abstraction, it causes issues with 
eventually consistent file systems such as Amazon's S3 where must operations 
(ie rename, delete, truncate) are not guaranteed to become consistent within a 
reasonable amount of time. Flink should provide a sink that provides exactly 
once writes to a file system where only PUT operations are considered 
consistent. )

> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> Currently Flink provides the BucketingSink as an exactly once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3 where most operations (i.e. rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly once writes 
> to a file system where only PUT operations are considered consistent. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6306) Sink for eventually consistent file systems

2017-04-17 Thread Seth Wiesman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-6306:

Description: Currently Flink provides the BucketingSink as an exactly once 
method for writing out to a file system. It provides these guarantees by moving 
files through several stages and deleting or truncating files that get into a 
bad state. While this is a powerful abstraction, it causes issues with 
eventually consistent file systems such as Amazon's S3 where most operations 
(ie rename, delete, truncate) are not guaranteed to become consistent within a 
reasonable amount of time. Flink should provide a sink that provides exactly 
once writes to a file system where only PUT operations are considered 
consistent.   (was: Currently Flink provides the BucketingSink as an exactly 
once method for writing out to a file system. It provides these guarantees by 
moving files through several stages and deleting or truncating files that get 
into a bad state. While this is a powerful abstraction, it causes issues with 
eventually consistent file systems such as Amazon's S3 where must operations 
(ie rename, delete, truncate) are not guaranteed to become consistent within a 
reasonable amount of time. Flink should provide a sink that provides exactly 
once writes to a file system where only PUT operations are considered 
consistent. )

> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> Currently Flink provides the BucketingSink as an exactly once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3 where most operations (i.e. rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly once writes 
> to a file system where only PUT operations are considered consistent. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6314) Allow setting mapper options

2017-04-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-6314:
---

 Summary: Allow setting mapper options
 Key: FLINK-6314
 URL: https://issues.apache.org/jira/browse/FLINK-6314
 Project: Flink
  Issue Type: Improvement
  Components: Cassandra Connector
Affects Versions: 1.3.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.3.0


A user requested the ability to set various options that the underlying 
Mapper of the CassandraPojoSink supports.
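
For context, these are the options of the DataStax object mapper. A standalone sketch of the kind of settings a user would pass, assuming the DataStax driver-mapping API (the sink-side API for forwarding them is not defined yet):

{code:java}
import com.datastax.driver.mapping.Mapper;

public class MapperOptionsExample {

	// Illustrative options to forward to the underlying Mapper:
	static Mapper.Option[] exampleOptions() {
		return new Mapper.Option[] {
			Mapper.Option.ttl(3600),            // expire rows after one hour
			Mapper.Option.saveNullFields(false) // do not overwrite unset columns
		};
	}
}
{code}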



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971074#comment-15971074
 ] 

Ted Yu commented on FLINK-5943:
---

lgtm

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
>     // Need to clear the job state in the HA services before shutdown
>     try {
>       haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
>     }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970988#comment-15970988
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111732371
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Very nice.


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.
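
A minimal sketch of the proposed ordering (names follow the snippet above; the 
concrete Future type is an assumption):

{code}
Future<ApplicationStatus> terminationFuture;

synchronized (lock) {
    LOG.info("Starting High Availability Services");
    // ... start the HA services, job manager runner and resource manager ...

    // obtain the future while still holding the lock, so the resource manager
    // cannot be torn down between its start above and this call
    terminationFuture = resourceManager.getTerminationFuture();
}

// wait outside the lock; blocking while holding it could deadlock shutdown()
ApplicationStatus finalStatus = terminationFuture.get();
LOG.info("YARN Application Master finished and the result is {}", finalStatus);
{code}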



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3726: [FLINK-6130] [yarn] Fix Consider calling resourceM...

2017-04-17 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111732371
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Very nice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970983#comment-15970983
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3726
  
Travis is flaky sometimes.


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3726: [FLINK-6130] [yarn] Fix Consider calling resourceManager#...

2017-04-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3726
  
Travis is flaky sometimes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970977#comment-15970977
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111731219
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Just another nitpick: the `object` variable could probably use a nicer / 
more meaningful name :)


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3726: [FLINK-6130] [yarn] Fix Consider calling resourceM...

2017-04-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111731219
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Just another nitpick: the `object` variable could probably use a nicer / 
more meaningful name :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970975#comment-15970975
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111731104
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Thanks for the review. I will update the code. I really appreciate it. 


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3726: [FLINK-6130] [yarn] Fix Consider calling resourceM...

2017-04-17 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111731104
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

Thanks for the review. I will update the code. I really appreciate it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970973#comment-15970973
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3726
  
I don't think the timeout was related. We'll see from this Travis run :)


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3726: [FLINK-6130] [yarn] Fix Consider calling resourceManager#...

2017-04-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3726
  
I don't think the timeout was related. We'll see from this Travis run :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970972#comment-15970972
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111730519
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

I would use a log message formatter here:
`LOG.info("YARN Application Master finished and the result is {}", object);`


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3726: [FLINK-6130] [yarn] Fix Consider calling resourceM...

2017-04-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3726#discussion_r111730519
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -158,14 +160,16 @@ protected int runApplicationMaster(Configuration config) {
 			jobManagerRunner.start();
 			LOG.debug("Job Manager Runner started");
 
+			// wait for resource manager to finish and return a value for later to get
+			future = (Future) resourceManager.getTerminationFuture();
+
 			//  (5) start the web monitor
 			// TODO: add web monitor
 		}
+		Object object = future.value().get();
 
-		// wait for resource manager to finish
-		resourceManager.getTerminationFuture().get();
 		// everything started, we can wait until all is done or the process is killed
-		LOG.info("YARN Application Master finished");
+		LOG.info("YARN Application Master finished and the result is " + object.toString());
--- End diff --

I would use a log message formatter here:
`LOG.info("YARN Application Master finished and the result is {}", object);`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970959#comment-15970959
 ] 

ASF GitHub Bot commented on FLINK-6130:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3726
  
@tzulitai Hi, could you please review my code? I'd appreciate it.


> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3726: [FLINK-6130] [yarn] Fix Consider calling resourceManager#...

2017-04-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3726
  
@tzulitai Hi, could you please review my code? I'd appreciate it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1960) Add comments and docs for withForwardedFields and related operators

2017-04-17 Thread hzhuangzhenxi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970928#comment-15970928
 ] 

hzhuangzhenxi commented on FLINK-1960:
--

Hi,
I would like to work on this issue.
Thanks

> Add comments and docs for withForwardedFields and related operators
> ---
>
> Key: FLINK-1960
> URL: https://issues.apache.org/jira/browse/FLINK-1960
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Theodore Vasiloudis
>Priority: Minor
>  Labels: documentation, starter
>
> The withForwardedFields and related operators have no docs for the Scala API. 
> It would be useful to have code comments and example usage in the docs.
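
For illustration, a minimal sketch of a forwarded-fields annotation on the 
Java DataSet API (the Scala API exposes the same method); the input data and 
the map function are made up:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, String>> input = env.fromElements(new Tuple2<>(1L, "flink"));

DataSet<Tuple2<Long, Integer>> lengths = input
    .map(new MapFunction<Tuple2<Long, String>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> map(Tuple2<Long, String> value) {
            return new Tuple2<>(value.f0, value.f1.length());
        }
    })
    // f0 is copied to the output unchanged, so the optimizer may reuse an
    // existing partitioning or sort order on that field instead of re-shuffling
    .withForwardedFields("f0");
{code}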



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6257) Post-pass OVER windows

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970882#comment-15970882
 ] 

ASF GitHub Bot commented on FLINK-6257:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3697
  
@zentol Thank you for the reminder. I have added some notes describing the changes.
Thanks,
SunJincheng


> Post-pass OVER windows
> --
>
> Key: FLINK-6257
> URL: https://issues.apache.org/jira/browse/FLINK-6257
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> The OVER windows have been implemented by several contributors.
> We should do a post pass over the contributed code and improve a few things.
> * Functionality
> ** Currently every time attribute is allowed as an ORDER BY attribute. We must 
> check that it is actually a time indicator ({{procTime()}}, {{rowTime()}}) 
> and that the order is ASCENDING (see the sketch after this list).
> * Documentation
> ** Add documentation for OVER windows
> * Code style
> ** Consistent naming of {{ProcessFunctions}} and methods
> * Tests
> ** Move the OVER window tests out of SqlITCase into a dedicated class
> ** Move the OVER window tests out of WindowAggregateTest into a dedicated 
> class
> ** Add tests based on the test harness for all {{ProcessFunctions}} similar 
> to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include 
> exact boundary checks for range windows and check for proper parallelization 
> with multiple keys.
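
For illustration, a minimal sketch of the kind of OVER window the post-pass 
has to validate; the Orders table, its columns, and the tableEnv.sql() entry 
point are assumptions here, and procTime() is the time indicator named above:

{code}
// ORDER BY uses the procTime() time indicator in ascending order, which is
// the only ordering the post-pass should accept; ordering by an ordinary
// column (e.g. ORDER BY amount) would have to be rejected.
Table result = tableEnv.sql(
    "SELECT user, " +
    "       SUM(amount) OVER (" +
    "           PARTITION BY user " +
    "           ORDER BY procTime() " +
    "           ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumAmount " +
    "FROM Orders");
{code}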



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3697: [FLINK-6257][table]Optimize test cases

2017-04-17 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3697
  
@zentol Thank you for the reminder. I have added some notes describing the changes.
Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6313) Some words was spelled wrong and incorrect LOG.error without print

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970818#comment-15970818
 ] 

ASF GitHub Bot commented on FLINK-6313:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3728

[FLINK-6313] [flink-runtime] Fix Some words was spelled wrong and inc…

…orrect LOG.error without print.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 6313

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3728.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3728


commit a7c07ce5acac38c544045ff40c189d6973c91a6e
Author: zhangminglei 
Date:   2017-04-17T07:49:54Z

[FLINK-6313] [flink-runtime] Fix Some words was spelled wrong and incorrect 
LOG.error without print.




> Some words was spelled wrong and incorrect LOG.error without print
> --
>
> Key: FLINK-6313
> URL: https://issues.apache.org/jira/browse/FLINK-6313
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Trivial
>
> I found some words that are spelled wrong and a LOG.error call that does not print any information.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3728: [FLINK-6313] [flink-runtime] Fix Some words was sp...

2017-04-17 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3728

[FLINK-6313] [flink-runtime] Fix Some words was spelled wrong and inc…

…orrect LOG.error without print.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 6313

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3728.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3728


commit a7c07ce5acac38c544045ff40c189d6973c91a6e
Author: zhangminglei 
Date:   2017-04-17T07:49:54Z

[FLINK-6313] [flink-runtime] Fix Some words was spelled wrong and incorrect 
LOG.error without print.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6313) Some words was spelled wrong and incorrect LOG.error without print

2017-04-17 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6313:
---

 Summary: Some words was spelled wrong and incorrect LOG.error 
without print
 Key: FLINK-6313
 URL: https://issues.apache.org/jira/browse/FLINK-6313
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Reporter: mingleizhang
Assignee: mingleizhang
Priority: Trivial


I found some words that are spelled wrong and a LOG.error call that does not print any information.
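
For illustration only (the snippet is made up; the affected classes live in 
the queryable-state code), this is the kind of LOG.error fix being described:

{code}
try {
    channel.close();
} catch (Exception e) {
    // before: LOG.error("Failed to close the channel.");  // the exception is dropped
    // after: pass the throwable so the stack trace is actually printed
    LOG.error("Failed to close the channel.", e);
}
{code}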



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970759#comment-15970759
 ] 

mingleizhang commented on FLINK-5943:
-

[~tedyu] Hi Ted, what do you think of this? I have put the accesses to 
haServices inside the synchronized block. Thanks. 
{code}
protected void shutdown(ApplicationStatus status, String msg) {
    synchronized (lock) {
        // Need to clear the job state in the HA services before shutdown
        try {
            haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
        }
        catch (Throwable t) {
            LOG.warn("Could not clear the job at the high-availability services", t);
        }
        if (jobManagerRunner != null) {
            try {
                jobManagerRunner.shutdown();
            } catch (Throwable tt) {
                LOG.warn("Failed to stop the JobManagerRunner", tt);
            }
        }
        // ... remaining shutdown steps elided ...
    }
}
{code}

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
> // Need to clear the job state in the HA services before shutdown
> try {
>   haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
> }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5943:
---

Assignee: mingleizhang

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
> // Need to clear the job state in the HA services before shutdown
> try {
>   haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
> }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6312) Update curator version to 2.12.0

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970749#comment-15970749
 ] 

ASF GitHub Bot commented on FLINK-6312:
---

GitHub user WangTaoTheTonic opened a pull request:

https://github.com/apache/flink/pull/3727

[FLINK-6312]update curator version to 2.12.0 to avoid potential block

As there's a major bug ([CURATOR-344](https://issues.apache.org/jira/browse/CURATOR-344)) 
in the Curator release used by Flink, we need to upgrade to 2.12.0 to avoid a 
potential blocking issue in Flink.

(Flink uses Curator recipes in the checkpoint coordinator, and we have already 
run into this problem during ZooKeeper failover while trying to fix 
[FLINK-6174](https://issues.apache.org/jira/browse/FLINK-6174).)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WangTaoTheTonic/flink FLINK-6312

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3727


commit 014b042006c0d6b1939e00c68e7d15ce8262400a
Author: WangTaoTheTonic 
Date:   2017-04-17T06:55:27Z

update curator version to 2.12.0 to avoid potential block




> Update curator version to 2.12.0
> 
>
> Key: FLINK-6312
> URL: https://issues.apache.org/jira/browse/FLINK-6312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> As there's a major bug (https://issues.apache.org/jira/browse/CURATOR-344) in 
> the Curator release used by Flink, we need to upgrade to 2.12.0 to avoid a 
> potential blocking issue in Flink. (Flink uses Curator recipes in the 
> checkpoint coordinator, and we have already run into this problem during 
> ZooKeeper failover while trying to fix https://issues.apache.org/jira/browse/FLINK-6174.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3727: [FLINK-6312]update curator version to 2.12.0 to av...

2017-04-17 Thread WangTaoTheTonic
GitHub user WangTaoTheTonic opened a pull request:

https://github.com/apache/flink/pull/3727

[FLINK-6312]update curator version to 2.12.0 to avoid potential block

As there's a major bug ([CURATOR-344](https://issues.apache.org/jira/browse/CURATOR-344)) 
in the Curator release used by Flink, we need to upgrade to 2.12.0 to avoid a 
potential blocking issue in Flink.

(Flink uses Curator recipes in the checkpoint coordinator, and we have already 
run into this problem during ZooKeeper failover while trying to fix 
[FLINK-6174](https://issues.apache.org/jira/browse/FLINK-6174).)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WangTaoTheTonic/flink FLINK-6312

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3727


commit 014b042006c0d6b1939e00c68e7d15ce8262400a
Author: WangTaoTheTonic 
Date:   2017-04-17T06:55:27Z

update curator version to 2.12.0 to avoid potential block




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6312) Update curator version to 2.12.0

2017-04-17 Thread Tao Wang (JIRA)
Tao Wang created FLINK-6312:
---

 Summary: Update curator version to 2.12.0
 Key: FLINK-6312
 URL: https://issues.apache.org/jira/browse/FLINK-6312
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Reporter: Tao Wang
Assignee: Tao Wang


As there's a major bug in the Curator release used by Flink, we need to upgrade 
to 2.12.0 to avoid a potential blocking issue in Flink. (Flink uses Curator 
recipes in the checkpoint coordinator, and we have already run into this 
problem during ZooKeeper failover while trying to fix 
https://issues.apache.org/jira/browse/FLINK-6174.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6312) Update curator version to 2.12.0

2017-04-17 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-6312:

Description: As there's a major bug 
(https://issues.apache.org/jira/browse/CURATOR-344) in the Curator release used 
by Flink, we need to upgrade to 2.12.0 to avoid a potential blocking issue in 
Flink. (Flink uses Curator recipes in the checkpoint coordinator, and we have 
already run into this problem during ZooKeeper failover while trying to fix 
https://issues.apache.org/jira/browse/FLINK-6174.)  (was: As there's a major bug 
in the Curator release used by Flink, we need to upgrade to 2.12.0 to avoid a 
potential blocking issue in Flink. (Flink uses Curator recipes in the checkpoint 
coordinator, and we have already run into this problem during ZooKeeper failover 
while trying to fix https://issues.apache.org/jira/browse/FLINK-6174.))

> Update curator version to 2.12.0
> 
>
> Key: FLINK-6312
> URL: https://issues.apache.org/jira/browse/FLINK-6312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> As there's a major bug (https://issues.apache.org/jira/browse/CURATOR-344) in 
> the Curator release used by Flink, we need to upgrade to 2.12.0 to avoid a 
> potential blocking issue in Flink. (Flink uses Curator recipes in the 
> checkpoint coordinator, and we have already run into this problem during 
> ZooKeeper failover while trying to fix https://issues.apache.org/jira/browse/FLINK-6174.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

2017-04-17 Thread Rahul Yadav (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970740#comment-15970740
 ] 

Rahul Yadav commented on FLINK-6301:


Sure, my pleasure. :)
Sent the email; I will assign it to myself as soon as I get the permission. Thanks!

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a 
> Kafka consumer bug
> --
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.1.3, 1.1.4
>Reporter: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg, 
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, 
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, 
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, 
> POSTFIX.jeprof.14880.1944.i1944.heap.svg, 
> POSTFIX.jeprof.14880.4129.i4129.heap.svg, 
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, 
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs, 
> # We are consuming gzip-compressed messages from Kafka using 
> *KafkaConnector09* and use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of 
> malloc calls from the Java process; attached are the profiles generated at 
> various stages of the job. As we can see, apart from os.malloc and 
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the 
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long 
> as Inflater.end() is called after use.
> # To find the callers of inflate(), we used BTrace on the running process to 
> dump the caller stack at each call. Following is the stack trace we got: 
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of 
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the 
> Kafka consumer code, we found that RecordsIterator does not close the 
> compressor stream after use, hence causing the memory leak:
> 
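
A self-contained illustration of the leak pattern (not the Kafka code itself): 
if the decompressing stream is never closed, Inflater.end() is only reached 
through finalization, and the natively allocated zlib buffers stay around in 
the meantime.

{code}
import java.io.*;
import java.util.zip.*;

public class InflaterLeakDemo {
    public static void main(String[] args) throws IOException {
        // build a small gzip payload to decompress
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (OutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("hello".getBytes("UTF-8"));
        }
        byte[] compressed = bos.toByteArray();

        // leaky pattern: the stream (and its Inflater) is never closed
        InputStream leaky = new GZIPInputStream(new ByteArrayInputStream(compressed));
        while (leaky.read() != -1) { /* drain */ }

        // fixed pattern: try-with-resources closes the stream, which calls
        // Inflater.end() and frees the native memory immediately
        try (InputStream ok = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            while (ok.read() != -1) { /* drain */ }
        }
    }
}
{code}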

[jira] [Comment Edited] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970738#comment-15970738
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5697 at 4/17/17 6:40 AM:
-

I'd say it still makes sense. If I understand how Kinesis resharding works 
correctly, merging or splitting shards actually just results in new empty 
shards with new key ranges, while the previous shards that were merged or split 
are simply closed (i.e., records that were collected by Kinesis before the 
resharding are consumed from the old closed shards). Therefore, we can still 
assume simple event time patterns per-shard, as there is no merging or 
splitting of the Kinesis records taking place and their event time pattern is 
kept intact.


was (Author: tzulitai):
I'd say it still makes sense. If I understand how Kinesis resharding works 
correctly, merging or splitting shards actually just results in new empty 
shards with new key ranges, while the previous shards that were merged or split 
are simply closed (i.e., records that were collected by Kinesis before the 
resharding are consumed from the old closed shards). Therefore, we can still 
assume simple event time patterns per-shard, as there is no merging or 
splitting of the Kinesis records taking place.

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970738#comment-15970738
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5697 at 4/17/17 6:40 AM:
-

I'd say it still makes sense. If I understand how Kinesis resharding works 
correctly, merging or splitting shards actually just results in new empty 
shards with new key ranges, while the previous shards that were merged or split 
are simply closed (i.e., records that were collected by Kinesis before the 
resharding are consumed from the old closed shards). Therefore, we can still 
assume simple event time patterns per-shard, as there is no merging or 
splitting of the Kinesis records taking place.


was (Author: tzulitai):
I'd say it still makes sense. If I understand how Kinesis resharding works 
correctly, merging or splitting shards always results in new empty shards with 
new key ranges, and the previous shards that were merged or split are simply 
closed (i.e., records that were collected by Kinesis before the resharding are 
consumed from the old closed shards). Therefore, we can still assume simple 
event time patterns per-shard, as there is no merging or splitting of the 
Kinesis records taking place.

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970738#comment-15970738
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5697:


I'd say it still makes sense. If I understand how Kinesis resharding works 
correctly, merging or splitting shards always results in new empty shards with 
new key ranges, and the previous shards that were merged or split are simply 
closed (i.e., records that were collected by Kinesis before the resharding are 
consumed from the old closed shards). Therefore, we can still assume simple 
event time patterns per-shard, as there is no merging or splitting of the 
Kinesis records taking place.

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang edited comment on FLINK-5611 at 4/17/17 6:35 AM:
--

[~uce]  Hi, what do you think of this? It is shown below, under 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

private static final long serialVersionUID = 1L;

public QueryableStateException() {
super("Queryable state exception.");
}
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a more specific message here; it is just "Queryable state 
exception." for now.


was (Author: mingleizhang):
[~uce]  Hi, what do you think of this? It is under 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

private static final long serialVersionUID = 1L;

public QueryableStateException() {
super("Queryable state exception.");
}
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a more specific message here; it is just "Queryable state 
exception." for now.

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang edited comment on FLINK-5611 at 4/17/17 6:34 AM:
--

[~uce]  Hi, what do you think of this? It is under 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

private static final long serialVersionUID = 1L;

public QueryableStateException() {
super("Queryable state exception.");
}
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a more specific message here; it is just "Queryable state 
exception." for now.


was (Author: mingleizhang):
[~uce]  Hi, what do you think of this? It is under 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

private static final long serialVersionUID = 1L;

public QueryableStateException() {
super("Queryable state exception.");
}
}
{code}

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970730#comment-15970730
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6301:


Yes, absolutely! Assign the issue to yourself to let others know that you're 
working on it. And thanks, btw :)

Could you send an email to the Flink developer mailing list and ask for JIRA 
contributor permission? Then you can assign this and any future JIRAs you'd 
like to work on to yourself.

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a 
> Kafka consumer bug
> --
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.1.3, 1.1.4
>Reporter: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg, 
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, 
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, 
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, 
> POSTFIX.jeprof.14880.1944.i1944.heap.svg, 
> POSTFIX.jeprof.14880.4129.i4129.heap.svg, 
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, 
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs, 
> # We are consuming gzip-compressed messages from Kafka using 
> *KafkaConnector09* and use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of 
> malloc calls from the Java process; attached are the profiles generated at 
> various stages of the job. As we can see, apart from os.malloc and 
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the 
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long 
> as Inflater.end() is called after use.
> # To find the callers of inflate(), we used BTrace on the running process to 
> dump the caller stack at each call. Following is the stack trace we got: 
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of 
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the 
> Kafka consumer 

[jira] [Commented] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang commented on FLINK-5611:
-

[~uce]  Hi, what do you think of this? It is under 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

private static final long serialVersionUID = 1L;

public QueryableStateException() {
super("Queryable state exception.");
}
}
{code}
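
If only a generic message is available today, constructors that accept a 
message and a cause would at least let future call sites be specific; a sketch 
(the constructor set is a suggestion, not an agreed design):

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException(String message) {
        super(message);
    }

    // carrying the cause preserves the original stack trace for debugging
    public QueryableStateException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}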

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

2017-04-17 Thread Rahul Yadav (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rahul Yadav updated FLINK-6301:
---
Attachment: POSTFIX.jeprof.14880.4129.i4129.heap.svg
POSTFIX.jeprof.14880.1944.i1944.heap.svg
POSTFIX.jeprof.14880.961.i961.heap.svg
POSTFIX.jeprof.14880.99.i99.heap.svg
POSTFIX.jeprof.14880.9.i9.heap.svg

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a 
> Kafka consumer bug
> --
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.1.3, 1.1.4
>Reporter: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg, 
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, 
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, 
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, 
> POSTFIX.jeprof.14880.1944.i1944.heap.svg, 
> POSTFIX.jeprof.14880.4129.i4129.heap.svg, 
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, 
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs:
> # We consume gzip-compressed messages from Kafka using *KafkaConnector09* and 
> use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc* and *jeprof* to profile the sources of 
> malloc calls from the Java process; attached are the profiles generated at 
> various stages of the job. As we can see, apart from os.malloc and 
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the 
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long 
> as the Inflater.end() method is called after its use.
> # To find the callers of the inflate() method, we used BTrace on the running 
> process to dump the caller stack on each call (see the sketch after this 
> description). Following is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of 
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the 
> Kafka consumer code, we found that the RecordsIterator is not closing the 
> compressor stream after use and hence causing the memory leak.
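
For reference, a minimal sketch of the kind of BTrace probe that can dump a 
caller stack on each Inflater.inflate() call (editor's illustration using the 
classic com.sun.btrace API; the actual script used in this investigation is not 
shown in the issue):

{code}
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.OnMethod;
import static com.sun.btrace.BTraceUtils.jstack;
import static com.sun.btrace.BTraceUtils.println;

// Attach with: btrace <pid> InflateTracer.java
@BTrace
public class InflateTracer {

    // Fires on every entry to java.util.zip.Inflater.inflate(...) and
    // prints the full caller stack of the traced application thread.
    @OnMethod(clazz = "java.util.zip.Inflater", method = "inflate")
    public static void onInflate() {
        println("--- Inflater.inflate() called from: ---");
        jstack();
    }
}
{code}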

[jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

2017-04-17 Thread Rahul Yadav (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970725#comment-15970725
 ] 

Rahul Yadav commented on FLINK-6301:


BTW, I wanted to start contributing to the Flink project, and this seems like an 
ideal change for understanding the process. 
Shall I assign it to myself?

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a 
> Kafka consumer bug
> --
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.1.3, 1.1.4
>Reporter: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg, 
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, 
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, 
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, 
> POSTFIX.jeprof.14880.1944.i1944.heap.svg, 
> POSTFIX.jeprof.14880.4129.i4129.heap.svg, 
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, 
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs:
> # We consume gzip-compressed messages from Kafka using *KafkaConnector09* and 
> use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc* and *jeprof* to profile the sources of 
> malloc calls from the Java process; attached are the profiles generated at 
> various stages of the job. As we can see, apart from os.malloc and 
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the 
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long 
> as the Inflater.end() method is called after its use.
> # To find the callers of the inflate() method, we used BTrace on the running 
> process to dump the caller stack on each call. Following is the stack trace 
> we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of 
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the 
> Kafka consumer code, we found that the RecordsIterator is not closing the 
> compressor stream after use and hence causing the memory leak:
> 
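
The Kafka snippet that followed is truncated in this digest. As an editor's 
illustration (not the Kafka consumer code itself) of the pattern at issue -- the 
native zlib buffers held by an Inflater are only released by end(), which 
GZIPInputStream.close() invokes -- here is a minimal sketch of leak-free gzip 
decompression:

{code}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class GzipReadExample {

    // Decompresses a gzip payload of a known uncompressed length.
    static byte[] readPayload(byte[] compressed, int uncompressedLength) throws IOException {
        byte[] out = new byte[uncompressedLength];
        // try-with-resources guarantees close(), which calls Inflater.end()
        // and frees the native zlib memory. Skipping close() defers the free
        // to the Inflater's finalizer, so native memory grows under load --
        // the behavior reported in this issue.
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            in.readFully(out);
        }
        return out;
    }
}
{code}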
