[jira] [Created] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator

2018-05-29 Thread aitozi (JIRA)
aitozi created FLINK-9476:
-

 Summary: Lost sideOutPut Late Elements in CEP Operator
 Key: FLINK-9476
 URL: https://issues.apache.org/jira/browse/FLINK-9476
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.4.2
Reporter: aitozi
Assignee: aitozi






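The issue description is empty; for context, here is a minimal, framework-free sketch of the "side output for late elements" behavior the title refers to: elements behind the current watermark should be routed to a side output instead of being silently dropped. Class and method names are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Routes elements that arrive behind the watermark to a "late" side output
 * instead of silently dropping them (illustrative sketch, not Flink API).
 */
class LateRouter {
    final List<Long> mainOutput = new ArrayList<>();
    final List<Long> lateSideOutput = new ArrayList<>();
    long currentWatermark = Long.MIN_VALUE;

    void advanceWatermark(long wm) {
        currentWatermark = wm;
    }

    void processElement(long timestamp) {
        if (timestamp <= currentWatermark) {
            lateSideOutput.add(timestamp); // late: emit to side output, do not drop
        } else {
            mainOutput.add(timestamp);     // on time: normal processing path
        }
    }
}
```

The bug report suggests the CEP operator fails to emit such late elements to the configured side output.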
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9443) Remove unused parameter in StreamGraphHasherV2

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494721#comment-16494721
 ] 

ASF GitHub Bot commented on FLINK-9443:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6080
  
 @aljoscha please help review this, thanks.


> Remove unused parameter in StreamGraphHasherV2 
> ---
>
> Key: FLINK-9443
> URL: https://issues.apache.org/jira/browse/FLINK-9443
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.2.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Since Flink 1.2, StreamGraphHasherV2 is used to generate hashes. The method 
> generateNodeLocalHash no longer uses information such as the parallelism or 
> userFunction, so the parameter should be removed.






[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494712#comment-16494712
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191647381
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --


![image](https://user-images.githubusercontent.com/20113411/4070-3b47089a-640f-11e8-8cd3-bc5684c07228.png)



> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> make flink system can send metrics to prometheus via pushgateway.







[jira] [Created] (FLINK-9475) introduce an approximate version of "select distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9475:
-

 Summary: introduce an approximate version of "select distinct"
 Key: FLINK-9475
 URL: https://issues.apache.org/jira/browse/FLINK-9475
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Based on the "Elastic Bloom Filter", it is easy to implement an approximate version 
of "select distinct" with excellent performance. Its accuracy should be 
configurable, e.g. 95% or 98%.





[jira] [Created] (FLINK-9474) Introduce an approximate version of "count distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9474:
-

 Summary: Introduce an approximate version of "count distinct"
 Key: FLINK-9474
 URL: https://issues.apache.org/jira/browse/FLINK-9474
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We can implement an approximate version of "count distinct" based on the 
"Elastic Bloom Filter". It could be very fast because we no longer need to query 
the state, and its accuracy should be configurable, e.g. 95% or 98%.
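The idea described above can be sketched without Flink: keep a Bloom filter per group and increment the distinct count only when the filter did not already contain the element. The sizing and hashing below are illustrative assumptions, not the proposed "Elastic Bloom Filter".

```java
import java.util.BitSet;

/**
 * Approximate "count distinct": increment the count only when the Bloom
 * filter (probably) has not seen the value before. Bloom-filter false
 * positives can make the estimate undercount slightly.
 */
class ApproxDistinctCounter {
    private final BitSet bits;
    private final int size;
    private final int hashes;
    private long count = 0;

    ApproxDistinctCounter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    /** Returns true if the value was (probably) new. */
    boolean add(String value) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | 1; // derive a second hash; keep it odd
        boolean seen = true;
        for (int i = 0; i < hashes; i++) {
            int idx = Math.floorMod(h1 + i * h2, size);
            if (!bits.get(idx)) {
                seen = false;   // at least one bit unset: definitely new
                bits.set(idx);
            }
        }
        if (!seen) {
            count++;
        }
        return !seen;
    }

    long estimate() {
        return count;
    }
}
```

The target accuracy (e.g. 95% or 98%) would translate into the filter size and number of hash functions chosen per group.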





[jira] [Updated] (FLINK-9473) Compilation fails after upgrade to Calcite 1.17

2018-05-29 Thread Sergey Nuyanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-9473:
---
Description: 

{noformat}
/apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40:
 error: class ExternalCatalogSchema needs to be abstract, since:
[ERROR] it has 2 unimplemented members.
[ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures 
are as follows.
[ERROR]  *  For convenience, these are usable as stub implementations.
[ERROR]  */
[ERROR]   def getType(x$1: String): 
org.apache.calcite.rel.type.RelProtoDataType = ???
[ERROR]   def getTypeNames(): java.util.Set[String] = ???
[ERROR] 
[ERROR] class ExternalCatalogSchema(
[ERROR]   ^
[WARNING] two warnings found
[ERROR] one error found

{noformat}

In https://issues.apache.org/jira/browse/CALCITE-2045, two more methods were 
introduced into the interface _org.apache.calcite.schema.Schema_: 
_org.apache.calcite.schema.Schema#getTypeNames_ and 
_org.apache.calcite.schema.Schema#getType_.

  was:
{noformat}
/apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40:
 error: class ExternalCatalogSchema needs to be abstract, since:
[ERROR] it has 2 unimplemented members.
[ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures 
are as follows.
[ERROR]  *  For convenience, these are usable as stub implementations.
[ERROR]  */
[ERROR]   def getType(x$1: String): 
org.apache.calcite.rel.type.RelProtoDataType = ???
[ERROR]   def getTypeNames(): java.util.Set[String] = ???
[ERROR] 
[ERROR] class ExternalCatalogSchema(
[ERROR]   ^
[WARNING] two warnings found
[ERROR] one error found

{noformat}


> Compilation fails after upgrade to Calcite 1.17
> ---
>
> Key: FLINK-9473
> URL: https://issues.apache.org/jira/browse/FLINK-9473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> {noformat}
> /apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40:
>  error: class ExternalCatalogSchema needs to be abstract, since:
> [ERROR] it has 2 unimplemented members.
> [ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures 
> are as follows.
> [ERROR]  *  For convenience, these are usable as stub implementations.
> [ERROR]  */
> [ERROR]   def getType(x$1: String): 
> org.apache.calcite.rel.type.RelProtoDataType = ???
> [ERROR]   def getTypeNames(): java.util.Set[String] = ???
> [ERROR] 
> [ERROR] class ExternalCatalogSchema(
> [ERROR]   ^
> [WARNING] two warnings found
> [ERROR] one error found
> {noformat}
> In https://issues.apache.org/jira/browse/CALCITE-2045, two more methods were 
> introduced into the interface _org.apache.calcite.schema.Schema_: 
> _org.apache.calcite.schema.Schema#getTypeNames_ and 
> _org.apache.calcite.schema.Schema#getType_.





[jira] [Created] (FLINK-9473) Compilation fails after upgrade to Calcite 1.17

2018-05-29 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9473:
--

 Summary: Compilation fails after upgrade to Calcite 1.17
 Key: FLINK-9473
 URL: https://issues.apache.org/jira/browse/FLINK-9473
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


{noformat}
/apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40:
 error: class ExternalCatalogSchema needs to be abstract, since:
[ERROR] it has 2 unimplemented members.
[ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures 
are as follows.
[ERROR]  *  For convenience, these are usable as stub implementations.
[ERROR]  */
[ERROR]   def getType(x$1: String): 
org.apache.calcite.rel.type.RelProtoDataType = ???
[ERROR]   def getTypeNames(): java.util.Set[String] = ???
[ERROR] 
[ERROR] class ExternalCatalogSchema(
[ERROR]   ^
[WARNING] two warnings found
[ERROR] one error found

{noformat}





[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494682#comment-16494682
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol updated the PR as suggested! Please review


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program
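The steps above roughly correspond to the following commands. The version number, group/artifact IDs, and the grep pattern are placeholders for illustration, not part of the issue:

```shell
# 1. create a new project from the Scala quickstart archetype
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.5.0 \
  -DgroupId=org.example -DartifactId=quickstart-test \
  -Dversion=0.1 -Dpackage=org.example -DinteractiveMode=false

cd quickstart-test

# 2. add a Flink connector or library dependency to pom.xml (manually or scripted)

# 3. build the fat jar
mvn clean package -Pbuild-jar

# 4. verify that no core dependencies are contained in the jar file
jar tf target/quickstart-test-0.1.jar | grep -q 'org/apache/flink/api/java' \
  && echo "core classes found (bad)" \
  || echo "jar looks clean"

# 5. run the program against a local cluster (path is illustrative)
# flink run target/quickstart-test-0.1.jar
```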






[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink

2018-05-29 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9364:
--
Summary: Add doc of the memory usage in flink  (was: Add doc for the memory 
usage in flink)

> Add doc of the memory usage in flink
> 
>
> Key: FLINK-9364
> URL: https://issues.apache.org/jira/browse/FLINK-9364
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> We need to add a doc describing the memory usage in Flink, especially for users 
> of the RocksDB backend; many people get confused about this (I've seen several 
> questions related to it on the user mailing list).





[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494674#comment-16494674
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen I've addressed your comments, could you please have a look 
again?


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We should generate the _meta file for a checkpoint only when the write is 
> completely successful. We should first write the metadata to a temporary file 
> and then atomically rename it (with an equivalent workaround for S3). 
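The write-then-rename pattern described above can be sketched with plain NIO. The file names are illustrative, and the S3 workaround mentioned in the issue is not shown:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Write checkpoint metadata to a temporary file first, then atomically rename
 * it, so a partially written _metadata file is never visible to readers.
 */
class AtomicMetaWriter {
    static void writeMeta(Path checkpointDir, byte[] metadata) throws IOException {
        Path tmp = checkpointDir.resolve("_metadata.inprogress");
        Path fin = checkpointDir.resolve("_metadata");
        // If this write fails, the final _metadata file simply never appears.
        Files.write(tmp, metadata);
        // The rename is atomic on POSIX filesystems; readers see all or nothing.
        Files.move(tmp, fin, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Object stores like S3 lack atomic rename, which is why the issue notes that an equivalent workaround is needed there.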





[jira] [Commented] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494676#comment-16494676
 ] 

ASF GitHub Bot commented on FLINK-9468:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6094
  
CC @StephanEwen 


> get outputLimit of LimitedConnectionsFileSystem incorrectly
> ---
>
> Key: FLINK-9468
> URL: https://issues.apache.org/jira/browse/FLINK-9468
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit 
> incorrectly.
> {code:java}
> private <T extends StreamWithTimeout> T createStream(
>   final SupplierWithException<T, IOException> streamOpener,
>   final HashSet<T> openStreams,
>   final boolean output) throws IOException {
>     final int outputLimit = output && maxNumOpenInputStreams > 0 ?
>         maxNumOpenOutputStreams : Integer.MAX_VALUE;
>     /**/
> }
> {code}
> should be 
> {code:java}
> private <T extends StreamWithTimeout> T createStream(
>   final SupplierWithException<T, IOException> streamOpener,
>   final HashSet<T> openStreams,
>   final boolean output) throws IOException {
>     final int outputLimit = output && maxNumOpenOutputStreams > 0 ?
>         maxNumOpenOutputStreams : Integer.MAX_VALUE;
>     /**/
> }
> {code}
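The corrected condition can be isolated into a small helper that makes the copy-paste bug visible: the output limit must be guarded by the output maximum, not by its input counterpart. The method name is illustrative, not Flink API:

```java
/**
 * Picks the effective open-stream limit; a non-positive maximum means
 * "unlimited". The reported bug checked maxNumOpenInputStreams while
 * returning the output limit, so a configured output limit could be
 * silently ignored (or 0 returned) depending on the input setting.
 */
class StreamLimits {
    static int effectiveLimit(boolean output, int maxNumOpenOutputStreams, int maxNumOpenInputStreams) {
        if (output) {
            // fixed: guard on the OUTPUT maximum
            return maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
        }
        return maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
    }
}
```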









[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494606#comment-16494606
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191628800
  
--- Diff: docs/monitoring/metrics.md ---
@@ -699,6 +699,39 @@ Flink metric types are mapped to Prometheus metric 
types as follows:
 
 All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
+### PrometheusPushGateway 
(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
--- End diff --

thanks for review, I will improve the doc


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> make flink system can send metrics to prometheus via pushgateway.







[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494605#comment-16494605
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191628488
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

:+1: right
It would be better to compose the jobName from the JobManager ID / TaskManager ID, 
so the jobName is `JM ID` / `TM ID`, or combined with a specified prefix like 
`prefix + JM ID` / `prefix + TM ID`.

But JM IDs are currently not exposed, so random strings are used instead of the 
JM/TM ID.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> make flink system can send metrics to prometheus via pushgateway.







[jira] [Closed] (FLINK-9460) Redundant output in table & upsert semantics

2018-05-29 Thread zhengcanbin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengcanbin closed FLINK-9460.
--
Resolution: Not A Problem

> Redundant output in table & upsert semantics
> 
>
> Key: FLINK-9460
> URL: https://issues.apache.org/jira/browse/FLINK-9460
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: zhengcanbin
>Priority: Minor
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: image-2018-05-29-11-39-45-698.png, 
> image-2018-05-29-11-51-20-671.png
>
>
> The output seems incorrect in my table & upsert example, here's the code:
> {code:java}
> object VerifyUpsert {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.createLocalEnvironment()
> val tEnv = TableEnvironment.getTableEnvironment(env)
> env.setParallelism(1)
> val input = env.socketTextStream("localhost", 9099).map { x =>
>   val tokens = x.split(",")
>   DemoSource(tokens(0), tokens(1), tokens(2))
> }
> tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 
> 'page_id)
> val fieldNames = Array("record_time", "pv", "uv")
> val fieldTypes = Array(Types.STRING, Types.LONG, 
> Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
> tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, 
> MyPrintSink(fieldNames, fieldTypes))
> tEnv.sqlUpdate(
>   """
> |INSERT INTO demoSink
> |SELECT
> |  SUBSTRING(record_time, 1, 16) as record_time,
> |  count(user_id) as pv,
> |  count(DISTINCT user_id) as uv
> |FROM demoSource
> |GROUP BY SUBSTRING(record_time, 1, 16)
>   """.stripMargin)
> env.execute()
>   }
>   case class DemoSource(record_time: String, user_id: String, page_id: String)
> }
> case class MyPrintSink(var fNames: Array[String], var fTypes: 
> Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
>   override def setKeyFields(keys: Array[String]): Unit = Seq.empty
>   override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}
>   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, 
> fNames)
>   override def emitDataStream(dataStream: 
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = dataStream.addSink(new 
> PrintSinkFunction())
>   override def getFieldNames: Array[String] = fNames
>   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
>   override def configure(fieldNames: Array[String], fieldTypes: 
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
> val copy = MyPrintSink(fNames, fTypes)
> copy.fNames = fieldNames
> copy.fTypes = fieldTypes
> copy
>   }
> }{code}
> When the application starts, I type one record at a time into the netcat client; 
> the table below shows the output for every input record:
>  
> ||input||output||
> |2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
> |2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
> |2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
> |2018-05-24 21:34:12,0,4|{color:#ff}(true,2018-05-24 21:34,2,2){color}
>  (true,2018-05-24 21:34,4,3)|
>  
> When the fourth record is consumed, two output records are printed in the 
> sink; obviously the first record (in red) is redundant. I followed the 
> source code and found something wrong with 
>  
> {code:java}
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
> {code}
> !image-2018-05-29-11-51-20-671.png!
> I think when (!generateRetraction) && !inputC.change is true, we should not 
> invoke out.collect here.
>  
> [~StephanEwen] please look over this





[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494511#comment-16494511
 ] 

ASF GitHub Bot commented on FLINK-9472:
---

GitHub user alexeyts reopened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix, pull-request-available
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[GitHub] flink pull request #6098: [FLINK-9472] fix archetype documentation typo

2018-05-29 Thread alexeyts
GitHub user alexeyts reopened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




---


[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494510#comment-16494510
 ] 

ASF GitHub Bot commented on FLINK-9472:
---

Github user alexeyts closed the pull request at:

https://github.com/apache/flink/pull/6098


> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix, pull-request-available
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[GitHub] flink pull request #6098: [FLINK-9472] fix archetype documentation typo

2018-05-29 Thread alexeyts
Github user alexeyts closed the pull request at:

https://github.com/apache/flink/pull/6098


---


[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494350#comment-16494350
 ] 

ASF GitHub Bot commented on FLINK-9472:
---

GitHub user alexeyts reopened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix, pull-request-available
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494349#comment-16494349
 ] 

ASF GitHub Bot commented on FLINK-9472:
---

Github user alexeyts closed the pull request at:

https://github.com/apache/flink/pull/6098


> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix, pull-request-available
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[GitHub] flink pull request #6098: [FLINK-9472] fix archetype documentation typo

2018-05-29 Thread alexeyts
GitHub user alexeyts reopened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




---


[GitHub] flink pull request #6098: [FLINK-9472] fix archetype documentation typo

2018-05-29 Thread alexeyts
Github user alexeyts closed the pull request at:

https://github.com/apache/flink/pull/6098


---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494297#comment-16494297
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191578999
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

It is very possible that I'm overthinking this, and I've come up with a 
compromise.

If there is one thing we learned about the metric system, it is that 
users despise random IDs, especially when they can't connect them with 
anything else. The random ID that you're suggesting is exactly that: a random 
piece of data that is effectively just a workaround for the questionable 
design of the PushGateway.
For the sake of analyzing metrics this ID is irrelevant; it just eats up 
space.
The randomness is especially problematic since this ID is used for deleting 
metrics (which one has to do at some point), making this arbitrary value 
_really_ important.

For our purposes, however, we just need a _unique_ value for each container, 
i.e. dispatcher/taskmanager etc., not necessarily a random one.
Every distributed component already has such an ID, most notably the 
TaskManager ID, which is already exposed to the metric system. JobManager IDs are 
currently not exposed, but it was only a matter of time until this becomes 
necessary.
While technically still a random value, it at least does not add an entirely 
new label/dimension, but merely copies an existing one.
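As a rough illustration of that compromise (hypothetical code, not part of the PR): the Pushgateway job name could be derived from an existing component ID, such as the TaskManager ID, so the same deterministic name used for pushing can later be used for deletion.

```java
// Hypothetical sketch: derive the Pushgateway job name from an existing
// component ID instead of a fresh random AbstractID, so the name used for
// pushing metrics is the same one used later to delete them.
public class JobNameDemo {
    static final char JOB_NAME_SEPARATOR = '-';
    static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;

    static String jobNameFor(String componentId) {
        // componentId would be e.g. the TaskManager ID that is already
        // exposed to the metric system.
        return JOB_NAME_PREFIX + componentId;
    }

    public static void main(String[] args) {
        System.out.println(jobNameFor("tm_12345")); // flink-tm_12345
    }
}
```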


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a Pushgateway.





[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191578999
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

It is very possible that I'm overthinking this, and I've come up with a 
compromise.

If there is one thing we learned about the metric system, it is that 
users despise random IDs, especially when they can't connect them with 
anything else. The random ID that you're suggesting is exactly that: a random 
piece of data that is effectively just a workaround for the questionable 
design of the PushGateway.
For the sake of analyzing metrics this ID is irrelevant; it just eats up 
space.
The randomness is especially problematic since this ID is used for deleting 
metrics (which one has to do at some point), making this arbitrary value 
_really_ important.

For our purposes, however, we just need a _unique_ value for each container, 
i.e. dispatcher/taskmanager etc., not necessarily a random one.
Every distributed component already has such an ID, most notably the 
TaskManager ID, which is already exposed to the metric system. JobManager IDs are 
currently not exposed, but it was only a matter of time until this becomes 
necessary.
While technically still a random value, it at least does not add an entirely 
new label/dimension, but merely copies an existing one.


---


[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191576816
  
--- Diff: docs/monitoring/metrics.md ---
@@ -699,6 +699,39 @@ Flink metric types are mapped to Prometheus metric 
types as follows:
 
 All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
+### PrometheusPushGateway 
(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
--- End diff --

Please add a section highlighting the differences and use-cases compared to 
the existing reporter.

In particular we should mention that this reporter, like the existing 
reporter, is not suited for short-lived jobs.
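For context, here is a minimal sketch of how such a reporter's host/port settings might be resolved from reporter configuration. This is hypothetical illustration code using plain java.util.Properties (the actual reporter uses Flink's MetricConfig); only the ARG_HOST/ARG_PORT key names mirror the PR.

```java
import java.util.Properties;

// Hypothetical sketch of resolving the Pushgateway address from reporter
// configuration; the key names mirror the ARG_HOST/ARG_PORT constants above,
// the default values are made up for illustration.
public class ReporterConfigDemo {
    public static final String ARG_HOST = "host";
    public static final String ARG_PORT = "port";

    static String gatewayAddress(Properties config) {
        String host = config.getProperty(ARG_HOST, "localhost");
        String port = config.getProperty(ARG_PORT, "9091");
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty(ARG_HOST, "pushgateway.example.com");
        System.out.println(gatewayAddress(p)); // pushgateway.example.com:9091
    }
}
```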


---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494284#comment-16494284
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191576816
  
--- Diff: docs/monitoring/metrics.md ---
@@ -699,6 +699,39 @@ Flink metric types are mapped to Prometheus metric 
types as follows:
 
 All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
+### PrometheusPushGateway 
(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
--- End diff --

Please add a section highlighting the differences and use-cases compared to 
the existing reporter.

In particular we should mention that this reporter, like the existing 
reporter, is not suited for short-lived jobs.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a Pushgateway.





[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread Alexey Tsitkin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Tsitkin updated FLINK-9472:
--
Labels: documentation easyfix pull-request-available  (was: documentation 
easyfix)

> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix, pull-request-available
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494231#comment-16494231
 ] 

ASF GitHub Bot commented on FLINK-9472:
---

GitHub user alexeyts opened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[GitHub] flink pull request #6098: [FLINK-9472] fix archetype documentation typo

2018-05-29 Thread alexeyts
GitHub user alexeyts opened a pull request:

https://github.com/apache/flink/pull/6098

[FLINK-9472] fix archetype documentation typo

## What is the purpose of the change

Simple typo fix

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexeyts/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6098


commit 023670b70d2eb7d79d62c189af5f931f0f27af04
Author: Alexey Tsitkin 
Date:   2018-05-29T18:42:53Z

[FLINK-9472] fix archetype documentation typo




---


[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread Alexey Tsitkin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Tsitkin updated FLINK-9472:
--
Labels: documentation easyfix  (was: )

> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>  Labels: documentation, easyfix
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread Alexey Tsitkin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Tsitkin updated FLINK-9472:
--
Component/s: Documentation

> Maven archetype code has typo in documentation
> --
>
> Key: FLINK-9472
> URL: https://issues.apache.org/jira/browse/FLINK-9472
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Alexey Tsitkin
>Priority: Trivial
>
> The word application is misspelled (as `appliation`) in java/scala code 
> documentation in the maven archetype.





[jira] [Created] (FLINK-9472) Maven archetype code has typo in documentation

2018-05-29 Thread Alexey Tsitkin (JIRA)
Alexey Tsitkin created FLINK-9472:
-

 Summary: Maven archetype code has typo in documentation
 Key: FLINK-9472
 URL: https://issues.apache.org/jira/browse/FLINK-9472
 Project: Flink
  Issue Type: Bug
Reporter: Alexey Tsitkin


The word application is misspelled (as `appliation`) in java/scala code 
documentation in the maven archetype.





[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494193#comment-16494193
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191561341
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

eh forget it, that's not a viable option for containerized environments 
that this issue targets anyway...


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a Pushgateway.





[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191561341
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

eh forget it, that's not a viable option for containerized environments 
that this issue targets anyway...


---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494189#comment-16494189
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191560994
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

What would happen if every taskmanager/jobmanager has its own pushgateway?


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> make flink system can send metrics to prometheus via pushgateway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
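As an aside on the job-name scheme discussed in the diff above: the reporter derives a unique Pushgateway job name from a random suffix. A minimal, self-contained sketch (plain Java, with `java.util.UUID` as a hypothetical stand-in for Flink's `AbstractID`; illustrative only, not Flink's actual implementation):

```java
import java.util.UUID;

public class JobNameDemo {
    static final char JOB_NAME_SEPARATOR = '-';
    static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;

    // A random, collision-resistant suffix means metrics pushed by different
    // taskmanagers/jobmanagers land under different job names at the gateway.
    static String newJobName() {
        return JOB_NAME_PREFIX + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(newJobName());
        System.out.println(newJobName()); // a different name on every call
    }
}
```

Whether a shared gateway or one gateway per taskmanager/jobmanager is used, a unique job name keeps one instance's push from overwriting another's metric group.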


[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191560994
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

What would happen if every taskmanager/jobmanager has its own pushgateway?


---


[jira] [Commented] (FLINK-5550) NotFoundException: Could not find job with id

2018-05-29 Thread Alexandr Arkhipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494118#comment-16494118
 ] 

Alexandr Arkhipov commented on FLINK-5550:
--

Would you mind if I take this issue and work on it?

> NotFoundException: Could not find job with id
> -
>
> Key: FLINK-5550
> URL: https://issues.apache.org/jira/browse/FLINK-5550
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.0.3
> Environment: centos
>Reporter: jiwengang
>Priority: Minor
>  Labels: newbie
>
> Job is canceled, but still report the following exception:
> 2017-01-18 10:35:18,677 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
> handling request
> org.apache.flink.runtime.webmonitor.NotFoundException: Could not find job 
> with id 3b98e734c868cc2b992743cfe8911ad0
> at 
> org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleRequest(AbstractExecutionGraphRequestHandler.java:58)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
> at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
> at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:104)
> at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
> at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9471) Job ending exceptions being logged at Info level

2018-05-29 Thread SUBRAMANYA SURESH (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SUBRAMANYA SURESH updated FLINK-9471:
-
Description: 
We are using Flink SQL. Job-ending exceptions are logged at INFO level, which 
makes it very hard for me to tune out the INFO messages in the configuration. 
Note: if I do end up using INFO, the same ExecutionGraph logger logs the 
entire query of the operator graph for every INFO statement, and this easily 
fills up the logs if we have, say, 100-200 queries.

Note: each "-" below indicates an entire line of the execution graph for this 
query (redacted for privacy).

2018-03-30 03:32:09,942 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom 
Source -> - 

- 

-

-

- (208/725).}

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)

        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator 
Source: Custom Source -> (- 

-

-

-

-

) (208/725).

        ... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/067924e4-c861-4de1-823e-b255a0bf9998
 in order to obtain the stream state handle

        at java.util.concurrent.FutureTask.report(FutureTask.java:122)

        at java.util.concurrent.FutureTask.get(FutureTask.java:192)

        at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892)

 

  was:
We are using Flink SQL. Job-ending exceptions are logged at INFO level, which 
makes it very hard for me to tune out the INFO messages in the configuration. 
Note: if I do end up using INFO, the same ExecutionGraph logger logs the 
entire query of the operator graph for every INFO statement, and this easily 
fills up the logs if we have, say, 100-200 queries.

Note: each "-" below indicates an entire line of the execution graph for this 
query (redacted for privacy).



2018-03-30 03:32:09,943 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 because: java.lang.Exception: Could not materialize checkpoint 1 
for operator Source: Custom Source -> (Map -> where: (AND(=- 

- 

- 

- 

-.}

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator 
Source: Custom Source -> (Map -> where: (AND(=(Environment, 
_UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, 
_UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, 
CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, 
_raw, Aggregator), Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), 
=(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, 
_UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, 
CollectedTimestamp, EventTimestamp, _raw, Aggregator)) (353/725).

... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/31b94717-9e6d-49b8-b64d-2a1a8ba04425
 in order to obtain the stream state handle

at java.util.concurrent.FutureTask.report(FutureTask.java:122)

at java.util.concurrent.FutureTask.get(FutureTask.java:192)

at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892)

... 5 more

Suppressed: java.lang.Exception: Could not properly cancel managed operator 
state future.

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)

at 

[jira] [Commented] (FLINK-6977) Add MD5/SHA1/SHA2 supported in TableAPI

2018-05-29 Thread Alexandr Arkhipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494097#comment-16494097
 ] 

Alexandr Arkhipov commented on FLINK-6977:
--

Is there someone working on this? If not, would you mind if I take this and fix?

> Add MD5/SHA1/SHA2 supported in TableAPI
> ---
>
> Key: FLINK-6977
> URL: https://issues.apache.org/jira/browse/FLINK-6977
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6926 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
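As background for the digest functions requested above: MD5, SHA1, and the SHA2 family correspond to standard JCA digests the JDK already ships. A minimal plain-Java sketch of the underlying computation (the `hexDigest` helper name is ours for illustration, not the eventual Table API surface):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestDemo {
    // Hex-encode the digest of a UTF-8 string; algorithm is a JCA name
    // such as "MD5", "SHA-1", or "SHA-256" (a member of the SHA2 family).
    static String hexDigest(String algorithm, String input) {
        try {
            byte[] hash = MessageDigest.getInstance(algorithm)
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5, SHA-1, and SHA-256 are required to be present in every JRE.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Well-known test vector: SHA-1("abc")
        System.out.println(hexDigest("SHA-1", "abc")); // a9993e364706816aba3e25717850c26c9cd0d89d
    }
}
```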


[jira] [Created] (FLINK-9471) Job ending exceptions being logged at Info level

2018-05-29 Thread SUBRAMANYA SURESH (JIRA)
SUBRAMANYA SURESH created FLINK-9471:


 Summary: Job ending exceptions being logged at Info level
 Key: FLINK-9471
 URL: https://issues.apache.org/jira/browse/FLINK-9471
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2
Reporter: SUBRAMANYA SURESH


We are using Flink SQL. Job-ending exceptions are logged at INFO level, which 
makes it very hard for me to tune out the INFO messages in the configuration. 
Note: if I do end up using INFO, the same ExecutionGraph logger logs the 
entire query of the operator graph for every INFO statement, and this easily 
fills up the logs if we have, say, 100-200 queries.

Note: each "-" below indicates an entire line of the execution graph for this 
query (redacted for privacy).



2018-03-30 03:32:09,943 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 because: java.lang.Exception: Could not materialize checkpoint 1 
for operator Source: Custom Source -> (Map -> where: (AND(=- 

- 

- 

- 

-.}

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator 
Source: Custom Source -> (Map -> where: (AND(=(Environment, 
_UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, 
_UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, 
CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, 
_raw, Aggregator), Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), 
=(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, 
_UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, 
CollectedTimestamp, EventTimestamp, _raw, Aggregator)) (353/725).

... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/31b94717-9e6d-49b8-b64d-2a1a8ba04425
 in order to obtain the stream state handle

at java.util.concurrent.FutureTask.report(FutureTask.java:122)

at java.util.concurrent.FutureTask.get(FutureTask.java:192)

at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892)

... 5 more

Suppressed: java.lang.Exception: Could not properly cancel managed operator 
state future.

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)

... 5 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
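Until the log levels are adjusted in Flink itself, one possible workaround for the verbosity described in FLINK-9471 above is to raise only the chatty loggers in the logging configuration. A sketch for a Log4j 1.x `log4j.properties` (logger names taken from the quoted log lines; adjust for your logging backend):

```properties
# Keep Flink at INFO overall, but silence the two loggers that repeat the
# full execution graph / query text on every statement.
log4j.logger.org.apache.flink.runtime.executiongraph.ExecutionGraph=WARN
log4j.logger.org.apache.flink.runtime.checkpoint.CheckpointCoordinator=WARN
```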


[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494025#comment-16494025
 ] 

ASF GitHub Bot commented on FLINK-9470:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1 Nice!


> Allow querying the key in KeyedProcessFunction
> --
>
> Key: FLINK-9470
> URL: https://issues.apache.org/jira/browse/FLINK-9470
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
> timer while {{KeyedProcessFunction.Context}} does not allow querying the key 
> of the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...

2018-05-29 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1 Nice!


---


[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493943#comment-16493943
 ] 

ASF GitHub Bot commented on FLINK-9470:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1. I might have forgotten to add the interfaces back then; it would be good 
to have them


> Allow querying the key in KeyedProcessFunction
> --
>
> Key: FLINK-9470
> URL: https://issues.apache.org/jira/browse/FLINK-9470
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
> timer while {{KeyedProcessFunction.Context}} does not allow querying the key 
> of the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...

2018-05-29 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1. I might have forgotten to add the interfaces back then; it would be good 
to have them


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493895#comment-16493895
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol sure! I will update the PR with your requested changes.



> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-29 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol sure! I will update the PR with your requested changes.



---


[jira] [Closed] (FLINK-9356) Improve error message for when queryable state not ready / reachable

2018-05-29 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-9356.
-
Resolution: Fixed

Merged on master with 0fd497a7bf00694302a71ccf4b3ddd214942a228

and on 1.5 with 7852054f7ec8b16a1e258cb6c766d36109738ebd

> Improve error message for when queryable state not ready / reachable
> 
>
> Key: FLINK-9356
> URL: https://issues.apache.org/jira/browse/FLINK-9356
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: vinoyang
>Priority: Major
>
> When executing the queryable state client and either
> a. The queryable state is not ready
> b. There is no job with the specified job-id
>  
> one sees the following exception:
>  
> {code:java}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Failed request 0.
>  Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not contact the state location oracle to retrieve the state location.
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
> Caused by: java.lang.RuntimeException: Failed request 0.
>  Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not contact the state location oracle to retrieve the state location.
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> 

[jira] [Commented] (FLINK-9356) Improve error message for when queryable state not ready / reachable

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493831#comment-16493831
 ] 

ASF GitHub Bot commented on FLINK-9356:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6028


> Improve error message for when queryable state not ready / reachable
> 
>
> Key: FLINK-9356
> URL: https://issues.apache.org/jira/browse/FLINK-9356
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: vinoyang
>Priority: Major
>
> When executing the queryable state client and either
> a. The queryable state is not ready
> b. There is no job with the specified job-id
>  
> one sees the following exception:
>  
> {code:java}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Failed request 0.
>  Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not contact the state location oracle to retrieve the state location.
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
> Caused by: java.lang.RuntimeException: Failed request 0.
>  Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not contact the state location oracle to retrieve the state location.
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> 

[GitHub] flink pull request #6028: [FLINK-9356] Improve error message for when querya...

2018-05-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6028


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493816#comment-16493816
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6089
  
Let's not mix concerns here. Re-using examples can lead to situations where 
the quickstart tests fail because an example was modified, which by all means 
shouldn't happen.
The examples should certainly be tested as well though, but that's a 
separate issue.


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-05-29 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6089
  
Let's not mix concerns here. Re-using examples can lead to situations where 
the quickstart tests fail because an example was modified, which by all means 
shouldn't happen.
The examples should certainly be tested as well though, but that's a 
separate issue.


---


[jira] [Reopened] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis

2018-05-29 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-9466:
-

reopen to change fixVersion

> LocalRecoveryRocksDBFullITCase failed on Travis
> ---
>
> Key: FLINK-9466
> URL: https://issues.apache.org/jira/browse/FLINK-9466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: 
> https://api.travis-ci.org/v3/job/385097117/log.txt.
> Not sure what caused the failure where the window computes a wrong result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis

2018-05-29 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-9466.
---
Resolution: Cannot Reproduce

> LocalRecoveryRocksDBFullITCase failed on Travis
> ---
>
> Key: FLINK-9466
> URL: https://issues.apache.org/jira/browse/FLINK-9466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: 
> https://api.travis-ci.org/v3/job/385097117/log.txt.
> Not sure what caused the failure where the window computes a wrong result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis

2018-05-29 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9466:

Fix Version/s: (was: 1.6.0)

> LocalRecoveryRocksDBFullITCase failed on Travis
> ---
>
> Key: FLINK-9466
> URL: https://issues.apache.org/jira/browse/FLINK-9466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: 
> https://api.travis-ci.org/v3/job/385097117/log.txt.
> Not sure what caused the failure where the window computes a wrong result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493684#comment-16493684
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
hi @tillrohrmann can this PR be merged into the master branch, so that we can 
close it?


> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...

2018-05-29 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
hi @tillrohrmann can this PR be merged into the master branch, so that we can 
close it?


---


[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493683#comment-16493683
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
hi @tillrohrmann can this PR be merged into the master branch, so that we can 
close it?


> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...

2018-05-29 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
hi @tillrohrmann can this PR be merged into the master branch, so that we can 
close it?


---


[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493668#comment-16493668
 ] 

ASF GitHub Bot commented on FLINK-9470:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460728
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
+   new KeyedProcessOperator<>(new KeyQueryingProcessFunction());
+
+   OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO);
+
--- End diff --

same here.


> Allow querying the key in KeyedProcessFunction
> --
>
> Key: FLINK-9470
> URL: https://issues.apache.org/jira/browse/FLINK-9470
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
> timer while {{KeyedProcessFunction.Context}} does not allow querying the key 
> of the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493667#comment-16493667
 ] 

ASF GitHub Bot commented on FLINK-9470:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460316
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
--- End diff --

The `OneInputStreamOperatorTestHarness` is `AutoCloseable` so I would 
recommend to go with
```
try(harness=...) {
...
}
```
And remove the explicit call to `harness.close()`. This is a nice practice 
to start enforcing in new tests as it cleans up any leaks in case of exceptions 
and stuff.


> Allow querying the key in KeyedProcessFunction
> --
>
> Key: FLINK-9470
> URL: https://issues.apache.org/jira/browse/FLINK-9470
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
> timer while {{KeyedProcessFunction.Context}} does not allow querying the key 
> of the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...

2018-05-29 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460728
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
+   new KeyedProcessOperator<>(new KeyQueryingProcessFunction());
+
+   OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO);
+
--- End diff --

same here.


---


[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...

2018-05-29 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460316
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
--- End diff --

The `OneInputStreamOperatorTestHarness` is `AutoCloseable` so I would 
recommend to go with
```
try(harness=...) {
...
}
```
And remove the explicit call to `harness.close()`. This is a nice practice 
to start enforcing in new tests as it cleans up any leaks in case of exceptions 
and stuff.


---
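
The try-with-resources pattern recommended above can be sketched outside of Flink with a stand-in `AutoCloseable` resource. `FakeHarness` below is hypothetical and only models the harness's close() behavior, not the real test-harness API:

```java
// Sketch of the try-with-resources recommendation: the resource is closed
// automatically, even if the body throws, so no explicit close() is needed.
public class TryWithResourcesDemo {

    static class FakeHarness implements AutoCloseable {
        boolean closed = false;

        void process(String element) {
            // pretend to push an element through the operator
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        FakeHarness h = new FakeHarness();
        try (FakeHarness harness = h) {
            harness.process("element");
        }
        System.out.println(h.closed); // true
    }
}
```

Because close() runs on every exit path, leaks from exceptions thrown mid-test are cleaned up without a finally block.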


[jira] [Commented] (FLINK-8353) Add support for timezones

2018-05-29 Thread Weike Dong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493639#comment-16493639
 ] 

Weike Dong commented on FLINK-8353:
---

I strongly support these features. Preferably there could be a way to set a 
specific timezone for a particular job, so that all subsequent temporal 
processing could be based on it. Since users' input data are often collected 
from other systems that do not follow the rules set by Flink (UTC+0), some 
temporal UDFs are currently needed to perform such transformations, which adds 
complexity to the whole system, especially in the case of watermark generation 
or writing processing time into an external database.

> Add support for timezones
> -
>
> Key: FLINK-8353
> URL: https://issues.apache.org/jira/browse/FLINK-8353
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> This is an umbrella issue for adding support for timezones in the Table & SQL 
> API.
> Usually companies work with different timezones simultaneously. We could add 
> support for the new time classes introduced with Java 8 and enable our scalar 
> functions to also work with those (or some custom time class implementations 
> like those from Calcite). We need a good design for this to address most of 
> the problems users face related to timestamp and timezones.
> It is up for discussion how to ship date, time, timestamp instances through 
> the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
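
The kind of transformation described above — normalizing timestamps recorded in a source timezone to UTC before Flink processes them — can be sketched with `java.time`. The method name and format pattern are illustrative, not an actual Flink API:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimezoneShift {

    // Parse a local timestamp string recorded in a source timezone and
    // convert it to epoch millis (UTC), as a user-defined function might
    // have to do today before watermark generation.
    static long toEpochMillis(String localTs, ZoneId sourceZone) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime.parse(localTs, fmt)
                .atZone(sourceZone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2018-05-29 08:00:00", ZoneId.of("Asia/Shanghai"));
        // 08:00 in UTC+8 corresponds to 00:00 UTC
        System.out.println(Instant.ofEpochMilli(millis)); // 2018-05-29T00:00:00Z
    }
}
```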


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493636#comment-16493636
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5959


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...

2018-05-29 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5959


---
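
The race reported above arises because `HashMap.putAll` iterates the source map, so copying a variables map that another thread is still populating can throw `ConcurrentModificationException`. One way to avoid it is to build the map fully and publish it only once, under a lock; this is a minimal sketch of that idea, an assumption about the shape of a fix rather than Flink's actual patch:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch: lazily build the variables map under a lock, then publish an
// immutable snapshot via a volatile field. Readers never observe a map
// that is still being mutated, so copying it cannot throw CME.
public class SafeVariables {

    private volatile Map<String, String> variables; // null until initialized

    public Map<String, String> getAllVariables() {
        Map<String, String> vars = variables;
        if (vars == null) {
            synchronized (this) {
                if (variables == null) {
                    Map<String, String> built = new HashMap<>();
                    built.put("host", "localhost"); // example variable only
                    // safe publication: assign the fully built, frozen map
                    variables = Collections.unmodifiableMap(built);
                }
                vars = variables;
            }
        }
        return vars;
    }

    public static void main(String[] args) {
        SafeVariables sv = new SafeVariables();
        System.out.println(sv.getAllVariables().get("host")); // localhost
    }
}
```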


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493635#comment-16493635
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191454466
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *   <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *   <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ *   would be enough if it could return existing elements on unsuccessful adding, etc.</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
--- End diff --

I know, I was not quite sure if this has performance implications.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...

2018-05-29 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191454466
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *   <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *   <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ *   would be enough if it could return existing elements on unsuccessful adding, etc.</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
--- End diff --

I know, I was not quite sure if this has performance implications.


---
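
The 1-based array layout mentioned in the javadoc above keeps parent/child index arithmetic free of +1/-1 adjustments: the parent of index i is i/2 and its children are 2i and 2i+1. A minimal sketch of that layout follows; it is not the actual `InternalTimerHeap`, which additionally maintains deduplication maps and grows its array:

```java
// Minimal 1-based binary min-heap over longs. Index 0 is deliberately
// unused so the index math in the hot path stays simple.
public class OneBasedHeap {

    private final long[] heap = new long[16]; // fixed capacity, demo only
    private int size = 0;

    static int parent(int i) { return i >>> 1; }       // i / 2
    static int leftChild(int i) { return i << 1; }     // 2 * i
    static int rightChild(int i) { return (i << 1) + 1; }

    void add(long value) {
        heap[++size] = value;
        // sift up while the parent is larger than the new element
        for (int i = size; i > 1 && heap[parent(i)] > heap[i]; i = parent(i)) {
            long tmp = heap[i];
            heap[i] = heap[parent(i)];
            heap[parent(i)] = tmp;
        }
    }

    long peekMin() { return heap[1]; } // root; undefined when empty

    public static void main(String[] args) {
        OneBasedHeap h = new OneBasedHeap();
        h.add(5);
        h.add(3);
        h.add(8);
        System.out.println(h.peekMin()); // 3
    }
}
```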


[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493633#comment-16493633
 ] 

ASF GitHub Bot commented on FLINK-9470:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/6097

[FLINK-9470] Allow querying the key in KeyedProcessFunction

R: @kl0u @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-9470-keyedprocessfunction-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6097


commit 0d3cccd546e782113a843df099fd5d8fe1880933
Author: Aljoscha Krettek 
Date:   2018-05-29T14:46:19Z

[FLINK-9470] Allow querying the key in KeyedProcessFunction




> Allow querying the key in KeyedProcessFunction
> --
>
> Key: FLINK-9470
> URL: https://issues.apache.org/jira/browse/FLINK-9470
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
> timer while {{KeyedProcessFunction.Context}} does not allow querying the key 
> of the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis

2018-05-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9466.

Resolution: Fixed

Accidentally overwrote the Travis log file. Closing until seeing this issue 
again.

> LocalRecoveryRocksDBFullITCase failed on Travis
> ---
>
> Key: FLINK-9466
> URL: https://issues.apache.org/jira/browse/FLINK-9466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: 
> https://api.travis-ci.org/v3/job/385097117/log.txt.
> Not sure what caused the failure where the window computes a wrong result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8790) Improve performance for recovery from incremental checkpoint

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493631#comment-16493631
 ] 

ASF GitHub Bot commented on FLINK-8790:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5582#discussion_r191453657
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 ---
@@ -138,4 +138,12 @@ private static void writeVariableIntBytes(
value >>>= 8;
} while (value != 0);
}
+
+   public static byte[] serializeKeyGroup(int keyGroup, int 
keyGroupPrefixBytes) {
+   byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
--- End diff --

Some info about the experiment I did: 

- I set `ReadOptions::ignore_range_deletions = true` to speed up the read 
performance, because we won't read any records that belong to the key-group we 
have deleted. 
- I only call the `deleteRange()` twice, because we will at most call it 
twice in the recovery of the incremental checkpoint.


> Improve performance for recovery from incremental checkpoint
> 
>
> Key: FLINK-8790
> URL: https://issues.apache.org/jira/browse/FLINK-8790
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> When there are multi state handle to be restored, we can improve the 
> performance as follow:
> 1. Choose the best state handle to init the target db
> 2. Use the other state handles to create temp db, and clip the db according 
> to the target key group range (via rocksdb.deleteRange()), this can help use 
> get rid of the `key group check` in 
>  `data insertion loop` and also help us get rid of traversing the useless 
> record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...

2018-05-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/6097

[FLINK-9470] Allow querying the key in KeyedProcessFunction

R: @kl0u @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-9470-keyedprocessfunction-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6097


commit 0d3cccd546e782113a843df099fd5d8fe1880933
Author: Aljoscha Krettek 
Date:   2018-05-29T14:46:19Z

[FLINK-9470] Allow querying the key in KeyedProcessFunction




---


[GitHub] flink pull request #5582: [FLINK-8790][State] Improve performance for recove...

2018-05-29 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5582#discussion_r191453657
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 ---
@@ -138,4 +138,12 @@ private static void writeVariableIntBytes(
value >>>= 8;
} while (value != 0);
}
+
+   public static byte[] serializeKeyGroup(int keyGroup, int 
keyGroupPrefixBytes) {
+   byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
--- End diff --

Some info about the experiment I did: 

- I set `ReadOptions::ignore_range_deletions = true` to speed up the read 
performance, because we won't read any records that belong to the key-group we 
have deleted. 
- I only call the `deleteRange()` twice, because we will at most call it 
twice in the recovery of the incremental checkpoint.


---
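
The helper under discussion writes a key-group id into a fixed-width byte prefix, so that a `deleteRange()` over `[prefix(start), prefix(end))` can clip a temporary db to the target key-group range. The sketch below assumes a big-endian layout; whether Flink uses exactly this encoding is not shown in the thread:

```java
// Illustrative reconstruction: serialize an int key-group id into the
// leading keyGroupPrefixBytes bytes of a RocksDB key, most significant
// byte first so that byte-wise ordering matches numeric ordering.
public class KeyGroupPrefix {

    static byte[] serializeKeyGroup(int keyGroup, int keyGroupPrefixBytes) {
        byte[] bytes = new byte[keyGroupPrefixBytes];
        for (int i = keyGroupPrefixBytes - 1; i >= 0; i--) {
            bytes[i] = (byte) (keyGroup & 0xFF); // least significant byte last
            keyGroup >>>= 8;
        }
        return bytes;
    }

    public static void main(String[] args) {
        byte[] p = serializeKeyGroup(258, 2); // 258 = 0x0102
        System.out.println(p[0] + "," + p[1]); // 1,2
    }
}
```

Big-endian encoding matters here: it makes lexicographic key order agree with key-group order, which is what makes a single range delete per boundary sufficient.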


[jira] [Created] (FLINK-9470) Allow querying the key in KeyedProcessFunction

2018-05-29 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-9470:
---

 Summary: Allow querying the key in KeyedProcessFunction
 Key: FLINK-9470
 URL: https://issues.apache.org/jira/browse/FLINK-9470
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.6.0


{{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing 
timer while {{KeyedProcessFunction.Context}} does not allow querying the key of 
the event we're currently processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493615#comment-16493615
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191447784
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
+   private static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 1);
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   int count) {
+   insertRandomTimers(timerPriorityQueue, null, count);
+   }
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   Set<TimerHeapInternalTimer<Integer, VoidNamespace>> checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   for (int i = 0; i < count; ++i) {
+   TimerHeapInternalTimer<Integer, VoidNamespace> timer =
+   new 
TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE);
+   if (checkSet != null) {
+   Preconditions.checkState(checkSet.add(timer));
+   }
+   Assert.assertTrue(timerPriorityQueue.add(timer));
+   }
+   }
+
+   private static InternalTimerHeap<Integer, VoidNamespace> 
newPriorityQueue(int initialCapacity) {
+   return new InternalTimerHeap<>(
+   initialCapacity,
+   KEY_GROUP_RANGE,
+   KEY_GROUP_RANGE.getNumberOfKeyGroups());
+   }
+
+   @Test
+   public void testCombined() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue = 
newPriorityQueue(initialCapacity);
+   HashSet<TimerHeapInternalTimer<Integer, VoidNamespace>> 
checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(timerPriorityQueue, checkSet, testSize);
+
+   long lastTimestamp = Long.MIN_VALUE;
+   int lastSize = timerPriorityQueue.size();
+   Assert.assertEquals(testSize, lastSize);
+   TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+   while ((timer = timerPriorityQueue.peek()) != null) {
+   Assert.assertFalse(timerPriorityQueue.isEmpty());
+   Assert.assertEquals(lastSize, 
timerPriorityQueue.size());
+   Assert.assertEquals(timer, timerPriorityQueue.poll());
+   Assert.assertTrue(checkSet.remove(timer));
+   Assert.assertTrue(timer.getTimestamp() >= 
lastTimestamp);
+   lastTimestamp = timer.getTimestamp();
+   --lastSize;
+   }
+
+   Assert.assertTrue(timerPriorityQueue.isEmpty());
+   Assert.assertEquals(0, timerPriorityQueue.size());
+   Assert.assertEquals(0, checkSet.size());
+   }
+
+   @Test
+   public void testAdd() {
+   

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493621#comment-16493621
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191450664
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
+   private static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 1);
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   int count) {
+   insertRandomTimers(timerPriorityQueue, null, count);
+   }
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   Set<TimerHeapInternalTimer<Integer, VoidNamespace>> checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   for (int i = 0; i < count; ++i) {
+   TimerHeapInternalTimer<Integer, VoidNamespace> timer =
+   new 
TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE);
+   if (checkSet != null) {
+   Preconditions.checkState(checkSet.add(timer));
+   }
+   Assert.assertTrue(timerPriorityQueue.add(timer));
+   }
+   }
+
+   private static InternalTimerHeap<Integer, VoidNamespace> 
newPriorityQueue(int initialCapacity) {
+   return new InternalTimerHeap<>(
+   initialCapacity,
+   KEY_GROUP_RANGE,
+   KEY_GROUP_RANGE.getNumberOfKeyGroups());
+   }
+
+   @Test
+   public void testCombined() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue = 
newPriorityQueue(initialCapacity);
+   HashSet<TimerHeapInternalTimer<Integer, VoidNamespace>> 
checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(timerPriorityQueue, checkSet, testSize);
+
+   long lastTimestamp = Long.MIN_VALUE;
+   int lastSize = timerPriorityQueue.size();
+   Assert.assertEquals(testSize, lastSize);
+   TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+   while ((timer = timerPriorityQueue.peek()) != null) {
+   Assert.assertFalse(timerPriorityQueue.isEmpty());
+   Assert.assertEquals(lastSize, 
timerPriorityQueue.size());
+   Assert.assertEquals(timer, timerPriorityQueue.poll());
+   Assert.assertTrue(checkSet.remove(timer));
+   Assert.assertTrue(timer.getTimestamp() >= 
lastTimestamp);
+   lastTimestamp = timer.getTimestamp();
+   --lastSize;
+   }
+
+   Assert.assertTrue(timerPriorityQueue.isEmpty());
+   Assert.assertEquals(0, timerPriorityQueue.size());
+   Assert.assertEquals(0, checkSet.size());
+   }
+
+   @Test
+   public void testAdd() {
+   

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493609#comment-16493609
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191422654
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the 
deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this 
priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this 
priority queue.
+ */
+public class InternalTimerHeap<K, N> implements 
Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
--- End diff --

Why does this class implement the `Queue` and `Set` interface? Is it 
intended to be used as a `Queue` or `Set` somewhere?


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
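The javadoc quoted above notes that heap indexes start at 1 instead of 0 to keep the array index computations simple in the hot methods. For illustration, with 1-based indexing the parent/child arithmetic reduces to plain shifts (a sketch of the general idea, not Flink code):

```java
/** Sketch of the 1-based binary-heap index arithmetic the javadoc above refers to. */
public class HeapIndexMath {

    static int parent(int idx) { return idx >>> 1; }          // floor(idx / 2)
    static int leftChild(int idx) { return idx << 1; }        // 2 * idx
    static int rightChild(int idx) { return (idx << 1) + 1; } // 2 * idx + 1

    public static void main(String[] args) {
        // the root sits at index 1; array slot 0 stays unused
        System.out.println(parent(5) + " " + leftChild(5) + " " + rightChild(5)); // prints "2 10 11"
    }
}
```

With 0-based storage the same formulas need extra +1/-1 adjustments (`(idx - 1) / 2`, `2 * idx + 1`, `2 * idx + 2`), which is the small per-access cost the 1-based layout avoids.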


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493605#comment-16493605
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191425505
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the 
deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this 
priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this 
priority queue.
+ */
+public class InternalTimerHeap<K, N> implements 
Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> 
COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, 
TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493603#comment-16493603
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191435370
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the 
deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this 
priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this 
priority queue.
+ */
+public class InternalTimerHeap<K, N> implements 
Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> 
COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, 
TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493599#comment-16493599
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191413234
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws 
Exception {
keySerializer.snapshotConfiguration(),
namespaceSerializer,
namespaceSerializer.snapshotConfiguration(),
-   getEventTimeTimerSetForKeyGroup(keyGroupIdx),
-   
getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
+   
eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
+   
processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
}
 
/**
 * Restore the timers (both processing and event time ones) for a given 
{@code keyGroupIdx}.
 *
-* @param restoredTimersSnapshot the restored snapshot containing the 
key-group's timers,
+* @param restoredSnapshot the restored snapshot containing the 
key-group's timers,
 *   and the serializers that were used to write 
them
 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
 */
@SuppressWarnings("unchecked")
-   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> 
restoredTimersSnapshot, int keyGroupIdx) throws IOException {
-   this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) 
restoredTimersSnapshot;
+   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> 
restoredSnapshot, int keyGroupIdx) {
+   this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) 
restoredSnapshot;
 
-   if ((this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
-   (this.namespaceDeserializer != null && 
!this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) {
+   if ((this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
--- End diff --

This check could be factored out into a method with a meaningful and easy 
to understand name, e.g. `checkSerializerCompatibility`.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
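The issue description above proposes tracking each timer's position in the priority queue so that arbitrary deletes drop from O(n) to O(log n). Stripped of all Flink specifics, the position-tracking technique looks roughly like this sketch (plain `long` elements instead of timers, values assumed distinct; 1-based storage as in the quoted class):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Sketch of a min-heap that tracks element positions for O(log n) arbitrary deletes. */
public class IndexedHeap {

    private long[] heap = new long[8];                            // 1-based; slot 0 unused
    private int size;
    private final Map<Long, Integer> positions = new HashMap<>(); // element -> heap index

    public void add(long value) {
        if (++size == heap.length) heap = Arrays.copyOf(heap, heap.length * 2);
        heap[size] = value;
        positions.put(value, size);
        siftUp(size);
    }

    /** O(log n): look up the slot, move the last element into it, re-heapify both ways. */
    public boolean remove(long value) {
        Integer idx = positions.remove(value);
        if (idx == null) return false;        // not present -> nothing to do
        long moved = heap[size--];
        if (idx <= size) {                    // removed element was not the last one
            heap[idx] = moved;
            positions.put(moved, idx);
            siftDown(idx);
            siftUp(idx);
        }
        return true;
    }

    public long peek() { return heap[1]; }
    public int size() { return size; }

    private void siftUp(int idx) {
        while (idx > 1 && heap[idx] < heap[idx >>> 1]) { swap(idx, idx >>> 1); idx >>>= 1; }
    }

    private void siftDown(int idx) {
        while (true) {
            int smallest = idx, l = idx << 1, r = l + 1;
            if (l <= size && heap[l] < heap[smallest]) smallest = l;
            if (r <= size && heap[r] < heap[smallest]) smallest = r;
            if (smallest == idx) return;
            swap(idx, smallest);
            idx = smallest;
        }
    }

    private void swap(int a, int b) {
        long t = heap[a]; heap[a] = heap[b]; heap[b] = t;
        positions.put(heap[a], a);            // keep the position map in sync
        positions.put(heap[b], b);
    }

    public static void main(String[] args) {
        IndexedHeap h = new IndexedHeap();
        h.add(30); h.add(10); h.add(20);
        h.remove(30);                         // delete a non-root element in O(log n)
        System.out.println(h.peek());
    }
}
```

In the Flink PR the same role is played by the per-key-group deduplication maps plus an index stored on the timer itself; the map here is only the simplest stand-in for that bookkeeping.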


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493617#comment-16493617
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191446041
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
--- End diff --

Should extend `TestLogger`.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493616#comment-16493616
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191445752
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
 ---
@@ -96,7 +96,7 @@ public 
AbstractInternalTimersSnapshotWriter(InternalTimersSnapshot timersS
public final void writeTimersSnapshot(DataOutputView out) 
throws IOException {
writeKeyAndNamespaceSerializers(out);
 
-   InternalTimer.TimerSerializer<K, N> timerSerializer = 
new InternalTimer.TimerSerializer<>(
+   TimerHeapInternalTimer.TimerSerializer<K, N> 
timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>(
--- End diff --

Isn't this a bit problematic if we add a new `TimerService` implementation 
other than the `HeapInternalTimerService`? The `InternalTimersSnapshot` is 
independent of the underlying timer service implementation and so should the 
timer serializer be.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493602#comment-16493602
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191423366
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * 
+ *  We could also implement shrinking for the heap and the 
deduplication maps.
+ *  We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..
+ * 
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<InternalTimer<K, N>>, Set<InternalTimer<K, N>> {
--- End diff --

That way we could also save us from implementing some methods like 
`toArray` or `iterator`.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.
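
The position-tracking idea in this description can be illustrated with a toy heap of `long` timestamps (all names below are invented for illustration; this is not Flink's actual `InternalTimerHeap`): a side map from element to heap slot lets `remove` jump straight to the element's position and repair the heap locally, turning the O(n) scan into O(log n).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal sketch of an index-tracked binary min-heap. A map from element
 * to its current array slot enables O(log n) remove(element) and also
 * gives de-duplication, analogous to the per-key-group sets in the issue.
 */
public class IndexedMinHeap {
    private final List<Long> heap = new ArrayList<>();        // array-backed binary heap
    private final Map<Long, Integer> index = new HashMap<>(); // element -> heap position

    public boolean add(long value) {
        if (index.containsKey(value)) {
            return false;                      // de-duplicate on insert
        }
        heap.add(value);
        index.put(value, heap.size() - 1);
        siftUp(heap.size() - 1);
        return true;
    }

    public boolean remove(long value) {
        Integer pos = index.remove(value);
        if (pos == null) {
            return false;                      // not in the heap
        }
        long last = heap.remove(heap.size() - 1);
        if (pos < heap.size()) {
            heap.set(pos, last);               // move last element into the hole...
            index.put(last, pos);
            siftDown(pos);                     // ...and restore the heap property
            siftUp(pos);
        }
        return true;
    }

    public long peek() {
        return heap.get(0);
    }

    public int size() {
        return heap.size();
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(i) >= heap.get(parent)) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1, right = left + 1, smallest = i;
            if (left < n && heap.get(left) < heap.get(smallest)) {
                smallest = left;
            }
            if (right < n && heap.get(right) < heap.get(smallest)) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        long tmp = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, tmp);
        index.put(heap.get(a), a);             // keep the side map in sync
        index.put(heap.get(b), b);
    }

    public static void main(String[] args) {
        IndexedMinHeap h = new IndexedMinHeap();
        h.add(17L);
        h.add(5L);
        h.add(42L);
        h.remove(5L);                 // jumps straight to 5's slot via the index map
        System.out.println(h.peek()); // prints 17
    }
}
```

The key invariant is that `swap` updates the side map on every move, so the map always reflects current slots.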



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493607#comment-16493607
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191436243
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+   /** The index that indicates that an internal timer is currently not managed by a timer heap. */
+   private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+   private final long timestamp;
+
+   private final K key;
+
+   private final N namespace;
+
+   /**
+* This field holds the current physical index of this timer when it is 
managed by a timer heap so that we can
+* support fast deletes.
+*/
+   private transient int timerHeapIndex;
--- End diff --

`TimerHeapInternalTimer` is non-serializable. Thus, the `transient` keyword 
should not be needed.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.
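
As background for the `transient` remark in the review comment above: `transient` only affects Java serialization, so on a class that never implements `Serializable` it is inert. A minimal illustration (the demo classes below are invented, not Flink code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    // Only on a Serializable class does 'transient' change behavior:
    // the field is skipped when writing and left at its default on reading.
    static class WithSerial implements Serializable {
        int kept = 1;
        transient int skipped = 2;
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new WithSerial());
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            WithSerial copy = (WithSerial) in.readObject();
            // Field initializers are not re-run on deserialization,
            // so the transient field comes back as 0, not 2.
            System.out.println(copy.kept + " " + copy.skipped); // prints "1 0"
        }
    }
}
```

On a class outside any `Serializable` hierarchy, the same keyword has no effect at all, which is why the reviewer suggests dropping it.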



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493604#comment-16493604
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191427611
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
+ * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
+ *
+ * Possible future improvements:
+ * 
+ *  We could also implement shrinking for the heap and the 
deduplication maps.
+ *  We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..
+ * 
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<InternalTimer<K, N>>, Set<InternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493620#comment-16493620
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191446801
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
+   private static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 1);
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   int count) {
+   insertRandomTimers(timerPriorityQueue, null, count);
+   }
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   Set<TimerHeapInternalTimer<Integer, VoidNamespace>> checkSet,
--- End diff --

`@Nullable` missing


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493618#comment-16493618
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191447618
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
+   private static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 1);
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   int count) {
+   insertRandomTimers(timerPriorityQueue, null, count);
+   }
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   Set<TimerHeapInternalTimer<Integer, VoidNamespace>> checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   for (int i = 0; i < count; ++i) {
+   TimerHeapInternalTimer<Integer, VoidNamespace> timer =
+   new TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE);
+   if (checkSet != null) {
+   Preconditions.checkState(checkSet.add(timer));
+   }
+   Assert.assertTrue(timerPriorityQueue.add(timer));
+   }
+   }
+
+   private static InternalTimerHeap<Integer, VoidNamespace> newPriorityQueue(int initialCapacity) {
+   return new InternalTimerHeap<>(
+   initialCapacity,
+   KEY_GROUP_RANGE,
+   KEY_GROUP_RANGE.getNumberOfKeyGroups());
+   }
+
+   @Test
+   public void testCombined() {
--- End diff --

A more descriptive test method name would be helpful.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493614#comment-16493614
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191441546
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link 
InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+   /** The index that indicates that an internal timer is currently not managed by a timer heap. */
+   private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+   private final long timestamp;
+
+   private final K key;
+
+   private final N namespace;
+
+   /**
+* This field holds the current physical index of this timer when it is 
managed by a timer heap so that we can
+* support fast deletes.
+*/
+   private transient int timerHeapIndex;
+
+   TimerHeapInternalTimer(long timestamp, K key, N namespace) {
+   this.timestamp = timestamp;
+   this.key = key;
+   this.namespace = namespace;
+   this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX;
+   }
+
+   @Override
+   public long getTimestamp() {
+   return timestamp;
+   }
+
+   @Override
+   public K getKey() {
+   return key;
+   }
+
+   @Override
+   public N getNamespace() {
+   return namespace;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o instanceof InternalTimer) {
+   InternalTimer timer = (InternalTimer) o;
+   return timestamp == timer.getTimestamp()
+   && key.equals(timer.getKey())
+   && namespace.equals(timer.getNamespace());
+   }
+
+   return false;
+   }
+
+   /**
+* Returns the current index of this timer in the owning timer heap.
+*/
+   int getTimerHeapIndex() {
+   return timerHeapIndex;
+   }
+
+   /**
+* Sets the current index of this timer in the owning timer heap and 
should only be called by the managing heap.
+* @param timerHeapIndex the new index in the timer heap.
+*/
+   void setTimerHeapIndex(int timerHeapIndex) {
+   this.timerHeapIndex = timerHeapIndex;
+   }
+
+   /**
+* This method can be called to indicate that the timer is no longer managed by a timer heap, e.g. because it was
+* removed.
+*/
+   void removedFromTimerQueue() {
+   setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX);
+   }
+
+   @Override
+   public int hashCode() {
+   int result = (int) (timestamp ^ (timestamp >>> 32));
+   result = 31 * result + key.hashCode();
+   result = 31 * result + 

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493611#comment-16493611
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191412983
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -199,17 +186,9 @@ public long currentWatermark() {
 
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
-   InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-
-   // make sure we only put one timer per key into the queue
-   Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
-   if (timerSet.add(timer)) {
-
-   InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+   InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+   if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) {
--- End diff --
--- End diff --

I know this is out of scope, but I think we could get rid of the 
`KeyContext` by passing the current key to the `registerProcessingTimeTimer` 
method. Moreover, instead of calling `KeyContext#setCurrentKey` we could pass 
the key value to the `Triggerable#onEvent/ProcessingTime` method. Triggering 
side effects via the `KeyContext` before calling certain methods is imo very 
brittle.
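
One way to read that suggestion, as a standalone sketch (all names here are hypothetical, not Flink's actual `Triggerable`/`KeyContext` API): the key travels with the timer and is handed to the callback as an argument, instead of being set as ambient state on a key context before the call.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical refactoring sketch: no ambient KeyContext, the key is an
// explicit parameter of the timer callback. Simplified to a FIFO queue of
// timers registered in timestamp order.
public class KeyedTimerSketch {

    interface KeyedTriggerable<K, N> {
        void onProcessingTime(K key, N namespace, long timestamp);
    }

    static final class Timer<K, N> {
        final long timestamp;
        final K key;
        final N namespace;

        Timer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    static final class SimpleTimerService<K, N> {
        private final Queue<Timer<K, N>> timers = new ArrayDeque<>();

        void registerProcessingTimeTimer(K key, N namespace, long time) {
            timers.add(new Timer<>(time, key, namespace));
        }

        // Fires all timers due at or before 'now', passing the key to the
        // callback rather than mutating shared key-context state first.
        void advanceTo(long now, KeyedTriggerable<K, N> target) {
            Timer<K, N> t;
            while ((t = timers.peek()) != null && t.timestamp <= now) {
                timers.poll();
                target.onProcessingTime(t.key, t.namespace, t.timestamp);
            }
        }
    }

    public static void main(String[] args) {
        SimpleTimerService<String, String> service = new SimpleTimerService<>();
        StringBuilder fired = new StringBuilder();
        service.registerProcessingTimeTimer("user-1", "window", 10L);
        service.registerProcessingTimeTimer("user-2", "window", 20L);
        service.advanceTo(15L, (key, ns, ts) -> fired.append(key).append('@').append(ts));
        System.out.println(fired); // prints "user-1@10"
    }
}
```

The design point is that the callback receives everything it needs as arguments, so there is no ordering dependency between a `setCurrentKey`-style side effect and the trigger call.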


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493613#comment-16493613
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191440142
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+   /** The index that indicates that an internal timer is currently not managed by a timer heap. */
+   private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+   private final long timestamp;
+
+   private final K key;
--- End diff --

Can this be `null`?


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493612#comment-16493612
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191440239
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+   /** The index that indicates that an internal timer is currently not managed by a timer heap. */
+   private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+   private final long timestamp;
+
+   private final K key;
+
+   private final N namespace;
--- End diff --

`null`?


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O(n), where n 
> is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493619#comment-16493619
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191446484
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java
 ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
   private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1);
+
+   private static void insertRandomTimers(
+   InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+   int count) {
--- End diff --

Double indentation for parameter lists that are wrapped.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O\(n\), where n
> is the number of registered timers.
>  
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
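The approach described above — each timer remembers its own slot in the heap array — can be illustrated with a heavily simplified, hypothetical Java sketch (class and method names below, such as `TimerHeapSketch`, are invented for illustration and are not the patch's actual code). `remove` jumps straight to the stored index and restores heap order by sifting, giving O(log n) deletes instead of an O(n) scan:

```java
import java.util.NoSuchElementException;

/** Hypothetical, simplified sketch of an index-tracking binary heap (1-based array). */
final class TimerHeapSketch {

    static final class Timer {
        final long timestamp;
        int heapIndex; // slot in the heap array, maintained by the heap itself
        Timer(long timestamp) { this.timestamp = timestamp; }
    }

    private Timer[] queue = new Timer[8]; // slot 0 unused (1-based indexing)
    private int size;

    void add(Timer t) {
        if (++size == queue.length) {
            queue = java.util.Arrays.copyOf(queue, queue.length * 2);
        }
        moveElement(t, size);
        siftUp(size);
    }

    Timer poll() {
        if (size == 0) {
            throw new NoSuchElementException();
        }
        Timer head = queue[1];
        remove(head);
        return head;
    }

    /** O(log n) delete: the stored index replaces a linear scan for the element. */
    void remove(Timer t) {
        int i = t.heapIndex;
        Timer last = queue[size];
        queue[size--] = null;
        if (i <= size) {
            moveElement(last, i);
            siftDown(i);
            if (queue[i] == last) {
                siftUp(i); // the swapped-in element may also need to move up
            }
        }
        t.heapIndex = 0; // mark as no longer managed (slot 0 is unused)
    }

    private void siftUp(int i) {
        while (i > 1 && queue[i].timestamp < queue[i / 2].timestamp) {
            swap(i, i / 2);
            i /= 2;
        }
    }

    private void siftDown(int i) {
        while (2 * i <= size) {
            int child = 2 * i;
            if (child < size && queue[child + 1].timestamp < queue[child].timestamp) {
                child++;
            }
            if (queue[i].timestamp <= queue[child].timestamp) {
                break;
            }
            swap(i, child);
            i = child;
        }
    }

    private void swap(int a, int b) {
        Timer tmp = queue[a];
        moveElement(queue[b], a);
        moveElement(tmp, b);
    }

    private void moveElement(Timer t, int idx) {
        queue[idx] = t;
        t.heapIndex = idx;
    }

    public static void main(String[] args) {
        TimerHeapSketch h = new TimerHeapSketch();
        Timer a = new Timer(30), b = new Timer(10), c = new Timer(20);
        h.add(a);
        h.add(b);
        h.add(c);
        h.remove(c); // no scan: c.heapIndex points directly at its slot
        System.out.println(h.poll().timestamp); // prints 10
        System.out.println(h.poll().timestamp); // prints 30
    }
}
```

The actual `InternalTimerHeap` in the pull request additionally keeps one de-duplication hash map per key-group and supports key-group-scoped snapshots for checkpoints; those parts are omitted here.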



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493610#comment-16493610
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191440434
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+   /** The index that indicates that an internal timer is currently not tracked by any timer heap. */
+   private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+   private final long timestamp;
+
+   private final K key;
+
+   private final N namespace;
+
+   /**
+* This field holds the current physical index of this timer when it is 
managed by a timer heap so that we can
+* support fast deletes.
+*/
+   private transient int timerHeapIndex;
+
+   TimerHeapInternalTimer(long timestamp, K key, N namespace) {
+   this.timestamp = timestamp;
+   this.key = key;
+   this.namespace = namespace;
+   this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX;
+   }
+
+   @Override
+   public long getTimestamp() {
+   return timestamp;
+   }
+
+   @Override
+   public K getKey() {
+   return key;
+   }
+
+   @Override
+   public N getNamespace() {
+   return namespace;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o instanceof InternalTimer) {
+   InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
+   return timestamp == timer.getTimestamp()
+   && key.equals(timer.getKey())
+   && namespace.equals(timer.getNamespace());
+   }
+
+   return false;
+   }
+
+   /**
+* Returns the current index of this timer in the owning timer heap.
+*/
+   int getTimerHeapIndex() {
+   return timerHeapIndex;
+   }
+
+   /**
+* Sets the current index of this timer in the owning timer heap and 
should only be called by the managing heap.
+* @param timerHeapIndex the new index in the timer heap.
+*/
+   void setTimerHeapIndex(int timerHeapIndex) {
+   this.timerHeapIndex = timerHeapIndex;
+   }
+
+   /**
+* This method can be called to indicate that the timer is no longer managed by a timer heap, e.g. because it was
+* removed.
+*/
+   void removedFromTimerQueue() {
+   setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX);
+   }
+
+   @Override
+   public int hashCode() {
+   int result = (int) (timestamp ^ (timestamp >>> 32));
+   result = 31 * result + key.hashCode();
+   result = 31 * result + 
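The `hashCode` in the diff above is truncated by the mail archive. As context for why it matters, here is a hedged, self-contained sketch (the `SimpleTimer` class below is invented for illustration, not the patch's class) of the equals/hashCode contract that lets timers serve as keys of the per-key-group de-duplication maps: identity is (timestamp, key, namespace) only, and the mutable, transient heap index must not participate in either method.

```java
/**
 * Hypothetical, simplified timer showing the equals/hashCode contract that
 * de-duplication maps rely on: the transient heap index is bookkeeping only
 * and is deliberately excluded from both equals() and hashCode().
 */
final class SimpleTimer<K, N> {
    final long timestamp;
    final K key;
    final N namespace;
    transient int heapIndex; // heap bookkeeping only; excluded from equality

    SimpleTimer(long timestamp, K key, N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTimer)) {
            return false;
        }
        SimpleTimer<?, ?> t = (SimpleTimer<?, ?>) o;
        return timestamp == t.timestamp
            && key.equals(t.key)
            && namespace.equals(t.namespace);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }
}
```

Two timers with different `heapIndex` values still compare equal and hash identically, so a delete can find the timer in the map no matter where it currently sits in the heap.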

[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493606#comment-16493606
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r191426419
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element
+ * indexes in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot
+ * methods.
+ *
+ * Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc.</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+   /**
+* A safe maximum size for arrays in the JVM.
+*/
+   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+   /**
+* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order.
+*/
+   private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+   (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+   /**
+* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers.
+*/
+   private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private TimerHeapInternalTimer<K, N>[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* The key-group range of timers that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+
+   /**
+* Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
+*
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   InternalTimerHeap(
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   
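The class javadoc above notes that element indexes start at 1 "to make array index computations a bit simpler in the hot methods". A small illustrative comparison (the helper class below is invented for illustration, not part of the patch): with a 1-based layout the parent and left-child computations are pure shifts, while a 0-based layout pays an extra increment or decrement in every hot method.

```java
/** Illustrative only: parent/child index arithmetic for 1-based vs 0-based heaps. */
public final class HeapIndexMath {

    // 1-based layout (slot 0 unused): parent and left child need no +/- 1 corrections.
    static int parent1(int i) { return i >>> 1; }       // i / 2
    static int left1(int i)   { return i << 1; }        // 2 * i
    static int right1(int i)  { return (i << 1) + 1; }  // 2 * i + 1

    // 0-based layout: every hot-path computation carries an extra adjustment.
    static int parent0(int i) { return (i - 1) >>> 1; }
    static int left0(int i)   { return 2 * i + 1; }
    static int right0(int i)  { return 2 * i + 2; }

    public static void main(String[] args) {
        // In the 1-based scheme, slot 5's parent is slot 2 and slot 3's left child is slot 6.
        System.out.println(parent1(5)); // prints 2
        System.out.println(left1(3));   // prints 6
    }
}
```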
