[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221816#comment-16221816
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4909
  
It should call `dispose()`, you are correct. This was a mistake due to 
sloppy "manual rebasing".


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4909: [FLINK-7880][QS] Fix QS test instabilities.

2017-10-26 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4909
  
It should call `dispose()`, you are correct. This was a mistake due to 
sloppy "manual rebasing".


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221812#comment-16221812
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146585641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
 GeneratedExpression(resultTerm, NEVER_NULL, resultCode, 
SqlTimeTypeInfo.TIMESTAMP)
   }
 
+  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
+val rexBuilder = new RexBuilder(new FlinkTypeFactory(new 
FlinkTypeSystem))
+generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
--- End diff --

Use `new CurrentTimePointCallGen().generate()` instead of creating a whole 
new type environment ;)
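
For illustration, a minimal sketch of that suggestion (the `CallGenerator` 
constructor and `generate` signature here are assumptions, not taken from the PR):

```scala
private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
  // Reuse the existing call generator instead of building a RexBuilder and a
  // fresh FlinkTypeFactory/FlinkTypeSystem on every invocation.
  new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = false)
    .generate(this, Seq())
}
```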


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.
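> A minimal sketch of the proposed interface (the {{WatermarkGenerator}} type 
> is assumed here for illustration):
> {code}
> trait DefinedWatermark {
>   /** Rowtime attribute; can only be an existing field. */
>   def getRowtimeAttribute: String
>   /** Watermark generation strategy for the rowtime attribute. */
>   def getWatermarkGenerator: WatermarkGenerator
> }
> {code}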



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221813#comment-16221813
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146575110
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -89,34 +98,50 @@ public void addColumn(String family, String qualifier, 
Class clazz) {
 * @param charset Name of the charset to use.
 */
public void setCharset(String charset) {
-   this.schema.setCharset(charset);
+   this.hBaseSchema.setCharset(charset);
}
 
@Override
public TypeInformation getReturnType() {
-   String[] famNames = schema.getFamilyNames();
-   TypeInformation[] typeInfos = new 
TypeInformation[famNames.length];
+   return new RowTypeInfo(getFieldTypes(), getFieldNames());
+   }
+
+   public TableSchema getTableSchema() {
--- End diff --

Add `@Override` annotation.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221811#comment-16221811
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146583860
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
--- End diff --

Add the Scala doc for the parameter.
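
For illustration, the added Scaladoc could read as follows (the wording is a 
sketch; the fallback behavior is inferred from the surrounding diff):

```scala
/**
  * @param rowtimeExpression optional [[RexNode]] that computes the rowtime
  *                          value from the physical input fields; if `None`,
  *                          rowtime indicators fall back to the timestamp
  *                          carried by the StreamRecord
  */
```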


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221808#comment-16221808
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146576347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 ---
@@ -50,6 +50,11 @@ class TableSchema(
 
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
 
+  /** Returns a copy of the TableSchema */
+  def copy: TableSchema = {
--- End diff --

Maybe mention the deep copy behavior.
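
A possible wording, assuming the copy is indeed deep:

```scala
/** Returns a deep copy of this TableSchema; the underlying column name and
  * type arrays are copied, so mutating the copy does not affect this
  * instance. */
def copy: TableSchema = ??? // body unchanged
```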


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221805#comment-16221805
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146587166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, 
RelOptTable, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
 abstract class PhysicalTableSourceScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
 table: RelOptTable,
-val tableSource: TableSource[_])
+val tableSource: TableSource[_],
+val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table) {
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildLogicalRowType(
-  TableEnvironment.getFieldNames(tableSource),
-  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+  case _: StreamTableSourceTable[_] => true
+  case _: BatchTableSourceTable[_] => false
+  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+}
+
+TableSourceUtil.getTableSchema(tableSource, selectedFields, 
streamingTable, flinkTypeFactory)
--- End diff --

I would rename this method to `getRelDataType`, because it does not return 
our `TableSchema`.
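
A sketch of the rename, with the parameter list taken from the call site above 
(only the name changes; the body stays as in today's `getTableSchema`):

```scala
def getRelDataType(
    tableSource: TableSource[_],
    selectedFields: Option[Array[Int]],
    streaming: Boolean,
    typeFactory: FlinkTypeFactory): RelDataType = ???
```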


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221810#comment-16221810
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146582532
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
   } else {
 FlinkStatistic.UNKNOWN
   }
+
   convertedTableSource match {
-case s : StreamTableSource[_] => new StreamTableSourceTable(s, 
flinkStatistic)
-case _ => new TableSourceTable(convertedTableSource, 
flinkStatistic)
+case s: StreamTableSource[_] =>
--- End diff --

What happens if the table source implements both interfaces?
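
One possible way to make that case explicit (a sketch of one policy, rejecting 
ambiguous sources; other policies are possible):

```scala
convertedTableSource match {
  // A source implementing both interfaces would silently be treated as a
  // stream source by the current match order; handle it explicitly instead.
  case _: StreamTableSource[_] with BatchTableSource[_] =>
    throw TableException("Ambiguous table source: implements both " +
      "StreamTableSource and BatchTableSource.")
  case s: StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
  case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
}
```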


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221807#comment-16221807
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146584051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
 : GeneratedExpression = {
 
 val input1AccessExprs = input1Mapping.map {
-  case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
-// attribute is a rowtime indicator. Access event-time timestamp 
in StreamRecord.
-generateRowtimeAccess()
-  case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
+  case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+   TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+// attribute is a rowtime indicator.
+if (rowtimeExpression.isDefined) {
--- End diff --

We could use pattern matching here.
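
The same logic written with pattern matching (a sketch; the `None` branch is 
inferred from the pre-existing rowtime handling):

```scala
rowtimeExpression match {
  // The TableSource supplies an expression that computes the rowtime value.
  case Some(expr) => generateExpression(expr)
  // Otherwise fall back to the event-time timestamp of the StreamRecord.
  case None => generateRowtimeAccess()
}
```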


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221806#comment-16221806
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146574319
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

Maybe we should add a constructor or method to `TableSchema` that takes a 
TypeInfo. I think this case is very common for table sources.
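
A hypothetical helper along those lines (the name `fromTypeInfo` and its 
placement are assumptions, not existing API):

```scala
object TableSchema {
  /** Derives a TableSchema from the TypeInformation of a Row stream. */
  def fromTypeInfo(typeInfo: TypeInformation[Row]): TableSchema = typeInfo match {
    case row: RowTypeInfo =>
      // CompositeType exposes the field names directly; the field types are
      // collected via getTypeAt.
      val fieldTypes: Array[TypeInformation[_]] =
        Array.tabulate(row.getArity)(i => row.getTypeAt(i): TypeInformation[_])
      new TableSchema(row.getFieldNames, fieldTypes)
    case other =>
      throw TableException(s"Expected a RowTypeInfo but got $other.")
  }
}
```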


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221809#comment-16221809
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146586028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 ---
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-* We check if the input type is exactly the same as the internal row 
type.
-* A conversion is necessary if types differ.
-*/
-  private[flink] def needsConversion(
-  externalTypeInfo: TypeInformation[Any],
-  internalTypeInfo: TypeInformation[T]): Boolean =
-externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-  config: TableConfig,
-  functionClass: Class[F],
-  inputType: TypeInformation[Any],
-  expectedType: TypeInformation[Row],
-  conversionOperatorName: String,
-  fieldNames: Seq[String],
-  inputFieldMapping: Option[Array[Int]] = None)
-: GeneratedFunction[F, Row] = {
-
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputType,
-  None,
-  inputFieldMapping)
-val conversion = 
generator.generateConverterResultExpression(expectedType, fieldNames)
-
-val body =
-  s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
-generator.generateFunction(
-  conversionOperatorName,
-  functionClass,
-  body,
-  expectedType)
-  }
-
-}
+trait CommonScan[T]
--- End diff --

Do we still need this trait if it does not contain anything? Let's keep it 
simple and remove it for now.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two methods: 
> {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146574319
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

Maybe we should add a constructor or method to `TableSchema` that takes a 
TypeInfo. I think this case is very common for table sources.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146582532
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
   } else {
 FlinkStatistic.UNKNOWN
   }
+
   convertedTableSource match {
-case s : StreamTableSource[_] => new StreamTableSourceTable(s, 
flinkStatistic)
-case _ => new TableSourceTable(convertedTableSource, 
flinkStatistic)
+case s: StreamTableSource[_] =>
--- End diff --

What happens if the table source implements both interfaces?


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146576347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 ---
@@ -50,6 +50,11 @@ class TableSchema(
 
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
 
+  /** Returns a copy of the TableSchema */
+  def copy: TableSchema = {
--- End diff --

Maybe mention the deep copy behavior.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146586028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 ---
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-* We check if the input type is exactly the same as the internal row 
type.
-* A conversion is necessary if types differ.
-*/
-  private[flink] def needsConversion(
-  externalTypeInfo: TypeInformation[Any],
-  internalTypeInfo: TypeInformation[T]): Boolean =
-externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-  config: TableConfig,
-  functionClass: Class[F],
-  inputType: TypeInformation[Any],
-  expectedType: TypeInformation[Row],
-  conversionOperatorName: String,
-  fieldNames: Seq[String],
-  inputFieldMapping: Option[Array[Int]] = None)
-: GeneratedFunction[F, Row] = {
-
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputType,
-  None,
-  inputFieldMapping)
-val conversion = 
generator.generateConverterResultExpression(expectedType, fieldNames)
-
-val body =
-  s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
-generator.generateFunction(
-  conversionOperatorName,
-  functionClass,
-  body,
-  expectedType)
-  }
-
-}
+trait CommonScan[T]
--- End diff --

Do we still need this trait if it does not contain anything? Let's keep it 
simple and remove it for now.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146584051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
 : GeneratedExpression = {
 
 val input1AccessExprs = input1Mapping.map {
-  case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
-// attribute is a rowtime indicator. Access event-time timestamp 
in StreamRecord.
-generateRowtimeAccess()
-  case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
+  case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+   TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+// attribute is a rowtime indicator.
+if (rowtimeExpression.isDefined) {
--- End diff --

We could use pattern matching here.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146585641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
 GeneratedExpression(resultTerm, NEVER_NULL, resultCode, 
SqlTimeTypeInfo.TIMESTAMP)
   }
 
+  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
+val rexBuilder = new RexBuilder(new FlinkTypeFactory(new 
FlinkTypeSystem))
+generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
--- End diff --

Use `new CurrentTimePointCallGen().generate()` instead of creating a whole 
new type environment ;)


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146587166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, 
RelOptTable, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
 abstract class PhysicalTableSourceScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
 table: RelOptTable,
-val tableSource: TableSource[_])
+val tableSource: TableSource[_],
+val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table) {
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildLogicalRowType(
-  TableEnvironment.getFieldNames(tableSource),
-  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+  case _: StreamTableSourceTable[_] => true
+  case _: BatchTableSourceTable[_] => false
+  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+}
+
+TableSourceUtil.getTableSchema(tableSource, selectedFields, 
streamingTable, flinkTypeFactory)
--- End diff --

I would rename this method to `getRelDataType`, because it does not return 
our `TableSchema`.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146575110
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -89,34 +98,50 @@ public void addColumn(String family, String qualifier, 
Class clazz) {
 * @param charset Name of the charset to use.
 */
public void setCharset(String charset) {
-   this.schema.setCharset(charset);
+   this.hBaseSchema.setCharset(charset);
}
 
@Override
public TypeInformation getReturnType() {
-   String[] famNames = schema.getFamilyNames();
-   TypeInformation[] typeInfos = new 
TypeInformation[famNames.length];
+   return new RowTypeInfo(getFieldTypes(), getFieldNames());
+   }
+
+   public TableSchema getTableSchema() {
--- End diff --

Add `@Override` annotation.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146583860
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
--- End diff --

Add the Scala doc for the parameter.


---


[jira] [Created] (FLINK-7936) Lack of synchronization w.r.t. taskManagers in MetricStore#add()

2017-10-26 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7936:
-

 Summary: Lack of synchronization w.r.t. taskManagers in 
MetricStore#add()
 Key: FLINK-7936
 URL: https://issues.apache.org/jira/browse/FLINK-7936
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) 
info).taskManagerID;
  tm = taskManagers.computeIfAbsent(tmID, k -> new 
TaskManagerMetricStore());
{code}
In other places, access to taskManagers is protected by a lock on MetricStore.this.
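A minimal sketch of the corresponding fix, reusing the snippet above and the 
locking convention just described (illustrative only):
{code}
// Take the same lock the other accessors use before touching taskManagers.
synchronized (this) {
    String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
    tm = taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetricStore());
}
{code}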



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7574) flink-clients

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221713#comment-16221713
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4712
  
thx @zentol, I will add the `maven-dependency-plugin` to the flink-parent 
pom.xml file. 
```
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.0.2</version>
    <executions>
        <execution>
            <id>analyze</id>
            <goals>
                <goal>analyze-only</goal>
            </goals>
            <configuration>
                <failOnWarning>true</failOnWarning>
                <ignoredDependencies>
                    <ignoredDependency>org.apache.flink:force-shading</ignoredDependency>
                    <ignoredDependency>org.slf4j:slf4j-api</ignoredDependency>
                    <ignoredDependency>com.google.code.findbugs:jsr305</ignoredDependency>
                    <ignoredDependency>junit:junit:jar</ignoredDependency>
                    <ignoredDependency>org.mockito:mockito-all</ignoredDependency>
                    <ignoredDependency>org.powermock:powermock-module-junit4</ignoredDependency>
                    <ignoredDependency>org.powermock:powermock-api-mockito</ignoredDependency>
                    <ignoredDependency>org.hamcrest:hamcrest-all</ignoredDependency>
                    <ignoredDependency>org.slf4j:slf4j-log4j12</ignoredDependency>
                    <ignoredDependency>log4j:log4j:jar</ignoredDependency>
                </ignoredDependencies>
            </configuration>
        </execution>
    </executions>
</plugin>
```

And then fix the (used but undeclared / unused but declared) dependency 
issues module by module.


Is this the right way?




> flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4712: [FLINK-7574][build] POM Cleanup flink-clients

2017-10-26 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4712
  
thx @zentol, I will add the `maven-dependency-plugin` to the flink-parent 
pom.xml file. 
```
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.0.2</version>
    <executions>
        <execution>
            <id>analyze</id>
            <goals>
                <goal>analyze-only</goal>
            </goals>
            <configuration>
                <failOnWarning>true</failOnWarning>
                <ignoredDependencies>
                    <ignoredDependency>org.apache.flink:force-shading</ignoredDependency>
                    <ignoredDependency>org.slf4j:slf4j-api</ignoredDependency>
                    <ignoredDependency>com.google.code.findbugs:jsr305</ignoredDependency>
                    <ignoredDependency>junit:junit:jar</ignoredDependency>
                    <ignoredDependency>org.mockito:mockito-all</ignoredDependency>
                    <ignoredDependency>org.powermock:powermock-module-junit4</ignoredDependency>
                    <ignoredDependency>org.powermock:powermock-api-mockito</ignoredDependency>
                    <ignoredDependency>org.hamcrest:hamcrest-all</ignoredDependency>
                    <ignoredDependency>org.slf4j:slf4j-log4j12</ignoredDependency>
                    <ignoredDependency>log4j:log4j:jar</ignoredDependency>
                </ignoredDependencies>
            </configuration>
        </execution>
    </executions>
</plugin>
```

And then fix the (used but undeclared / unused but declared) dependency 
issues module by module.


Is this the right way?




---


[jira] [Commented] (FLINK-6875) Remote DataSet API job submission timing out

2017-10-26 Thread hetang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221634#comment-16221634
 ] 

hetang commented on FLINK-6875:
---

Oct 26, 2017 7:51:15 PM org.apache.flink.runtime.blob.BlobClient uploadJarFiles
INFO: Blob client connecting to akka.tcp://flink@hbase-1:50022/user/jobmanager
Oct 26, 2017 7:51:19 PM org.apache.flink.runtime.client.JobSubmissionClientActor$1 call
INFO: Submit job to the job manager akka.tcp://flink@hbase-1:50022/user/jobmanager.
Oct 26, 2017 7:52:19 PM org.apache.flink.runtime.client.JobClientActor terminate
INFO: Terminate JobClientActor.
Oct 26, 2017 7:52:19 PM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
INFO: Disconnect from JobManager Actor[akka.tcp://flink@hbase-1:50022/user/jobmanager#1597589120].
Oct 26, 2017 7:52:19 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Shutting down remote daemon.
Oct 26, 2017 7:52:19 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remote daemon shut down; proceeding with flushing remote transports.
Oct 26, 2017 7:52:19 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Oct 26, 2017 7:52:19 PM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
at 
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:114)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:118)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at org.apache.beam.examples.WordCount.main(WordCount.java:184)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
... 18 more
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
at 
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at 
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTa

[jira] [Commented] (FLINK-7705) Port JobDetailsHandler to new REST endpoint

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221571#comment-16221571
 ] 

ASF GitHub Bot commented on FLINK-7705:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4884
  
Hi @tillrohrmann, could you merge this PR when you have a chance? Some issues 
depend on `MetricFetcher`, and I see it is added to `DispatcherRestEndpoint` 
in this PR. Thanks!


> Port JobDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7705
> URL: https://issues.apache.org/jira/browse/FLINK-7705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port existing {{JobDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4884: [FLINK-7705] Add JobDetailsHandler

2017-10-26 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4884
  
Hi @tillrohrmann, could you merge this PR when you have a chance? Some issues 
depend on `MetricFetcher`, and I see it is added to `DispatcherRestEndpoint` 
in this PR. Thanks!


---


[jira] [Commented] (FLINK-7100) TaskManager metrics are registered twice

2017-10-26 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221567#comment-16221567
 ] 

Fang Yong commented on FLINK-7100:
--

[~Zentol] [~till.rohrmann], thank you for your suggestions. If I understand 
correctly, we should look at this issue and think about how to fix it after 
FLINK-7876, right? THX

> TaskManager metrics are registered twice
> 
>
> Key: FLINK-7100
> URL: https://issues.apache.org/jira/browse/FLINK-7100
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Metrics
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.4.0, 1.3.3
>
>
> TaskManager metrics are currently registered twice, once when the TaskManager 
> is started and once when the TaskManager associates with a JobManager.
> Originally the metrics were registered when the TM associates with the JM and 
> unregistered upon disassociation.
> 9e9776f17ed18b12af177e31ab0bc266236f85ef modified the {{TaskManager}} to use 
> the {{TaskManagerServices}}, which when loaded _also_ register the metrics.
> I suggest removing the registrations that happen upon (dis-)association.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221566#comment-16221566
 ] 

Wei-Che Wei commented on FLINK-7935:


Hi [~elevy],
Is this issue the same as FLINK-7692?

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics. DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
> the system scope for operator metrics is 
> {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. 
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and instances would be distinguished by their 
> tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}. 
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work. The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format, and 
> internally this does not appear to lead to metrics conflicting with each 
> other.
> We would like to extend this to user-defined metrics, i.e. be able to define 
> variables/scopes when adding a metric group or metric with the user API, so 
> that in DD we get a single metric with a tag taking many different values, 
> rather than hundreds of distinct metrics for the one value we want to 
> measure across different event types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7935:
-

 Summary: Metrics with user supplied scope variables
 Key: FLINK-7935
 URL: https://issues.apache.org/jira/browse/FLINK-7935
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2
Reporter: Elias Levy


We use DataDog for metrics. DD and Flink differ somewhat in how they track 
metrics.

Flink names and scopes metrics together, at least by default. E.g. by default 
the system scope for operator metrics is 
{{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. The 
scope variables become part of the metric's full name.

In DD the metric would be named something generic, e.g. 
{{taskmanager.job.operator}}, and instances would be distinguished by their tag 
values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.

Flink allows you to configure the format string for system scopes, so it is 
possible to set the operator scope format to {{taskmanager.job.operator}}. We 
do this for all scopes:

{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}

This seems to work. The DataDog Flink metrics plugin submits all scope 
variables as tags, even if they are not used within the scope format, and 
internally this does not appear to lead to metrics conflicting with each other.

We would like to extend this to user-defined metrics, i.e. be able to define 
variables/scopes when adding a metric group or metric with the user API, so 
that in DD we get a single metric with a tag taking many different values, 
rather than hundreds of distinct metrics for the one value we want to measure 
across different event types.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221305#comment-16221305
 ] 

Fabian Hueske commented on FLINK-7051:
--

Calcite dependency has been upgraded to 1.14 with 
4d8308810b21fe6b760f7c7f30788fb1f4a866d7

The subtasks of this issue can now be addressed.

> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221301#comment-16221301
 ] 

ASF GitHub Bot commented on FLINK-6173:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4837


> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, flink-table packs in com.fasterxml.jackson.* and renames it 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the 
> {{explain}} function uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer packs in 
> com.fasterxml.jackson.*, and the project throws a class-not-found exception.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221300#comment-16221300
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4873


> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4837: [FLINK-6173] [table] Clean-up flink-table jar and ...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4837


---


[GitHub] flink pull request #4873: [FLINK-7051] [table] Bump Calcite version to 1.14.

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4873


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221173#comment-16221173
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3748
  
Thanks for the update and reminder @PangZhi.
I'll have a look tomorrow. Maybe we can include this in the upcoming 1.4.0 
release.

Thanks, Fabian


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row 
> streams. The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3748
  
Thanks for the update and reminder @PangZhi.
I'll have a look tomorrow. Maybe we can include this in the upcoming 1.4.0 
release.

Thanks, Fabian


---


[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221164#comment-16221164
 ] 

ASF GitHub Bot commented on FLINK-6173:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4837
  
I'll fix the broken build and merge this PR.


> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, flink-table packs in com.fasterxml.jackson.* and renames it 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the 
> {{explain}} function uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer packs in 
> com.fasterxml.jackson.*, and the project throws a class-not-found exception.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4837: [FLINK-6173] [table] Clean-up flink-table jar and depende...

2017-10-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4837
  
I'll fix the broken build and merge this PR.


---


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220959#comment-16220959
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4910
  
@aljoscha 


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring from a failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing; this should be the default. We can provide an option that 
> allows failing the sink on failing commits.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220957#comment-16220957
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4910

[FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSinkFunction when 
failing to commit during job recovery

## What is the purpose of the change
This makes it possible to configure the TwoPhaseCommitSinkFunction's 
behaviour w.r.t. transaction timeouts.

## Brief change log
  - *Introduce transaction timeouts to TwoPhaseCommitSinkFunction.* 
  - *Timeout can be used to generate warnings if the transaction's age 
approaches the timeout.*
  - *If an exception is thrown during job recovery, the sink can be 
configured not to propagate the exception and instead log it on ERROR level.*


## Verifying this change
This change added tests and can be verified as follows:
  - *Extended unit tests for TwoPhaseCommitSinkFunction to test added 
functionality*
  - *Manually verified the change by running a job with a 
FlinkKafka011Producer with checkpoint interval 27000 and transaction.timeout.ms 
= 3. Warnings were generated correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
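
For illustration, a minimal sketch of the warning/swallowing logic described in the change log above (names such as `txnStartTimeMs`, `warningRatio`, and the method itself are assumptions, not the actual Flink code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of the described behaviour; names and structure are assumptions.
public class TransactionTimeoutCheck {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionTimeoutCheck.class);

    static void checkTransactionAge(
            long txnStartTimeMs,
            long transactionTimeoutMs,
            double warningRatio,
            boolean failOnTimeoutDuringRecovery) {

        long elapsedMs = System.currentTimeMillis() - txnStartTimeMs;

        if (elapsedMs >= transactionTimeoutMs && !failOnTimeoutDuringRecovery) {
            // The transaction has most likely already been aborted by the broker;
            // log on ERROR instead of failing the job during recovery.
            LOG.error("Transaction timed out after {} ms (timeout: {} ms); skipping commit.",
                    elapsedMs, transactionTimeoutMs);
        } else if (elapsedMs > transactionTimeoutMs * warningRatio) {
            LOG.warn("Transaction age {} ms is approaching the timeout of {} ms.",
                    elapsedMs, transactionTimeoutMs);
        }
    }
}
```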

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7784

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4910


commit 23ae221eb854ce12572988aed5c018aac8919af7
Author: gyao 
Date:   2017-10-26T17:17:55Z

[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of 
transaction timeouts.

TwoPhaseCommitSinkFunction allows configuring a transaction timeout. The
timeout can be used to log warnings if the transaction's age is approaching
the timeout, and it can be used to swallow exceptions that are likely
irrecoverable. This commit also integrates these changes into the
FlinkKafkaProducer011.

commit 43103c1fb61a6bc1aec6b19c0253fcca281cfba5
Author: gyao 
Date:   2017-10-26T17:25:35Z

[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests




> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring from a failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing; this should be the default. We can provide an option that 
> allows failing the sink on failing commits.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSi...

2017-10-26 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4910
  
@aljoscha 


---


[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-26 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4910

[FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSinkFunction when 
failing to commit during job recovery

## What is the purpose of the change
This makes it possible to configure the TwoPhaseCommitSinkFunction's 
behaviour w.r.t. transaction timeouts.

## Brief change log
  - *Introduce transaction timeouts to TwoPhaseCommitSinkFunction.* 
  - *Timeout can be used to generate warnings if the transaction's age 
approaches the timeout.*
  - *If an exception is thrown during job recovery, the sink can be 
configured not to propagate the exception and instead log it on ERROR level.*


## Verifying this change
This change added tests and can be verified as follows:
  - *Extended unit tests for TwoPhaseCommitSinkFunction to test added 
functionality*
  - *Manually verified the change by running a job with a 
FlinkKafka011Producer with checkpoint interval 27000 and transaction.timeout.ms 
= 3. Warnings were generated correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7784

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4910


commit 23ae221eb854ce12572988aed5c018aac8919af7
Author: gyao 
Date:   2017-10-26T17:17:55Z

[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of 
transaction timeouts.

TwoPhaseCommitSinkFunction allows configuring a transaction timeout. The
timeout can be used to log warnings if the transaction's age is approaching
the timeout, and it can be used to swallow exceptions that are likely
irrecoverable. This commit also integrates these changes into the
FlinkKafkaProducer011.

commit 43103c1fb61a6bc1aec6b19c0253fcca281cfba5
Author: gyao 
Date:   2017-10-26T17:25:35Z

[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests




---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2017-10-26 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220895#comment-16220895
 ] 

Rong Rong commented on FLINK-7923:
--

Done, created umbrella task for Calcite 1.15 release.

> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>
> An access pattern such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause a problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7934) Upgrade Calcite dependency to 1.15

2017-10-26 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7934:


 Summary: Upgrade Calcite dependency to 1.15
 Key: FLINK-7934
 URL: https://issues.apache.org/jira/browse/FLINK-7934
 Project: Flink
  Issue Type: Bug
Reporter: Rong Rong


Umbrella issue for all related issues for Apache Calcite 1.15 release.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220878#comment-16220878
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@zentol Hey zentol, when do you think we will be able to merge this in?


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-26 Thread PangZhi
Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@zentol Hey zentol, when do you think we will be able to merge this in?


---


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220824#comment-16220824
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4909

[FLINK-7880][QS] Fix QS test instabilities.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-test-instability

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4909


commit 2b3bba7a2e3778cf3f9d580a32bd466d77fcdb39
Author: kkloudas 
Date:   2017-10-26T17:11:03Z

[FLINK-7880][QS] Fix QS test instabilities.




> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-26 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220825#comment-16220825
 ] 

Kostas Kloudas commented on FLINK-7880:
---

I think this PR fixes it.

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4909: [FLINK-7880][QS] Fix QS test instabilities.

2017-10-26 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4909

[FLINK-7880][QS] Fix QS test instabilities.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-test-instability

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4909


commit 2b3bba7a2e3778cf3f9d580a32bd466d77fcdb39
Author: kkloudas 
Date:   2017-10-26T17:11:03Z

[FLINK-7880][QS] Fix QS test instabilities.




---


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-26 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4908

[FLINK-7933][metrics] Improve PrometheusReporter tests

## What is the purpose of the change

This PR resolves the test instabilities of the prometheus reporter.

## Brief change log

* use a port range instead of a single hard-coded port
  * required the exposure of the bound port by the reporter
* move registry/reporter initialization into an `@Before` method 
(guarantees execution order)
* shutdown registry in `PrometheusReporterTest`
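
A minimal sketch of the port-range binding from the change log above (the range bounds are assumed example values; the actual reporter reads them from its configuration):

```java
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;

// Illustrative sketch only; the real reporter wires this into its startup logic.
public class PortRangeBinding {

    static HTTPServer bindToFirstFreePort(int startPort, int endPort) throws IOException {
        for (int port = startPort; port <= endPort; port++) {
            try {
                return new HTTPServer(port); // first free port in the range wins
            } catch (IOException e) {
                // port already taken, try the next one
            }
        }
        throw new IOException("No free port in range " + startPort + "-" + endPort);
    }
}
```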

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7933

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4908.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4908


commit 36168084423f704d8fb2647acf8dda2a09e4ee1d
Author: zentol 
Date:   2017-10-26T17:04:30Z

[FLINK-7933][metrics] Improve PrometheusReporter tests




---


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220811#comment-16220811
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4908

[FLINK-7933][metrics] Improve PrometheusReporter tests

## What is the purpose of the change

This PR resolves the test instabilities of the prometheus reporter.

## Brief change log

* use a port range instead of a single hard-coded port
  * required the exposure of the bound port by the reporter
* move registry/reporter initialization into an `@Before` method 
(guarantees execution order)
* shutdown registry in `PrometheusReporterTest`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7933

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4908.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4908


commit 36168084423f704d8fb2647acf8dda2a09e4ee1d
Author: zentol 
Date:   2017-10-26T17:04:30Z

[FLINK-7933][metrics] Improve PrometheusReporter tests




> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7908) Restructure the QS module to reduce client deps.

2017-10-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-7908.
-
Resolution: Fixed

Merged at 0c771505b84cdacf7a359c3be0efe38a30f9e660

> Restructure the QS module to reduce client deps.
> 
>
> Key: FLINK-7908
> URL: https://issues.apache.org/jira/browse/FLINK-7908
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7824) Put the queryable state jars in the opt folder.

2017-10-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-7824.
-
Resolution: Fixed

> Put the queryable state jars in the opt folder.
> ---
>
> Key: FLINK-7824
> URL: https://issues.apache.org/jira/browse/FLINK-7824
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> In 1.4, to enable queryable state, the user has to put the required jars 
> in the lib folder. The first step before *putting* the jars is to *find* them 
> in the distribution, so these jars should be located in the `opt` 
> folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-7933:
---

Assignee: Chesnay Schepler

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7824) Put the queryable state jars in the opt folder.

2017-10-26 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220791#comment-16220791
 ] 

Kostas Kloudas commented on FLINK-7824:
---

Merged at 2fd8721d0375bfa60a190bb206a65287c167a43a

> Put the queryable state jars in the opt folder.
> ---
>
> Key: FLINK-7824
> URL: https://issues.apache.org/jira/browse/FLINK-7824
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> In 1.4, to enable queryable state, the user has to put the required jars 
> in the lib folder. The first step before *putting* the jars is to *find* them 
> in the distribution, so these jars should be located in the `opt` 
> folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7908) Restructure the QS module to reduce client deps.

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220789#comment-16220789
 ] 

ASF GitHub Bot commented on FLINK-7908:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4906


> Restructure the QS module to reduce client deps.
> 
>
> Key: FLINK-7908
> URL: https://issues.apache.org/jira/browse/FLINK-7908
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4906: [FLINK-7908][FLINK-7824][QS] Restructure QS packag...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4906


---


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220771#comment-16220771
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Merged in 8595dadb89a1276c6c7d0ed2e2fae396a5c1d222


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220772#comment-16220772
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/4827


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Merged in 8595dadb89a1276c6c7d0ed2e2fae396a5c1d222


---


[jira] [Closed] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7840.
---

> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-26 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/4827


---


[jira] [Resolved] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7840.
-
Resolution: Fixed

Fixed via 8595dadb89a1276c6c7d0ed2e2fae396a5c1d222

> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7100) TaskManager metrics are registered twice

2017-10-26 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220765#comment-16220765
 ] 

Till Rohrmann commented on FLINK-7100:
--

True, the PR does not address a single problem but multiple ones. The problem was 
that while trying to fix how the {{TaskManagerGroup}} is registered, I stumbled 
across many other problems that needed to be addressed due to the initial 
changes. E.g. the lifecycle management of the {{MetricRegistry}} was quite 
spread across many components. Still, I think this PR fixes some valuable 
things and straightens out others.

> TaskManager metrics are registered twice
> 
>
> Key: FLINK-7100
> URL: https://issues.apache.org/jira/browse/FLINK-7100
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Metrics
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.4.0, 1.3.3
>
>
> TaskManager metrics are currently registered twice, once when the TaskManager 
> is started and once when the TaskManager associates with a JobManager.
> Originally the metrics were registered when the TM associates with the JM and 
> unregistered upon disassociation.
> 9e9776f17ed18b12af177e31ab0bc266236f85ef modified the {{TaskManager}} to use 
> the {{TaskManagerServices}}, which when loaded _also_ register the metrics.
> I suggest to remove the registrations that happen upon (dis-)association.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7914) Expose Akka gated interval as user option

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220763#comment-16220763
 ] 

ASF GitHub Bot commented on FLINK-7914:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4903
  
Sounds fair, +1




> Expose Akka gated interval as user option
> -
>
> Key: FLINK-7914
> URL: https://issues.apache.org/jira/browse/FLINK-7914
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Whenever Akka loses its connection to a remote {{ActorSystem}} it gates the 
> corresponding address. The default value is {{5 s}}. Especially for tests 
> this can be too high. Therefore, I propose to expose this option to the user 
> via the {{AkkaOptions}} and setting it to {{50 ms}} per default.
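
A minimal sketch of how the proposed option would be set (the option name comes from the linked PR title; the exact key and type of the option are assumptions):

{code:java}
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;

public class GatedIntervalExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Lower the gate interval from Akka's 5 s default to the proposed 50 ms.
        config.setLong(AkkaOptions.RETRY_GATE_CLOSED_FOR, 50L);
    }
}
{code}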



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4903: [FLINK-7914] Introduce AkkaOptions.RETRY_GATE_CLOSED_FOR

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4903
  
Sounds fair, +1




---


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220750#comment-16220750
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4844
  
Had an offline discussion with @tillrohrmann - rewriting this without 
Mockito results in a similar amount of code with similar maintenance effort, so 
seems to be okay in this case.

+1 to merge after fixing the `Thread.holdsLock(lock)` comment above


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using the "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180, dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, the individual 
> recovery happens correctly and the workload is re-dispatched to a newly 
> available task manager instance. However, the checkpoint would take 10 mins 
> to eventually time out, leaving all other task managers unable to commit 
> checkpoints. In a worst-case scenario, if the job got restarted for other 
> reasons (e.g. job manager termination), that would cause more messages to be 
> re-processed/duplicated compared to a job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> that was initiated by the old task manager, and was thus taking a long time 
> to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that, when the Job Manager realizes that a Task Manager 
> is gone and the workload is redispatched, it still needs to wait for the 
> checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.
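
Regarding question 1: yes, the checkpoint timeout is configurable per job via {{CheckpointConfig}} (it defaults to 10 minutes, which matches the observed behavior). A minimal sketch:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // checkpoint every 60 s
        // Fail pending checkpoints after 2 minutes instead of the 10-minute default.
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
    }
}
{code}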



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoin...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4844
  
Had an offline discussion with @tillrohrmann - rewriting this without 
Mockito results in a similar amount of code with similar maintenance effort, so 
seems to be okay in this case.

+1 to merge after fixing the `Thread.holdsLock(lock)` comment above


---


[jira] [Updated] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7933:
--
Component/s: Tests

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7933:
--
Labels: test-stability  (was: )

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7933:
--
Priority: Critical  (was: Major)

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7933:
-

 Summary: Test instability PrometheusReporterTest
 Key: FLINK-7933
 URL: https://issues.apache.org/jira/browse/FLINK-7933
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0
Reporter: Kostas Kloudas


Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7800) Enable window joins without equi-join predicates

2017-10-26 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220670#comment-16220670
 ] 

Fabian Hueske commented on FLINK-7800:
--

That's a good point. I think we can remove the restrictions in 
{{FlinkLogicalJoinConverter}}.


> Enable window joins without equi-join predicates
> 
>
> Key: FLINK-7800
> URL: https://issues.apache.org/jira/browse/FLINK-7800
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, windowed joins can only be translated if they have at least one 
> equi-join predicate. This limitation exists due to the lack of a good cross 
> join strategy for the DataSet API.
> Due to the window, windowed joins do not have to be executed as cross joins. 
> Hence, the equi-join limitation does not need to be enforced (even though 
> non-equi joins are executed with a parallelism of 1 right now).
> We can resolve this issue by adding a boolean flag to the 
> {{FlinkLogicalJoinConverter}} rule to permit non-equi joins and add such a 
> rule to the logical optimization set of the DataStream API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7932) Best Practices docs recommend passing parameters through open(Configuration c)

2017-10-26 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220666#comment-16220666
 ] 

Fabian Hueske commented on FLINK-7932:
--

I see, thanks for the info.
Might make sense to port the fix to 1.3 as well.

> Best Practices docs recommend passing parameters through open(Configuration c)
> --
>
> Key: FLINK-7932
> URL: https://issues.apache.org/jira/browse/FLINK-7932
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>
> The [Best 
> Practices|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html]
>  docs recommend using {{Configuration}} to pass parameters to user functions.
> This does not work for DataStream programs and is not recommended anymore. 
> The "Best Practices" page should be reworked.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7932) Best Practices docs recommend passing parameters through open(Configuration c)

2017-10-26 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220662#comment-16220662
 ] 

Chesnay Schepler commented on FLINK-7932:
-

This section was removed recently for 1.4 by [~aljoscha].

> Best Practices docs recommend passing parameters through open(Configuration c)
> --
>
> Key: FLINK-7932
> URL: https://issues.apache.org/jira/browse/FLINK-7932
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>
> The [Best 
> Practices|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html]
>  docs recommend using {{Configuration}} to pass parameters to user functions.
> This does not work for DataStream programs and is not recommended anymore. 
> The "Best Practices" page should be reworked.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7932) Best Practices docs recommend passing parameters through open(Configuration c)

2017-10-26 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7932:


 Summary: Best Practices docs recommend passing parameters through 
open(Configuration c)
 Key: FLINK-7932
 URL: https://issues.apache.org/jira/browse/FLINK-7932
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Fabian Hueske


The [Best 
Practices|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html]
 docs recommend using {{Configuration}} to pass parameters to user functions.

This does not work for DataStream programs and is not recommended anymore. The 
"Best Practices" page should be reworked.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220639#comment-16220639
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Merging as soon as Travis gives a green light on the rebased branch...


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Merging as soon as Travis gives a green light on the rebased branch...


---


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220633#comment-16220633
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Thanks for the review, merging...


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4827
  
Thanks for the review, merging...


---


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220629#comment-16220629
 ] 

ASF GitHub Bot commented on FLINK-7737:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4876
  
A quick post-mortem comment here:

This adds a lot of `equals()` and `hashCode()` on classes where these are 
ill-defined.

For example: `StreamWriterBase` defines `equals()` and `hashCode()` just on 
a subset of configuration fields (here the sync field) and ignores the 
associated stream, because equality and hashing are ill-defined on the stream. To me, 
the correct conclusion is that `equals()` and `hashCode()` are ill-defined on 
`StreamWriterBase` and should not be there!

Adding such methods just to make assertion statements in tests more compact 
wrongly pushes specific test logic into the main classes. The correct way 
is to adjust the assertions in the test, or, if there is a lot of repetitive 
checking, create a `Matcher` that matches "equality based on some fields" and 
replace `assertEquals(X, Y)` with `assertThat(X, matcher)`.

I think this case actually warrants a follow-up patch that changes this.
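
A minimal sketch of such a field-based `Matcher` (with a hypothetical `Writer` class standing in for `StreamWriterBase`):

```java
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;

// Hypothetical class standing in for StreamWriterBase.
class Writer {
    final boolean syncOnFlush;
    Writer(boolean syncOnFlush) { this.syncOnFlush = syncOnFlush; }
}

// Matches "equality based on some fields" without equals()/hashCode() on Writer.
class SyncFlagMatcher extends TypeSafeMatcher<Writer> {
    private final boolean expectedSyncOnFlush;

    SyncFlagMatcher(boolean expectedSyncOnFlush) {
        this.expectedSyncOnFlush = expectedSyncOnFlush;
    }

    @Override
    protected boolean matchesSafely(Writer writer) {
        return writer.syncOnFlush == expectedSyncOnFlush;
    }

    @Override
    public void describeTo(Description description) {
        description.appendText("a writer with syncOnFlush=" + expectedSyncOnFlush);
    }
}

// in a test: assertThat(writer, new SyncFlagMatcher(true));
```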


> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>   try {
>     // At this point the refHflushOrSync cannot be null,
>     // since register method would have thrown if it was.
>     this.refHflushOrSync.invoke(os);
>     if (os instanceof HdfsDataOutputStream) {
>       ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     }
>   } catch (InvocationTargetException e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e.getCause());
>     Throwable cause = e.getCause();
>     if (cause != null && cause instanceof IOException) {
>       throw (IOException) cause;
>     }
>     throw new RuntimeException(msg, e);
>   } catch (Exception e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e);
>     throw new RuntimeException(msg, e);
>   }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>   os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWr...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4876
  
A quick post-mortem comment here:

This adds a lot of `equals()` and `hashCode()` on classes where these are 
ill-defined.

For example: `StreamWriterBase` defines `equals()` and `hashCode()` just on 
a subset of configuration fields (here the sync field) and ignores the 
associated stream, because equality and hashing are ill-defined on the stream. To me, 
the correct conclusion is that `equals()` and `hashCode()` are ill-defined on 
`StreamWriterBase` and should not be there!

Adding such methods just to make assertion statements in tests more compact 
wrongly pushes specific test logic into the main classes. The correct way 
is to adjust the assertions in the test, or, if there is a lot of repetitive 
checking, create a `Matcher` that matches "equality based on some fields" and 
replace `assertEquals(X, Y)` with `assertThat(X, matcher)`.

I think this case actually warrants a follow-up patch that changes this.


---


[jira] [Assigned] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-10-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7153:


Assignee: Till Rohrmann

> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for each 
> ExecutionJobVertex one by one via ExecutionJobVertex.allocateResourcesForAll(). 
> There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly; we need to set the slot info for 
> the `Execution` as soon as Execution.allocateSlotForExecution() is called 
> successfully.
> 2. The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three 
> remote partitions. But actually, it should be 2 local partitions and 2 remote 
> partitions. 
> The cause of the above problems is that the current strategy allocates the 
> resource for each execution one by one (if the execution can allocate from the 
> SlotGroup, take it; otherwise ask for a new one). 
> Changing the allocation strategy to two steps will solve this problem; below 
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically) {
>   // step 1: try to allocate from the SlotGroup based on inputs one by one
>   // (allocating resources based on location only).
>   // step 2: allocate slots for the remaining executions.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220560#comment-16220560
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r147165565
  
--- Diff: flink-test-utils-parent/flink-test-utils/pom.xml ---
@@ -117,6 +124,41 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>io.netty:netty</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>org.jboss.netty</pattern>
+									<shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
--- End diff --

Good question, maybe @pnowojski can answer that... My guess is he was more 
familiar with the Netty 3 API (org.jboss.netty) and our shaded netty is Netty 4 
(io.netty).


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r147165565
  
--- Diff: flink-test-utils-parent/flink-test-utils/pom.xml ---
@@ -117,6 +124,41 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>io.netty:netty</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>org.jboss.netty</pattern>
+									<shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
--- End diff --

Good question, maybe @pnowojski can answer that... My guess is he was more 
familiar with the Netty 3 API (org.jboss.netty) and our shaded netty is Netty 4 
(io.netty).


---


[jira] [Commented] (FLINK-7846) Remove guava shading from ES2 connector

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220556#comment-16220556
 ] 

ASF GitHub Bot commented on FLINK-7846:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4902


> Remove guava shading from ES2 connector
> ---
>
> Key: FLINK-7846
> URL: https://issues.apache.org/jira/browse/FLINK-7846
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>
> The ElasticSearch 2 connector pom has a shading configuration for guava. The 
> only user of guava is the elasticsearch dependency, which is not included in 
> the jar, making the shading pointless.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4902: [FLINK-7846] [elasticsearch] Remove unnecessary gu...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4902


---


[jira] [Closed] (FLINK-7846) Remove guava shading from ES2 connector

2017-10-26 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7846.
---
Resolution: Fixed

1.4: dc1a0dce060c7a4af9fec3a5d3d0ccf564e08060

> Remove guava shading from ES2 connector
> ---
>
> Key: FLINK-7846
> URL: https://issues.apache.org/jira/browse/FLINK-7846
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>
> The ElasticSearch 2 connector pom has a shading configuration for guava. The 
> only user of guava is the elasticsearch dependency, which is not included in 
> the jar, making the shading pointless.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7368) MetricStore makes cpu spin at 100%

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220553#comment-16220553
 ] 

ASF GitHub Bot commented on FLINK-7368:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4840


> MetricStore makes cpu spin at 100%
> --
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Nico Chen
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, 
> jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java 
> HashMap inside `MetricStore` and can trigger the HashMap's infinite loop. 
> Recently I hit a case where the Flink jobmanager consumed 100% cpu. A part of 
> the stacktrace is shown below. The full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 
> runnable [0x7fbd7d1c2000]
>java.lang.Thread.State: RUNNABLE
> at java.util.HashMap.put(HashMap.java:494)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
> at akka.dispatch.OnSuccess.internal(Future.scala:212)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at 
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
> at 
> java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
> at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
> at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stacktrace as above, indicating they are 
> spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many 
> posts indicate that multiple threads accessing a HashMap cause this problem, and I 
> reproduced the case as well. The test code is attached. I only modified 
> HashMap.transfer() by adding concurrency barriers for different threads in 
> order to simulate the timing of the creation of cycles in the HashMap's Entry. My 
> program's stacktrace shows it hangs at the same line 
> (HashMap.put(HashMap.java:494)) as the stacktrace I posted above.
> Even though `MetricFetcher` has a 10 second minimum interval between each 
> metrics query, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix.
>  
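For illustration only (this is not the actual change from the PRs above): a minimal
sketch of the general remedy for the hazard described here, serializing every map
access through one lock. The class and method names are hypothetical, not the real
`MetricStore` API.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store: every map access goes through one lock,
// so no thread can observe (or corrupt) a HashMap resize in progress.
public class SafeMetricStore {
    private final Object lock = new Object();
    private final Map<String, String> metrics = new HashMap<>();

    public void addMetric(String name, String value) {
        synchronized (lock) {
            metrics.put(name, value);
        }
    }

    public String getMetric(String name) {
        synchronized (lock) {
            return metrics.get(name);
        }
    }
}
{code}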



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7368) MetricStore makes cpu spin at 100%

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220554#comment-16220554
 ] 

ASF GitHub Bot commented on FLINK-7368:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4472


> MetricStore makes cpu spin at 100%
> --
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Nico Chen
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, 
> jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java 
> HashMap inside `MetricStore` and trigger the HashMap's infinite loop. 
> Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of 
> the stacktrace is shown below; the full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 
> runnable [0x7fbd7d1c2000]
>java.lang.Thread.State: RUNNABLE
> at java.util.HashMap.put(HashMap.java:494)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
> at akka.dispatch.OnSuccess.internal(Future.scala:212)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at 
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
> at 
> java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
> at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
> at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stacktrace as above, indicating they are 
> spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many 
> posts indicate that multiple threads accessing a HashMap cause this problem, 
> and I reproduced the case as well. The test code is attached: I only modified 
> HashMap.transfer() by adding concurrency barriers for different threads in 
> order to simulate the timing that creates cycles in the HashMap's Entry chain. 
> My program's stacktrace shows it hangs at the same line, 
> HashMap.put(HashMap.java:494), as the stacktrace posted above.
> Even though `MetricFetcher` has a 10-second minimum interval between 
> metrics queries, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix.
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4840


---


[GitHub] flink pull request #4472: FLINK-7368: MetricStore makes cpu spin at 100%

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4472


---


[jira] [Closed] (FLINK-7368) MetricStore makes cpu spin at 100%

2017-10-26 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7368.
---
Resolution: Fixed

1.4: f622de3ecbc2ae17f2d15fe46828c48747c2b6ae

> MetricStore makes cpu spin at 100%
> --
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Nico Chen
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, 
> jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java 
> HashMap inside `MetricStore` and trigger the HashMap's infinite loop. 
> Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of 
> the stacktrace is shown below; the full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 
> runnable [0x7fbd7d1c2000]
>java.lang.Thread.State: RUNNABLE
> at java.util.HashMap.put(HashMap.java:494)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
> at akka.dispatch.OnSuccess.internal(Future.scala:212)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at 
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
> at 
> java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
> at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
> at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stacktrace as above, indicating they are 
> spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many 
> posts indicate that multiple threads accessing a HashMap cause this problem, 
> and I reproduced the case as well. The test code is attached: I only modified 
> HashMap.transfer() by adding concurrency barriers for different threads in 
> order to simulate the timing that creates cycles in the HashMap's Entry chain. 
> My program's stacktrace shows it hangs at the same line, 
> HashMap.put(HashMap.java:494), as the stacktrace posted above.
> Even though `MetricFetcher` has a 10-second minimum interval between 
> metrics queries, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix.
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220545#comment-16220545
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4844#discussion_r147162652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1270,6 +1272,42 @@ public void run() {
}
 
/**
+* Discards the given pending checkpoint because of the given cause.
+*
+* @param pendingCheckpoint to discard
+* @param cause for discarding the checkpoint
+*/
+   private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, 
@Nullable Throwable cause) {
+   Thread.holdsLock(lock);
--- End diff --

Should that be an `assert(Thread.holdsLock(lock));` or a 
`Preconditions.checkState(Thread.holdsLock(lock));`?


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using the "individual" (fine-grained) failover recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180, dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager terminated, the individual recovery 
> happened correctly and the workload was re-dispatched to a newly available 
> task manager instance. However, the checkpoint would take 10 minutes to 
> eventually time out, leaving all other task managers unable to commit 
> checkpoints. In a worst-case scenario, if the job got restarted for other 
> reasons (e.g. job manager termination), more messages would be 
> re-processed/duplicated compared to a job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that, when the Job Manager realizes a Task Manager is 
> gone and the workload is re-dispatched, it still needs to wait for the 
> checkpoint initiated by the old task manager?
> A checkpoint screenshot is in the attachments.
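Regarding question 1: the checkpoint timeout is configurable per job via
`CheckpointConfig`. A minimal sketch follows; the 60-second interval and
2-minute timeout are arbitrary example values, while the 10-minute default
matches the delay described above.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // Discard a checkpoint that is not fully acknowledged within 2 minutes
        // (the default is 10 minutes, which matches the delay observed above).
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
    }
}
{code}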



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending ch...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4844#discussion_r147162652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1270,6 +1272,42 @@ public void run() {
}
 
/**
+* Discards the given pending checkpoint because of the given cause.
+*
+* @param pendingCheckpoint to discard
+* @param cause for discarding the checkpoint
+*/
+   private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, 
@Nullable Throwable cause) {
+   Thread.holdsLock(lock);
--- End diff --

Should that be an `assert(Thread.holdsLock(lock));` or a 
`Preconditions.checkState(Thread.holdsLock(lock));`?


---
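A short aside on why the line drew attention: `Thread.holdsLock(lock);` as a bare
statement computes a boolean and throws it away, so it enforces nothing. A sketch of
the two alternatives named in the review comment, assuming Flink's
`org.apache.flink.util.Preconditions`:

{code:java}
import org.apache.flink.util.Preconditions;

public class HoldsLockSketch {
    private final Object lock = new Object();

    private void mustHoldLock() {
        // As in the diff: the boolean result is discarded, so nothing is enforced.
        Thread.holdsLock(lock);

        // Option 1: enforced only when the JVM runs with assertions enabled (-ea).
        assert Thread.holdsLock(lock) : "caller must hold 'lock'";

        // Option 2: always enforced; throws IllegalStateException when violated.
        Preconditions.checkState(Thread.holdsLock(lock), "caller must hold 'lock'");
    }
}
{code}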


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220519#comment-16220519
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4873#discussion_r147096750
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 ---
@@ -58,14 +62,17 @@ class AggSqlFunction(
 // will be generated when translating the calcite relnode to flink 
runtime execution plan
 null,
 false,
-requiresOver
+requiresOver,
+typeFactory
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
 
   override def isDeterministic: Boolean = aggregateFunction.isDeterministic
 
   override def toString: String = displayName
+
+  override def getParamTypes: util.List[RelDataType] = 
Collections.emptyList()
--- End diff --

I checked the Calcite code. There is a method that accesses the first 
element of the list if it is not `null` (no length >= 1 check). Hence, an empty 
list would result in an exception. I'd rather return `null` here.
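A minimal sketch of the hazard being described, with a hypothetical caller standing
in for the Calcite code (a null check only, no length check); the names here are
illustrative, not Calcite's actual API:

{code:java}
import java.util.Collections;
import java.util.List;

public class ParamTypesSketch {
    // Stand-in for the access pattern described above: null-checked, never length-checked.
    static String describeFirstParam(List<String> paramTypes) {
        if (paramTypes != null) {
            return paramTypes.get(0); // IndexOutOfBoundsException for an empty list
        }
        return "inferred";            // a null list takes this safe branch instead
    }

    public static void main(String[] args) {
        System.out.println(describeFirstParam(null));                    // prints "inferred"
        System.out.println(describeFirstParam(Collections.emptyList())); // throws
    }
}
{code}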


> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220522#comment-16220522
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4873#discussion_r147154899
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
 ---
@@ -208,29 +208,29 @@ class AggregateTest extends TableTestBase {
 val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable 
" +
   "GROUP BY GROUPING SETS (b, c)"
 
-val aggregate = unaryNode(
-  "DataSetCalc",
-  binaryNode(
-"DataSetUnion",
+val aggregate = binaryNode(
--- End diff --

The Grouping Set tests in this file are identical to those in 
`GroupingSetsTest` and can therefore be removed.


> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220521#comment-16220521
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4873#discussion_r147142386
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/DecomposeGroupingSetRule.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+class DecomposeGroupingSetRule
+  extends RelOptRule(
+operand(classOf[LogicalAggregate], any),
+  "DecomposeGroupingSetRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.getGroupSets.isEmpty &&
+  
DecomposeGroupingSetRule.getGroupIdExprIndexes(agg.getAggCallList).nonEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+val groupIdExprs = 
DecomposeGroupingSetRule.getGroupIdExprIndexes(agg.getAggCallList).toSet
+
+val subAggs = agg.groupSets.map(set =>
+  DecomposeGroupingSetRule.decompose(call.builder(), agg, 
groupIdExprs, set))
+
+val union = subAggs.reduce((l, r) => new LogicalUnion(
+  agg.getCluster,
+  agg.getTraitSet,
+  Seq(l, r),
+  true
+))
+call.transformTo(union)
+  }
+}
+
+object DecomposeGroupingSetRule {
+  val INSTANCE = new DecomposeGroupingSetRule
+
+  private def getGroupIdExprIndexes(aggCalls: Seq[AggregateCall]) = {
+aggCalls.zipWithIndex.filter { case (call, _) =>
+call.getAggregation.getKind match {
+  case SqlKind.GROUP_ID | SqlKind.GROUPING | SqlKind.GROUPING_ID =>
+true
+  case _ =>
+false
+}
+}.map { case (_, idx) => idx}
+  }
+
+  private def decompose(
+ relBuilder: RelBuilder,
+ agg: LogicalAggregate,
+ groupExprIndexes : Set[Int],
+ groupSet: ImmutableBitSet
+  ) = {
+val aggsWithIndexes = agg.getAggCallList.zipWithIndex
+val subAgg = new LogicalAggregate(
+  agg.getCluster,
+  agg.getTraitSet,
+  agg.getInput,
+  false,
+  groupSet,
+  Seq(),
+  aggsWithIndexes
+.filter { case (_, idx) => !groupExprIndexes.contains(idx) }
+.map { case (call, _) => call}
+)
+
+val rexBuilder = relBuilder.getRexBuilder
+relBuilder.push(subAgg)
+
+val groupingFields = new Array[RexNode](agg.getGroupCount)
+val groupingFieldsName = Seq.range(0, agg.getGroupCount).map(
+  x => agg.getRowType.getFieldNames.get(x)
+)
+Seq.range(0, agg.getGroupCount).foreach(x =>
+  groupingFields(x) = rexBuilder.makeNullLiteral(
+agg.getRowType.getFieldList.get(x).getType)
+)
+
+groupSet.toList.zipWithIndex.foreach { case (group, idx) =>
+  groupingFields(group) = rexBuilder.makeInputRef(relBuilder.peek(), 
idx)
+}
+
+val aggFields = aggsWithIndexes.map { case (call, idx) =>
+  if (groupExprIndexes.contains(idx)) {
+lowerGroupExpr(agg.getCluster, call, groupSet)
+  } else {
+rexBuilder.makeInput

[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220520#comment-16220520
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4873#discussion_r147120015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -849,17 +848,7 @@ object AggregateUtil {
   outputType
 )
 
-val constantFlags: Option[Array[(Int, Boolean)]] =
-if (inGroupingSet) {
-
-  val groupingSetsMapping = getGroupingSetsIndicatorMapping(inputType, 
outputType)
--- End diff --

The `getGroupingSetsIndicatorMapping()` method can be removed, as well as 
everything related to the `constantFlags` parameter in 
`AggregationCodeGenerator`, `GeneratedAggregations`, 
`DataSetFinalAggFunction`, etc.


> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4873: [FLINK-7051] [table] Bump Calcite version to 1.14.

2017-10-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4873#discussion_r147154899
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
 ---
@@ -208,29 +208,29 @@ class AggregateTest extends TableTestBase {
 val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable 
" +
   "GROUP BY GROUPING SETS (b, c)"
 
-val aggregate = unaryNode(
-  "DataSetCalc",
-  binaryNode(
-"DataSetUnion",
+val aggregate = binaryNode(
--- End diff --

The Grouping Set tests in this file are identical to those in 
`GroupingSetsTest` and can therefore be removed.


---

