[jira] [Resolved] (FLINK-29962) Exclude Jamon 2.3.1

2022-11-13 Thread Jamie Grier (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier resolved FLINK-29962.
-
Resolution: Fixed

> Exclude Jamon 2.3.1
> ---
>
> Key: FLINK-29962
> URL: https://issues.apache.org/jira/browse/FLINK-29962
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Gateway
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Hi all,
> My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 is 
> malformed. It looks like it's fixed in jamon-runtime:2.4.1. According to 
> dependency:tree, Flink already has transitive dependencies on both versions, 
> so I'm proposing to just exclude the transitive dependency from the 
> problematic direct dependencies and pin the dependency to 2.4.1.
> I'll send a PR shortly.
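For illustration, a minimal sketch of the pom change being proposed. The direct
dependency shown (hive-exec) and the version property are placeholders; the real
exclusions belong on whichever modules dependency:tree reports as pulling in
jamon-runtime 2.3.1.

{noformat}
<!-- Sketch only: exclude the broken transitive jamon-runtime from the
     offending direct dependency ... -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.jamon</groupId>
      <artifactId>jamon-runtime</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- ... and pin the remaining transitive path to the fixed version. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.jamon</groupId>
      <artifactId>jamon-runtime</artifactId>
      <version>2.4.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>
{noformat}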



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29962) Exclude Jamon 2.3.1

2022-11-13 Thread Jamie Grier (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-29962:

Fix Version/s: 1.17.0

> Exclude Jamon 2.3.1
> ---
>
> Key: FLINK-29962
> URL: https://issues.apache.org/jira/browse/FLINK-29962
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Gateway
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Hi all,
> My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 is 
> malformed. It looks like it's fixed in jamon-runtime:2.4.1. According to 
> dependency:tree, Flink already has transitive dependencies on both versions, 
> so I'm proposing to just exclude the transitive dependency from the 
> problematic direct dependencies and pin the dependency to 2.4.1.
> I'll send a PR shortly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29962) Exclude Jamon 2.3.1

2022-11-13 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633495#comment-17633495
 ] 

Jamie Grier commented on FLINK-29962:
-

Merged in Flink master: 9572cf6b287d71ee9c307546d8cd8f8898137bdd

> Exclude Jamon 2.3.1
> ---
>
> Key: FLINK-29962
> URL: https://issues.apache.org/jira/browse/FLINK-29962
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Gateway
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>  Labels: pull-request-available
>
> Hi all,
> My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 is 
> malformed. It looks like it's fixed in jamon-runtime:2.4.1. According to 
> dependency:tree, Flink already has transitive dependencies on both versions, 
> so I'm proposing to just exclude the transitive dependency from the 
> problematic direct dependencies and pin the dependency to 2.4.1.
> I'll send a PR shortly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-17 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346179#comment-17346179
 ] 

Jamie Grier commented on FLINK-19481:
-

Yes, [~xintongsong], that's my opinion based on experience.  The runtime 
complexity of having the additional Hadoop layer will likely be strictly worse. 
This is because each layer has its own configuration and things like thread 
pooling, pool sizes, buffering, and other non-trivial tuning parameters.

 

It can be very difficult to tune this stuff for production workloads with 
non-trivial throughput, and having all of those layers makes it (much) worse. 
Because of the configuration, it's a leaky abstraction, so you end up having to 
understand, configure, and tune the Flink, Hadoop, and GCS layers anyway.

 

Again, this is based mostly on my experience with the various flavors of the S3 
connector, but it still applies here.  In my experience, the more native the 
solution (fewer layers of abstraction), the better the result.

 

That said, I have not looked at Galen's PR.  From reading the comments here, 
though, it seems a good solution would be a hybrid of Ben's work on the native 
GCS Filesystem combined with Galen's work on the RecoverableWriter.

 

 

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors]
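For context, a minimal sketch of what checkpointing to the gs:// scheme looks
like from user code once a FileSystem for that scheme is available; the bucket,
path, and interval are placeholders, and setCheckpointStorage is just the
generic checkpoint-directory setter from newer Flink releases, nothing
GCS-specific.

{noformat}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: assumes a FileSystem registered for the gs:// scheme is on the
// classpath (today via the Hadoop connector; the proposal here is a native one).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointStorage("gs://my-bucket/flink/checkpoints");
{noformat}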



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-15 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032
 ] 

Jamie Grier edited comment on FLINK-19481 at 5/15/21, 1:25 PM:
---

The primary benefits of a native implementation are described earlier in this 
ticket.  This is based on my own experience in production for several years 
with the other Hadoop based File Systems – primarily the S3 one though.

 

https://issues.apache.org/jira/browse/FLINK-19481?focusedCommentId=17211477&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17211477
 
{noformat}
 {noformat}


was (Author: jgrier):
The primary benefits of a native implementation are described earlier in this 
ticket.  This is based on my own experience in production for several years 
with the other Hadoop based File Systems – primarily the S3 one though.

 
{noformat}
 {noformat}

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-15 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032
 ] 

Jamie Grier commented on FLINK-19481:
-

The primary benefits of a native implementation are described earlier in this 
ticket.  This is based on my own experience in production for several years 
with the other Hadoop based File Systems – primarily the S3 one though.

 
{noformat}
I think a native GCS filesystem would be a major benefit to Flink users.  The 
only way to support GCS currently is, as stated, through the Hadoop Filesystem 
implementation which brings several problems along with it.  The two largest 
problems I've experienced are:

1) Hadoop has a huge dependency footprint which is a significant headache for 
Flink application developers dealing with dependency-hell.

2) The total stack of FileSystem abstractions on this path becomes very 
difficult to tune, understand, and support.  By stack I'm referring to Flink's 
own FileSystem abstraction, then the Hadoop layer, then the GCS libraries.  
This is very difficult to work with in production as each layer has its own 
intricacies, connection pools, thread pools, tunable configuration, versions, 
dependency versions, etc.

Having gone down this path with the old-style Hadoop+S3 filesystem approach I 
know how difficult it can be and a native implementation should prove to be 
much simpler to support and easier to tune and modify for performance.  This is 
why the presto-s3-fs filesystem was adopted, for example.
{noformat}

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-15 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032
 ] 

Jamie Grier edited comment on FLINK-19481 at 5/15/21, 1:25 PM:
---

The primary benefits of a native implementation are described earlier in this 
ticket.  This is based on my own experience in production for several years 
with the other Hadoop based File Systems – primarily the S3 one though.

 
{noformat}
 {noformat}


was (Author: jgrier):
The primary benefits of a native implementation are described earlier in this 
ticket.  This is based on my own experience in production for several years 
with the other Hadoop based File Systems – primarily the S3 one though.

 
{noformat}
I think a native GCS filesystem would be a major benefit to Flink users.  The 
only way to support GCS currently is, as stated, through the Hadoop Filesystem 
implementation which brings several problems along with it.  The two largest 
problems I've experienced are:

1) Hadoop has a huge dependency footprint which is a significant headache for 
Flink application developers dealing with dependency-hell.

2) The total stack of FileSystem abstractions on this path becomes very 
difficult to tune, understand, and support.  By stack I'm referring to Flink's 
own FileSystem abstraction, then the Hadoop layer, then the GCS libraries.  
This is very difficult to work with in production as each layer has its own 
intricacies, connection pools, thread pools, tunable configuration, versions, 
dependency versions, etc.

Having gone down this path with the old-style Hadoop+S3 filesystem approach I 
know how difficult it can be and a native implementation should prove to be 
much simpler to support and easier to tune and modify for performance.  This is 
why the presto-s3-fs filesystem was adopted, for example.
{noformat}

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2020-10-09 Thread Jamie Grier (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211477#comment-17211477
 ] 

Jamie Grier commented on FLINK-19481:
-

I think a native GCS filesystem would be a major benefit to Flink users.  The 
only way to support GCS currently is, as stated, through the Hadoop Filesystem 
implementation which brings several problems along with it.  The two largest 
problems I've experienced are:

1) Hadoop has a huge dependency footprint which is a significant headache for 
Flink application developers dealing with dependency-hell.

2) The total stack of FileSystem abstractions on this path becomes very 
difficult to tune, understand, and support.  By stack I'm referring to Flink's 
own FileSystem abstraction, then the Hadoop layer, then the GCS libraries.  
This is very difficult to work with in production as each layer has its own 
intricacies, connection pools, thread pools, tunable configuration, versions, 
dependency versions, etc.

Having gone down this path with the old-style Hadoop+S3 filesystem approach I 
know how difficult it can be and a native implementation should prove to be 
much simpler to support and easier to tune and modify for performance.  This is 
why the presto-s3-fs filesystem was adopted, for example.

 

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Major
> Fix For: 1.12.0
>
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19468) Metrics return empty when data stream / operator name contains "+"

2020-10-05 Thread Jamie Grier (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-19468:
---

Assignee: Boyang Jerry Peng

> Metrics return empty when data stream / operator name contains "+"
> --
>
> Key: FLINK-19468
> URL: https://issues.apache.org/jira/browse/FLINK-19468
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0, 1.9.2, 1.9.3, 2.0.0
>Reporter: Boyang Jerry Peng
>Assignee: Boyang Jerry Peng
>Priority: Major
>  Labels: pull-request-available
>
> There is an issue in which the special character "+" is not removed from the 
> data stream / operator name, which causes metrics for the operator not to be 
> returned properly. Code Reference:
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java#L208]
>  
> For example if the operator name is:
> pulsar(url: pulsar+ssl://192.168.1.198:56014)
> Metrics for an operator with the above name will always return empty.
>  
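For illustration, a rough sketch of the kind of name sanitization involved at
the linked code reference; the method body below is an assumption rather than
Flink's actual filter, the point being simply that "+" has to be covered
consistently wherever names are sanitized and looked up.

{noformat}
// Sketch only (not the real MetricQueryService code): replace anything outside
// a small whitelist so a name like "pulsar(url: pulsar+ssl://192.168.1.198:56014)"
// resolves to the same sanitized key everywhere it is used.
private static String replaceInvalidChars(String str) {
    return str.replaceAll("[^a-zA-Z0-9\\-_.]", "_");
}
{noformat}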



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2019-04-01 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier closed FLINK-10484.
---
Resolution: Duplicate

> New latency tracking metrics format causes metrics cardinality explosion
> 
>
> Key: FLINK-10484
> URL: https://issues.apache.org/jira/browse/FLINK-10484
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.5.4, 1.6.0, 1.6.1
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Critical
>
> The new metrics format for latency tracking causes huge metrics cardinality 
> explosion due to the format and the fact that there is a metric reported for 
> every combination of source subtask index and operator subtask index.  
> Yikes!
> This format is actually responsible for basically taking down our metrics 
> system due to DDOSing our metrics servers (at Lyft).
>  
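For a rough sense of scale (parallelism numbers invented for illustration): if a 
latency metric is reported per combination of source subtask and operator 
subtask, then a job with 100 source subtasks and 20 downstream operators, each 
at parallelism 100, reports on the order of

{noformat}
100 source subtasks x 100 operator subtasks x 20 operators = 200,000 latency metrics
{noformat}

which is the cardinality explosion described above.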



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-11984) StreamingFileSink docs do not mention S3 savepoint caveats.

2019-03-20 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-11984:

Comment: was deleted

(was: [~kkl0u] What are the S3 savepoint caveats?)

> StreamingFileSink docs do not mention S3 savepoint caveats.
> ---
>
> Key: FLINK-11984
> URL: https://issues.apache.org/jira/browse/FLINK-11984
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Documentation
>Affects Versions: 1.7.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11984) StreamingFileSink docs do not mention S3 savepoint caveats.

2019-03-20 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797404#comment-16797404
 ] 

Jamie Grier commented on FLINK-11984:
-

[~kkl0u] What are the S3 savepoint caveats?

> StreamingFileSink docs do not mention S3 savepoint caveats.
> ---
>
> Key: FLINK-11984
> URL: https://issues.apache.org/jira/browse/FLINK-11984
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Documentation
>Affects Versions: 1.7.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2019-02-21 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774612#comment-16774612
 ] 

Jamie Grier commented on FLINK-10887:
-

[~thw] I'll update the PR with a solution for the aggregand and result that 
works similarly to the aggregateFunction.  This was as designed, but I think 
it's an oversight.  Thanks.

> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>   Original Estimate: 24h
>  Time Spent: 50m
>  Remaining Estimate: 23h 10m
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  
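For illustration, a hypothetical sketch of the RPC surface described above; the
interface name and signatures are invented for this example and are not Flink's
actual JobMaster API.

{noformat}
// Hypothetical sketch, not the real interface. Each source sub-task reports its
// current watermark; sources read back the global minimum/maximum and throttle
// partitions that have run ahead in event time.
public interface SourceWatermarkTracker {
    void reportWatermark(String sourceId, int subtaskIndex, long watermarkMillis);

    long getGlobalMinimumWatermark();

    long getGlobalMaximumWatermark();
}
{noformat}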



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10887) Add source watermark tracking to the JobMaster

2019-02-21 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774612#comment-16774612
 ] 

Jamie Grier edited comment on FLINK-10887 at 2/22/19 12:11 AM:
---

[~thw] I'll create a new PR with a solution for the aggregand and result that 
works similarly to the aggregateFunction.  This was as designed but I think 
it's an oversight.  Thanks.


was (Author: jgrier):
[~thw] I'll update the PR with a solution for the aggregand and result that 
works similarly to the aggregateFunction.  This was a designed but I think it's 
an oversight.  Thanks.

> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>   Original Estimate: 24h
>  Time Spent: 50m
>  Remaining Estimate: 23h 10m
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading

2019-02-14 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768330#comment-16768330
 ] 

Jamie Grier commented on FLINK-11617:
-

Here's an example:

 

Stacktrace is:

{noformat}
java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3 retry attempts returned ProvisionedThroughputExceededException.
  at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:234)
  at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:373)
  at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:216)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{noformat}

 

But the root cause is actually given by this log line:

{{Got recoverable SdkClientException. Backing off for 140 millis (null 
(Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request 
ID: c49c8e5b-a068-9733-9043-b215d51b0aa1))}}

 

 

> Kinesis Connector getRecords() failure logging is misleading
> 
>
> Key: FLINK-11617
> URL: https://issues.apache.org/jira/browse/FLINK-11617
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There isn't enough information in the current logging to diagnose a 
> getRecords() failure.  Also there is a hardcoded string that states the 
> failure cause was always ProvisionedThroughputExceededException which isn't 
> true.  There are many possible causes of failures.  This is misleading.
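For illustration, a sketch of the direction such a fix could take: report the
retry budget and the last underlying exception rather than a hard-coded
ProvisionedThroughputExceededException message. Variable names are placeholders,
not the actual patch.

{noformat}
// Sketch only: remember the last failure and surface it in the error message.
if (attempt >= maxRetries) {
    throw new RuntimeException(
        "Retries exhausted for getRecords operation - all " + maxRetries
            + " retry attempts failed. Last exception: " + lastException.getMessage(),
        lastException);
}
{noformat}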



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading

2019-02-14 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-11617:

Description: There isn't enough information in the current logging to 
diagnose a getRecords() failure.  Also there is a hardcoded string that states 
the failure cause was always ProvisionedThroughputExceededException which isn't 
true.  There are many possible causes of failures.  This is misleading.  (was: 
My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.)

> Kinesis Connector getRecords() failure logging is misleading
> 
>
> Key: FLINK-11617
> URL: https://issues.apache.org/jira/browse/FLINK-11617
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>
> There isn't enough information in the current logging to diagnose a 
> getRecords() failure.  Also there is a hardcoded string that states the 
> failure cause was always ProvisionedThroughputExceededException which isn't 
> true.  There are many possible causes of failures.  This is misleading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading

2019-02-14 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-11617:
---

 Assignee: Jamie Grier  (was: Scott Kidder)
Affects Version/s: 1.5.6
   1.6.3
   1.7.1
  Summary: Kinesis Connector getRecords() failure logging is 
misleading  (was: CLONE - Handle AmazonKinesisException gracefully in Kinesis 
Streaming Connector)

> Kinesis Connector getRecords() failure logging is misleading
> 
>
> Key: FLINK-11617
> URL: https://issues.apache.org/jira/browse/FLINK-11617
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11617) CLONE - Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2019-02-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-11617:
---

 Summary: CLONE - Handle AmazonKinesisException gracefully in 
Kinesis Streaming Connector
 Key: FLINK-11617
 URL: https://issues.apache.org/jira/browse/FLINK-11617
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Jamie Grier
Assignee: Scott Kidder


My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687333#comment-16687333
 ] 

Jamie Grier commented on FLINK-10886:
-

ML discussion: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html]

 

> Event time synchronization across sources
> -
>
> Key: FLINK-10886
> URL: https://issues.apache.org/jira/browse/FLINK-10886
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> When reading from a source with many parallel partitions, especially when 
> reading lots of historical data (or recovering from downtime and there is a 
> backlog to read), it's quite common for there to develop an event-time skew 
> across those partitions.
>  
> When doing event-time windowing -- or in fact any event-time driven 
> processing -- the event time skew across partitions results directly in 
> increased buffering in Flink and of course the corresponding state/checkpoint 
> size growth.
>  
> As the event-time skew and state size grows larger this can have a major 
> effect on application performance and in some cases result in a "death 
> spiral" where the application performance gets worse and worse as the state 
> size grows and grows.
>  
> So, one solution to this problem, outside of core changes in Flink itself, 
> seems to be to try to coordinate sources across partitions so that they make 
> progress through event time at roughly the same rate.  In fact if there is 
> large skew the idea would be to slow or even stop reading from some 
> partitions with newer data while first reading the partitions with older 
> data.  Anyway, to do this we need to share state somehow amongst sub-tasks.
>  
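For illustration, a hypothetical sketch of the per-partition throttling this
implies; the threshold and helper names are invented for this example.

{noformat}
// Hypothetical sketch: pause reads from partitions whose local watermark has
// run too far ahead of the slowest partition's (global minimum) watermark.
long skewMillis = localPartitionWatermark - globalMinWatermark;
if (skewMillis > maxAllowedSkewMillis) {
    pausePartitionReads();   // placeholder: resume once the others catch up
}
{noformat}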



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10888:
---

 Summary: Expose new global watermark RPC to sources
 Key: FLINK-10888
 URL: https://issues.apache.org/jira/browse/FLINK-10888
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


Expose new JobMaster RPC for watermark tracking to Source implementations so it 
can be used to align reads across sources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-10887:

Summary: Add source watermark tracking to the JobMaster  (was: Add source 
watermarking tracking to the JobMaster)

> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10887:
---

 Summary: Add source watermarking tracking to the JobMaster
 Key: FLINK-10887
 URL: https://issues.apache.org/jira/browse/FLINK-10887
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: Jamie Grier
Assignee: Jamie Grier


We need to add a new RPC to the JobMaster such that the current watermark for 
every source sub-task can be reported and the current global minimum/maximum 
watermark can be retrieved so that each source can adjust their partition read 
rates in an attempt to keep sources roughly aligned in event time.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10886:
---

 Summary: Event time synchronization across sources
 Key: FLINK-10886
 URL: https://issues.apache.org/jira/browse/FLINK-10886
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


When reading from a source with many parallel partitions, especially when 
reading lots of historical data (or recovering from downtime and there is a 
backlog to read), it's quite common for there to develop an event-time skew 
across those partitions.
 
When doing event-time windowing -- or in fact any event-time driven processing 
-- the event time skew across partitions results directly in increased 
buffering in Flink and of course the corresponding state/checkpoint size growth.
 
As the event-time skew and state size grows larger this can have a major effect 
on application performance and in some cases result in a "death spiral" where 
the application performance gets worse and worse as the state size grows and 
grows.
 
So, one solution to this problem, outside of core changes in Flink itself, 
seems to be to try to coordinate sources across partitions so that they make 
progress through event time at roughly the same rate.  In fact if there is 
large skew the idea would be to slow or even stop reading from some partitions 
with newer data while first reading the partitions with older data.  Anyway, to 
do this we need to share state somehow amongst sub-tasks.
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2018-10-04 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638756#comment-16638756
 ] 

Jamie Grier edited comment on FLINK-10484 at 10/4/18 7:46 PM:
--

Cool.  We should definitely backport FLINK-10242 as well then.  Are you going 
to do this [~Zentol] or should I take a crack at it ?


was (Author: jgrier):
Cool.  We should definitely backport [FLINK-10242] as well then.  Would you 
like to do this [~Zentol] or should I do it?

> New latency tracking metrics format causes metrics cardinality explosion
> 
>
> Key: FLINK-10484
> URL: https://issues.apache.org/jira/browse/FLINK-10484
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.6.0, 1.6.1, 1.5.4
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Critical
>
> The new metrics format for latency tracking causes huge metrics cardinality 
> explosion due to the format and the fact that there is a metric reported for 
> every combination of source subtask index and operator subtask index.  
> Yikes!
> This format is actually responsible for basically taking down our metrics 
> system due to DDOSing our metrics servers (at Lyft).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2018-10-04 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638756#comment-16638756
 ] 

Jamie Grier commented on FLINK-10484:
-

Cool.  We should definitely backport [FLINK-10242] as well then.  Would you 
like to do this [~Zentol] or should I do it?

> New latency tracking metrics format causes metrics cardinality explosion
> 
>
> Key: FLINK-10484
> URL: https://issues.apache.org/jira/browse/FLINK-10484
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.6.0, 1.6.1, 1.5.4
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Critical
>
> The new metrics format for latency tracking causes huge metrics cardinality 
> explosion due to the format and the fact that there is a metric reported for 
> every combination of source subtask index and operator subtask index.  
> Yikes!
> This format is actually responsible for basically taking down our metrics 
> system due to DDOSing our metrics servers (at Lyft).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2018-10-03 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637332#comment-16637332
 ] 

Jamie Grier commented on FLINK-10484:
-

[~Zentol] Great.  I didn't see that this had already been addressed in 1.7.  
What do you think about the difficulty of backporting to 1.5 and 1.6?

Currently, it's a pretty big problem for people trying to run Flink at any 
reasonable scale – and since latency tracking is on by default, basically 
everything breaks as soon as you upgrade a job from 1.4 to 1.5.  Also, latency 
tracking is something that has to be disabled from application code rather than 
in the flink-conf.yaml file, so it's very hard for infra teams supporting Flink 
to enforce.

It's also not just a problem for Flink JM – but in our case we actually caused 
an observability incident company wide just because of the sheer volume of 
metrics being thrown at our metrics servers.

> New latency tracking metrics format causes metrics cardinality explosion
> 
>
> Key: FLINK-10484
> URL: https://issues.apache.org/jira/browse/FLINK-10484
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.6.0, 1.6.1, 1.5.4
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Critical
>
> The new metrics format for latency tracking causes huge metrics cardinality 
> explosion due to the format and the fact that there is a metric reported for 
> every combination of source subtask index and operator subtask index.  
> Yikes!
> This format is actually responsible for basically taking down our metrics 
> system due to DDOSing our metrics servers (at Lyft).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2018-10-02 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10484:
---

 Summary: New latency tracking metrics format causes metrics 
cardinality explosion
 Key: FLINK-10484
 URL: https://issues.apache.org/jira/browse/FLINK-10484
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.4, 1.6.1, 1.6.0
Reporter: Jamie Grier
Assignee: Jamie Grier


The new metrics format for latency tracking causes huge metrics cardinality 
explosion due to the format and the fact that there is a metric reported for 
every combination of source subtask index and operator subtask index.  Yikes!

This format is actually responsible for basically taking down our metrics 
system due to DDOSing our metrics servers (at Lyft).

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-10154) Make sure we always read at least one record in KinesisConnector

2018-08-15 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-10154:

Comment: was deleted

(was: PR: [https://github.com/apache/flink/pull/6564]

 )

> Make sure we always read at least one record in KinesisConnector
> 
>
> Key: FLINK-10154
> URL: https://issues.apache.org/jira/browse/FLINK-10154
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.6.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Minor
>  Labels: pull-request-available
>
> It's possible in some cases to request zero records from Kinesis in the 
> Kinesis connector.  This can happen when the "adaptive reads" feature is 
> enabled.
>  
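For illustration, a one-line sketch of the guard this implies; the variable
names are placeholders for whatever the adaptive-reads calculation produces.

{noformat}
// Sketch only: clamp the adaptive-reads result so getRecords never asks for zero records.
int maxRecordsToFetch = Math.max(1, adaptiveMaxRecords);
{noformat}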



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10154) Make sure we always read at least one record in KinesisConnector

2018-08-15 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581414#comment-16581414
 ] 

Jamie Grier commented on FLINK-10154:
-

PR: [https://github.com/apache/flink/pull/6564]

 

> Make sure we always read at least one record in KinesisConnector
> 
>
> Key: FLINK-10154
> URL: https://issues.apache.org/jira/browse/FLINK-10154
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.6.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Minor
>  Labels: pull-request-available
>
> It's possible in some cases to request zero records from Kinesis in the 
> Kinesis connector.  This can happen when the "adaptive reads" feature is 
> enabled.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10154) Make sure we always read at least one record in KinesisConnector

2018-08-15 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10154:
---

 Summary: Make sure we always read at least one record in 
KinesisConnector
 Key: FLINK-10154
 URL: https://issues.apache.org/jira/browse/FLINK-10154
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.6.0
Reporter: Jamie Grier
Assignee: Jamie Grier


It's possible in some cases to request zero records from Kinesis in the Kinesis 
connector.  This can happen when the "adaptive reads" feature is enabled.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis

2018-07-09 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537834#comment-16537834
 ] 

Jamie Grier commented on FLINK-9691:


https://github.com/apache/flink/pull/6290

> Modify run loop in Kinesis ShardConsumer to not sleep for a fixed 
> fetchIntervalMillis
> -
>
> Key: FLINK-9691
> URL: https://issues.apache.org/jira/browse/FLINK-9691
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Jamie Grier
>Priority: Major
>
> Currently the ShardConsumer in the Kinesis connector sleeps for a fixed 
> [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210]
>  resulting in the shard consumer sleeping for more time than necessary and 
> not optimally reading from Kinesis. It should only be sleeping for 
> (fetchIntervalMillis - time taken to process records) before making the 
> subsequent getRecords call. 
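For illustration, a sketch of the proposed loop timing with placeholder names;
the idea is simply to subtract the per-iteration processing time from the fixed
interval before sleeping.

{noformat}
// Sketch only (InterruptedException handling omitted): sleep for the remainder
// of the interval rather than the full fetchIntervalMillis.
long fetchStart = System.currentTimeMillis();
GetRecordsResult result = getRecords(shardItr, maxRecords);   // placeholder call
deserializeAndEmitRecords(result);                            // placeholder call
long processingTimeMillis = System.currentTimeMillis() - fetchStart;
long sleepTimeMillis = fetchIntervalMillis - processingTimeMillis;
if (sleepTimeMillis > 0) {
    Thread.sleep(sleepTimeMillis);
}
{noformat}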



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis

2018-07-09 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-9691:
---
Affects Version/s: 1.5.0
   1.4.2

> Modify run loop in Kinesis ShardConsumer to not sleep for a fixed 
> fetchIntervalMillis
> -
>
> Key: FLINK-9691
> URL: https://issues.apache.org/jira/browse/FLINK-9691
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Jamie Grier
>Priority: Major
>
> Currently the ShardConsumer in the Kinesis connector sleeps for a fixed 
> [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210]
>  resulting in the shard consumer sleeping for more time than necessary and 
> not optimally reading from Kinesis. It should only be sleeping for 
> (fetchIntervalMillis - time taken to process records) before making the 
> subsequent getRecords call. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis

2018-07-06 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-9691:
--

Assignee: Jamie Grier

> Modify run loop in Kinesis ShardConsumer to not sleep for a fixed 
> fetchIntervalMillis
> -
>
> Key: FLINK-9691
> URL: https://issues.apache.org/jira/browse/FLINK-9691
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Lakshmi Rao
>Assignee: Jamie Grier
>Priority: Major
>
> Currently the ShardConsumer in the Kinesis connector sleeps for a fixed 
> [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210]
>  resulting in the shard consumer sleeping for more time than necessary and 
> not optimally reading from Kinesis. It should only be sleeping for 
> (fetchIntervalMillis - time taken to process records) before making the 
> subsequent getRecords call. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-07-02 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530389#comment-16530389
 ] 

Jamie Grier commented on FLINK-9061:


[~ind_rc] Initial changes look good.  Are you going to try to get this into 1.6?

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
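For illustration, a sketch of the path rewrite described above; the hash choice
and method name are placeholders, the point being only that the variable portion
lands early in the key so writes spread across S3 partitions.

{noformat}
// Sketch only: inject a short hash derived from the checkpoint path so keys
// spread across S3 partitions instead of sharing one hot prefix.
static String addEntropy(String bucket, String pathWithinBucket) {
    // e.g. addEntropy("bucket", "/flink/checkpoints")
    //  ->  "s3://bucket/<hex-hash>/flink/checkpoints"
    String hash = Integer.toHexString(pathWithinBucket.hashCode());
    return "s3://" + bucket + "/" + hash + pathWithinBucket;
}
{noformat}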



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-07 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505461#comment-16505461
 ] 

Jamie Grier commented on FLINK-9061:


[~neoeahit] This will affect all versions.

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426132#comment-16426132
 ] 

Jamie Grier commented on FLINK-9061:


Yup, sounds good to me :)

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426103#comment-16426103
 ] 

Jamie Grier commented on FLINK-9061:


Okay, this is the best documentation I've found on this:  
[https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html]
 and even it is very vague.

It does appear that the entropy doesn't have to be in the very first characters, but it 
brings up an interesting question.  What are the exact constraints here?  Which 
part of the key name is and isn't used for partitioning, exactly?  Technically, 
all of our checkpoint objects do in fact have several characters of uniqueness 
already, since the last part of the full object key name is the GUID.

Anyway, not having full info sucks.

[~stevenz3wu] I think your proposal sounds good.  Thanks for offering to do the 
PR :)  That should work well, and a logical listing of sub-directories should 
still be possible in this scheme by issuing parallel S3 list requests, one for 
each possible prefix, and merging the results.

Shall we proceed with this approach then?
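
A sketch of what "parallel list requests per prefix, then merge" could look like; the 
listKeys method is a stand-in for an S3 listing call rather than a real SDK method, and 
the two-digit prefix scheme is just one possible choice:

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch: list every possible entropy prefix in parallel and merge the results. */
public class PrefixedListingSketch {

    /** Stand-in for an S3 list call (e.g. ListObjects with the given prefix). */
    static List<String> listKeys(String bucket, String prefix) {
        return new ArrayList<>();  // a real implementation would call the S3 client here
    }

    static List<String> listAllCheckpointKeys(String bucket, String logicalPath) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String prefix = String.format("%02d/%s", i, logicalPath);  // e.g. "42/flink/checkpoints/"
            futures.add(CompletableFuture.supplyAsync(() -> listKeys(bucket, prefix)));
        }

        List<String> merged = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            merged.addAll(future.join());
        }
        Collections.sort(merged);  // present one ordered "directory listing" to the caller
        return merged;
    }
}
{code}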

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423089#comment-16423089
 ] 

Jamie Grier commented on FLINK-9061:


As I understand it, the above doesn't work. Maybe if you ask Amazon to set up 
buckets for you manually this could be made to work, but I think in the normal 
case the very first characters of the key name must introduce the randomness.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423072#comment-16423072
 ] 

Jamie Grier edited comment on FLINK-9061 at 4/2/18 8:17 PM:


So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  It's a flat keyspace (I know we all know that of 
course).  The equivalent of a directory listing that would group all the 
checkpoints for a single job would be:

aws s3 ls --recursive s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?

 


was (Author: jgrier):
So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  The equivalent of a directory listing that would group 
all the checkpoints for a single job would be:

aws s3 ls --recursive s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423072#comment-16423072
 ] 

Jamie Grier commented on FLINK-9061:


So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  The equivalent of a directory listing that would group 
all the checkpoints for a single job would be:

aws s3 ls --recursive s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?
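
A quick sketch of the reversal, assuming the whole key after the bucket is reversed 
component by component:

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of the "split on '/' and reverse the components" idea. */
public class ReversedKeySketch {

    static String reverseKey(String path) {
        int keyStart = path.indexOf('/', "s3://".length());
        String bucket = path.substring(0, keyStart);

        List<String> parts = new ArrayList<>(Arrays.asList(path.substring(keyStart + 1).split("/")));
        Collections.reverse(parts);
        return bucket + "/" + String.join("/", parts);
    }

    public static void main(String[] args) {
        // s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789
        //   -> s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink
        System.out.println(reverseKey("s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789"));
    }
}
{code}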

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-30 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420885#comment-16420885
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/31/18 1:32 AM:
-

Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the key name.  That should actually work 
very well.


was (Author: jgrier):
Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the path.  That should actually work 
very well.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-30 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420885#comment-16420885
 ] 

Jamie Grier commented on FLINK-9061:


Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the path.  That should actually work 
very well.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416271#comment-16416271
 ] 

Jamie Grier commented on FLINK-9061:


[~StephanEwen] I don't know if the s3a-based connector exhibits the same 
behavior, but I suspect it would, since this is not a great approach to writing 
checkpoint data to S3 ;). We could improve on this by changing the retry 
policy in use, as [~ste...@apache.org] said, but in the end it seems like we'll 
have to introduce the entropy to really solve the problem.

Any further ideas about which approach might be best?  We could make changes at 
the FileSystem or StateBackend level.  Of course, adding the entropy would make 
listing files hard.  Maybe we could use just two digits for the entropy, like 
00-99, and when listing we could just list all of those prefixes and merge the 
results.
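
For the two-digit variant, the entropy component could be derived deterministically from 
the original key so that writers and readers agree on it; a tiny sketch (not actual Flink 
code) follows:

{code:java}
/** Sketch: map a checkpoint key to one of 100 fixed entropy prefixes (00-99). */
public class TwoDigitEntropySketch {

    static String withEntropy(String originalKey) {
        int bucket = Math.floorMod(originalKey.hashCode(), 100);
        return String.format("%02d/%s", bucket, originalKey);
    }

    public static void main(String[] args) {
        // Prints something like "37/flink/checkpoints/JOB_ID/chk-465"
        System.out.println(withEntropy("flink/checkpoints/JOB_ID/chk-465"));
    }
}
{code}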

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416254#comment-16416254
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/27/18 9:13 PM:
-

Yeah, so I completely agree that the response should be a 503, or better yet a 
429, but it's not.  I already ran this through the AWS support channels.  The 
response was essentially that this was "internally" a TooBusyException.  Here's 
their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 


was (Author: jgrier):
Yeah, so I completely agree that should be a 503 but it's not.  I already ran 
this through the AWS channels.  The response was essentially that this was 
"internally" a TooBusyException.  Here's their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416254#comment-16416254
 ] 

Jamie Grier commented on FLINK-9061:


Yeah, so I completely agree that it should be a 503, but it's not.  I already ran 
this through the AWS channels.  The response was essentially that this was 
"internally" a TooBusyException.  Here's their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414828#comment-16414828
 ] 

Jamie Grier commented on FLINK-9061:


[~ste...@apache.org]

Here's the full stack trace:

 

 

 

"java.lang.Exception: Could not perform checkpoint 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
 ... 8 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
 in order to obtain the stream state handle
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
 ... 13 common frames omitted
Caused by: java.io.IOException: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended 
Request ID: BLAH_
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 
org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
 ... 18 common frames omitted
Caused by: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
 

[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414828#comment-16414828
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/27/18 12:45 AM:
--

[~ste...@apache.org]

Here's the full stack trace:

 

 

 
{quote}"java.lang.Exception: Could not perform checkpoint 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.Exception: Could not complete snapshot 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
 ... 8 common frames omitted
 Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
 in order to obtain the stream state handle
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
 ... 13 common frames omitted
 Caused by: java.io.IOException: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended 
Request ID: BLAH_
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 
org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
 ... 18 common frames omitted
 Caused by: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
 

[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-24 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412737#comment-16412737
 ] 

Jamie Grier commented on FLINK-9061:


[~stevenz3wu] Did you contribute those changes back to Flink?  I think this 
will be affecting others as well.  Would you consider a contribution back to 
the project?  Otherwise I will do this, but if you already have something 
working we might as well use it or base changes on it.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-22 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-9061:
--

 Summary: S3 checkpoint data not partitioned well -- causes errors 
and poor performance
 Key: FLINK-9061
 URL: https://issues.apache.org/jira/browse/FLINK-9061
 Project: Flink
  Issue Type: Bug
  Components: FileSystem, State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Jamie Grier


I think we need to modify the way we write checkpoints to S3 for high-scale 
jobs (those with many total tasks).  The issue is that we are writing all the 
checkpoint data under a common key prefix.  This is the worst case scenario for 
S3 performance since the key is used as a partition key.
 
In the worst case checkpoints fail with a 500 status code coming back from S3 
and an internal error type of TooBusyException.

 
One possible solution would be to add a hook in the Flink filesystem code that 
allows me to "rewrite" paths.  For example say I have the checkpoint directory 
set to:
 
s3://bucket/flink/checkpoints
 
I would hook that and rewrite that path to:
 
s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
path
 
This would distribute the checkpoint write load around the S3 cluster evenly.
 
For reference: 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
 
Any other people hit this issue?  Any other ideas for solutions?  This is a 
pretty serious problem for people trying to checkpoint to S3.
 
-Jamie
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2017-08-23 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138418#comment-16138418
 ] 

Jamie Grier commented on FLINK-4947:


Sounds good to me :)

> Make all configuration possible via flink-conf.yaml and CLI.
> 
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the 
> flink-conf.yaml and the command line.
> As an example:  To enable "externalizedCheckpoints" you must actually call 
> the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from 
> your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it important to make deployment flexible and easy to build tools 
> around.  For example, the infrastructure teams that make these configuration 
> decisions and provide tools for deploying Flink apps, will be different from 
> the teams deploying apps.  The team writing apps should not have to set all 
> of this lower level configuration up in their programs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2017-08-22 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137249#comment-16137249
 ] 

Jamie Grier commented on FLINK-4947:


[~gaborhermann] It's more than just that, but yes, I do suggest that you should 
be able to override what's in the config file on the command line.

More importantly, though, all *config* should be configurable via 
flink-conf.yaml.  We shouldn't add features that are only configurable from 
*user code*.

An example of this used to be the RocksDB state backend.  If you wanted to use 
that backend and configure it in "async" mode, you had to put that in 
application code, which isn't great for separation of concerns between 
application developers and ops/platform teams.

I know this isn't black-and-white, but we should try to clearly separate 
configuration from user code by putting everything in flink-conf.yaml.  We 
should *also* make it possible to override any of those values on the command 
line when submitting a job.
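
For context, this is roughly the kind of job-code-only configuration being discussed; the 
method names below are from the 1.x APIs of that era and may differ in other Flink 
versions:

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sketch of configuration that, at the time of this discussion, could only be
 * expressed in user code rather than in flink-conf.yaml or on the CLI.
 */
public class UserCodeConfigSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);
        // Externalized checkpoints are enabled from the job code here; the ticket
        // asks for an equivalent way to express this via flink-conf.yaml / CLI.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
{code}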



> Make all configuration possible via flink-conf.yaml and CLI.
> 
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the 
> flink-conf.yaml and the command line.
> As an example:  To enable "externalizedCheckpoints" you must actually call 
> the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from 
> your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it important to make deployment flexible and easy to build tools 
> around.  For example, the infrastructure teams that make these configuration 
> decisions and provide tools for deploying Flink apps, will be different from 
> the teams deploying apps.  The team writing apps should not have to set all 
> of this lower level configuration up in their programs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6199) Single outstanding Async I/O operation per key

2017-03-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-6199:
--

 Summary: Single outstanding Async I/O operation per key
 Key: FLINK-6199
 URL: https://issues.apache.org/jira/browse/FLINK-6199
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier


I would like to propose we extend the Async I/O semantics a bit such that a 
user can guarantee a single outstanding async request per key.

This would allow a user to order async requests per key while still achieving 
the throughput benefits of using Async I/O in the first place.

This is essential for operations where stream order is important but we still 
need to use Async operations to interact with an external system in a 
performant way.
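
A rough sketch of the per-key gating this implies, written with plain CompletableFuture 
rather than Flink's Async I/O API; class and method names are illustrative only:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Sketch of "single outstanding async request per key": each new request for a
 * key is chained behind the previous one, preserving per-key order while
 * requests for different keys still run concurrently.
 */
public class PerKeyAsyncGate<K, T> {

    private final Map<K, CompletableFuture<T>> inFlight = new HashMap<>();

    public synchronized CompletableFuture<T> submit(K key, Function<K, CompletableFuture<T>> asyncCall) {
        CompletableFuture<T> previous =
                inFlight.getOrDefault(key, CompletableFuture.completedFuture(null));

        // Start the next request for this key only once the previous one has finished,
        // regardless of whether it succeeded or failed.
        CompletableFuture<T> next = previous
                .handle((result, error) -> null)
                .thenCompose(ignored -> asyncCall.apply(key));

        // Note: entries are never cleaned up in this sketch; a real implementation
        // would remove completed futures to keep the map bounded.
        inFlight.put(key, next);
        return next;
    }
}
{code}

The point of the design is that ordering is enforced per key rather than globally, so the 
throughput benefit of issuing async requests across many keys is preserved.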



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry

2017-03-06 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898438#comment-15898438
 ] 

Jamie Grier commented on FLINK-3026:


[~iemejia] [~plucas] I'm definitely also for the #2 option above.  We should 
definitely create an Official Flink Docker Image.

I also agree with getting rid of the bluemix files, but I'm not so sure about 
having multiple base images.  Is that truly a common practice?

I also think your labelling scheme looks reasonable - a full spec plus some 
defaults to make it simple to just grab the latest Flink image.

> Publish the flink docker container to the docker registry
> -
>
> Key: FLINK-3026
> URL: https://issues.apache.org/jira/browse/FLINK-3026
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Docker
>Reporter: Omer Katz
>Assignee: Ismaël Mejía
>  Labels: Deployment, Docker
>
> There's a dockerfile that can be used to build a docker container already in 
> the repository. It'd be awesome to just be able to pull it instead of 
> building it ourselves.
> The dockerfile can be found at 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
> It also doesn't point to the latest version of Flink which I fixed in 
> https://github.com/apache/flink/pull/1366



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-02-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851609#comment-15851609
 ] 

Jamie Grier edited comment on FLINK-5634 at 2/3/17 3:20 PM:


Agreed.  If [FLINK-4326] were merged, this would also work fine.  Whether 
Flink runs in the background or foreground is orthogonal to where the logs 
go -- as long as Flink can be configured to log to stdout, we're good.


was (Author: jgrier):
Agreed..  If [4326] were merged this would also work fine.  Whether or not 
Flink runs in the background or foreground is orthogonal to where the logs go 
-- as long as Flink can be configured to log to stdout we're good.

> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground

2017-02-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851616#comment-15851616
 ] 

Jamie Grier commented on FLINK-4326:


[~StephanEwen] Yup, this will also solve the issue I raised in [FLINK-5634], 
since my primary concern was running Docker containers and having the option to 
configure Flink to log to stdout.

> Flink start-up scripts should optionally start services on the foreground
> -
>
> Key: FLINK-4326
> URL: https://issues.apache.org/jira/browse/FLINK-4326
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>
> This has previously been mentioned in the mailing list, but has not been 
> addressed.  Flink start-up scripts start the job and task managers in the 
> background.  This makes it difficult to integrate Flink with most processes 
> supervisory tools and init systems, including Docker.  One can get around 
> this via hacking the scripts or manually starting the right classes via Java, 
> but it is a brittle solution.
> In addition to starting the daemons in the foreground, the start up scripts 
> should use exec instead of running the commends, so as to avoid forks.  Many 
> supervisory tools assume the PID of the process to be monitored is that of 
> the process it first executes, and fork chains make it difficult for the 
> supervisor to figure out what process to monitor.  Specifically, 
> jobmanager.sh and taskmanager.sh should exec flink-daemon.sh, and 
> flink-daemon.sh should exec java.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-02-03 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-5634:
---
Comment: was deleted

(was: Agreed..  If [FLINK-4326] were merged this would also work fine.  Whether 
or not Flink runs in the background or foreground is orthogonal to where the 
logs go -- as long as Flink can be configured to log to stdout we're good.)

> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-02-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851609#comment-15851609
 ] 

Jamie Grier commented on FLINK-5634:


Agreed.  If [4326] were merged, this would also work fine.  Whether 
Flink runs in the background or foreground is orthogonal to where the logs go 
-- as long as Flink can be configured to log to stdout, we're good.

> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-25 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839036#comment-15839036
 ] 

Jamie Grier commented on FLINK-5635:


[~greghogan] I actually wasn't aware of [FLINK-4326].  However, there does seem 
to be some overlap.  There is also [FLINK-5634] and an associated PR, which 
specifically addresses enabling logging to stdout rather than to a file.  
However, neither of those two issues addresses running Flink processes in the 
foreground and avoiding forking, as in [FLINK-4326].  I think that remains a 
separate issue that may need to be addressed.

My main motivation in both of these JIRA issues and the associated PRs was to 
provide a better Flink-on-Docker experience and some example scripts showing 
how to run Flink properly in container-based environments.  I would also like 
to get some "official" Flink Docker images published once we're satisfied with 
them.

> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-25 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-5634:
--

Assignee: Jamie Grier

> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-25 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-5635:
--

Assignee: Jamie Grier

> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-24 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-5635:
--

 Summary: Improve Docker tooling to make it easier to build images 
and launch Flink via Docker tools
 Key: FLINK-5635
 URL: https://issues.apache.org/jira/browse/FLINK-5635
 Project: Flink
  Issue Type: Improvement
  Components: Docker
Affects Versions: 1.2.0
Reporter: Jamie Grier
 Fix For: 1.2.0


This is a bit of a catch-all ticket for general improvements to the Flink on 
Docker experience.

Things to improve:
  - Make it possible to build a Docker image from your own flink-dist directory 
as well as official releases.
  - Make it possible to override the image name so a user can more easily 
publish these images to their Docker repository
  - Provide scripts that show how to properly run on Docker Swarm or similar 
environments with overlay networking (Kubernetes) without using host networking.
  - Log to stdout rather than to files.
  - Work properly with docker-compose for local deployment as well as 
production deployments (Swarm/k8s)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-24 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-5634:
---
Description: 
Flink always redirects stdout to a file.  While often convenient this isn't 
always what people want.  The most obvious case of this is a Docker deployment.

It should be possible to have Flink log to stdout.

Here is a PR for this:  https://github.com/apache/flink/pull/3204


  was:
Flink always redirects stdout to a file.  While often convenient this isn't 
always what people want.  The most obvious case of this is a Docker deployment.

It should be possible to have Flink log to stdout.


> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3710) ScalaDocs for org.apache.flink.streaming.scala are missing from the web site

2017-01-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15798921#comment-15798921
 ] 

Jamie Grier commented on FLINK-3710:


Hi all, I've been asked about these incomplete ScalaDocs by several users, and I 
advocate that we just remove the ScalaDocs links from the Flink website until 
this is resolved.  People look at the ScalaDocs, get confused, and think 
that's all the available Scala API documentation.

> ScalaDocs for org.apache.flink.streaming.scala are missing from the web site
> 
>
> Key: FLINK-3710
> URL: https://issues.apache.org/jira/browse/FLINK-3710
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.1
>Reporter: Elias Levy
> Fix For: 1.0.4
>
>
> The ScalaDocs only include docs for org.apache.flink.scala and sub-packages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService

2016-12-07 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15730286#comment-15730286
 ] 

Jamie Grier commented on FLINK-4992:


Another use case here is simply attaching whatever data you need to handle the 
callback properly -- it might even be the element that you were processing when 
you registered the timer.  Without this, you are forced to implement some sort 
of buffering of data yourself.
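
For illustration, here is a rough sketch of how the proposed label-carrying 
timers could be used from a single function.  The two-argument 
registerProcessingTimeTimer(long, String) and the label-aware onTimer(...) are 
the signatures proposed in this ticket -- they do not exist in Flink today -- 
and Event is just a placeholder type:

{code:java}
// Hypothetical sketch only: the String-label overloads proposed in this ticket
// are not part of the current Flink API; Event is a placeholder type.
class MultiTimerFunction {

    void processElement(Event event, TimerService timers) {
        // Two logically different timers for the same key, told apart by label.
        timers.registerProcessingTimeTimer(now() + 60_000L, "alert");
        timers.registerProcessingTimeTimer(now() + 3_600_000L, "cleanup");
    }

    void onTimer(long timestamp, String label, TimeDomain timeDomain, TimerService timers) {
        if ("alert".equals(label)) {
            // emit an alert for this key
        } else if ("cleanup".equals(label)) {
            // clear the per-key state
        }
    }

    private static long now() {
        return System.currentTimeMillis();
    }
}
{code}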

> Expose String parameter for timers in Timely functions and TimerService
> ---
>
> Key: FLINK-4992
> URL: https://issues.apache.org/jira/browse/FLINK-4992
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Minor
>
> Currently it is very hard to register and execute multiple different types of 
> timers from the same user function because timers don't carry any metadata.
> We propose to extend the timer registration and onTimer logic by attaching a 
> String argument so users of these features can implement functionality that 
> depends on this additional metadata.
> The proposed new methods:
> In the TimerService:
> void registerProcessingTimeTimer(long time, String label);
> void registerEventTimeTimer(long time, String label);
> In the TimelyFunctions:
> void onTimer(long timestamp, String label, TimeDomain timeDomain, 
> TimerService timerService...);
> This extended functionality can be mapped to a String namespace for the 
> internal timer service. I suggest we don't use the term "namespace" here 
> because it just complicates things for the users, I think "label" or "id" or 
> "name" is much simpler to understand.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process

2016-11-07 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644931#comment-15644931
 ] 

Jamie Grier commented on FLINK-5026:


Another option here would be apply(), rather than process().  Whatever the name, 
it should imply that this is sort of the basic building block in Flink.

> Rename TimelyFlatMap to Process
> ---
>
> Key: FLINK-5026
> URL: https://issues.apache.org/jira/browse/FLINK-5026
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The method on {{KeyedDataStream}} would be called {{process()}} and the 
> function itself would be called {{ProcessFunction}}.
> The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful 
> and with the additions to the timer API and state the {{ProcessFunction}} 
> could become the basic, low-level, user-facing API for cases where users 
> nowadays implement their own operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process

2016-11-07 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644807#comment-15644807
 ] 

Jamie Grier commented on FLINK-5026:


+1 - I agree that TimelyFlatMap is a cumbersome name.

> Rename TimelyFlatMap to Process
> ---
>
> Key: FLINK-5026
> URL: https://issues.apache.org/jira/browse/FLINK-5026
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The method on {{KeyedDataStream}} would be called {{process()}} and the 
> function itself would be called {{ProcessFunction}}.
> The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful 
> and with the additions to the timer API and state the {{ProcessFunction}} 
> could become the basic, low-level, user-facing API for cases where users 
> nowadays implement their own operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636428#comment-15636428
 ] 

Jamie Grier edited comment on FLINK-5012 at 11/4/16 2:05 PM:
-

Okay, makes sense about RuntimeContext.  I also like your "ideal" solution 
best -- or maybe:

{code:java}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
  Collector collector();
}
{code}

Maybe the above is "close enough" to what people are used to since it still 
uses the Collector interface.
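
As a sketch of what user code might look like with that Context variant (names 
are illustrative only, nothing here is final API; Event is a placeholder type):

{code:java}
// Illustrative only: assumes the Context variant sketched above.
class PassThroughWithTimer {

    void flatMap(Event value, Context ctx) throws Exception {
        Long ts = ctx.timestamp();  // may be null when the element carries no timestamp
        if (ts != null) {
            // e.g. fire a timer five seconds after the element's timestamp
            ctx.timerService().registerEventTimeTimer(ts + 5_000L);
        }
        ctx.collector().collect(value);
    }
}
{code}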


was (Author: jgrier):
Okay, makes sense about RuntimeContext..  I also like your "ideal" solution 
best -- or maybe:

{code:java}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
  Collector collector();
}
{code}


> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws 
> Exception;
> {code}
> if we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector 
> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might 
> not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that 
> provides access to the timestamp and timer service.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636428#comment-15636428
 ] 

Jamie Grier commented on FLINK-5012:


Okay, makes sense about RuntimeContext.  I also like your "ideal" solution 
best -- or maybe:

{code:java}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
  Collector collector();
}
{code}


> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws 
> Exception;
> {code}
> if we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector 
> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might 
> not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that 
> provides access to the timestamp and timer service.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636266#comment-15636266
 ] 

Jamie Grier commented on FLINK-5012:


Definitely +1 to the Context parameter

I've always thought there should be a way to get the timestamp of the current 
element in any Function.  Should we just add this to the RuntimeContext?  Is 
there a good reason not to do this?

> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws 
> Exception;
> {code}
> if we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector 
> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might 
> not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that 
> provides access to the timestamp and timer service.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632608#comment-15632608
 ] 

Jamie Grier commented on FLINK-4545:


Big +1!

In general I would love to see this improved.  In my experience this is the 
"one thing" that people run into with Flink: whereas everything else "just 
works", this is the one parameter they have to set/tune, and it's very confusing 
to newcomers.

The equation to get this right is complex, and the "correct" setting changes 
based on how they deploy the job, what parallelism they use, how many TMs they 
run, etc.

It also often happens that things are working, then a user changes their job 
a bit (adding a keyBy, for instance), and then it stops working and they have a 
hard time understanding why.

Is there a way we can set this parameter automatically in a majority of use 
cases?  If folks are running single jobs directly on YARN, for instance, it seems 
we should have all the information necessary to set this parameter 
auto-magically, or at least fail fast and tell the user what the parameter 
should be set to.
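
To make the sizing problem concrete, here is a back-of-the-envelope use of the 
heuristic from the Flink docs (also quoted in the issue description below), 
#slots-per-TM^2 * #TMs * 4.  The cluster shape (20 TMs with 8 slots each) is 
just an assumed example; 32 KiB is the default buffer size:

{code:java}
// Rough illustration of the documented heuristic: slotsPerTM^2 * numTMs * 4.
// The cluster shape here is an assumed example, not a recommendation.
public class NetworkBufferEstimate {
    public static void main(String[] args) {
        int slotsPerTaskManager = 8;
        int taskManagers = 20;
        long bufferSizeBytes = 32 * 1024;  // default network buffer (memory segment) size

        long buffers = (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4;
        long megabytes = buffers * bufferSizeBytes / (1024 * 1024);

        System.out.println("taskmanager.network.numberOfBuffers >= " + buffers);    // 5120
        System.out.println("roughly " + megabytes + " MiB of buffer memory per TM"); // 160
    }
}
{code}

Getting exactly this kind of arithmetic right, and keeping it right as the job 
changes, is what users struggle with today.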

> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a Job DAG with a shuffle phase, this number can go up very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the up-coming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting task manager process, this may add latency and complexity for 
> scaling process. I am wondering if there is already any discussion around 
> whether the network buffer should be automatically managed by Flink or at 
> least expose some API to allow it to be reconfigured. Let me know if there is 
> any existing JIRA that I should follow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632537#comment-15632537
 ] 

Jamie Grier edited comment on FLINK-4022 at 11/3/16 12:03 PM:
--

Yes, definitely get input from Stephan and/or Aljoscha.  There may be a good 
reason why this simple solution won't work.


was (Author: jgrier):
Yes, definitely get input from Stephan and/or Aljoscha.  There may be a good 
reason why this simple solution wont' work.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, when the 
> subtasks get the new partitions they'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632537#comment-15632537
 ] 

Jamie Grier commented on FLINK-4022:


Yes, definitely get input from Stephan and/or Aljoscha.  There may be a good 
reason why this simple solution wont' work.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, when the 
> subtasks get the new partitions they'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632378#comment-15632378
 ] 

Jamie Grier commented on FLINK-4022:


Rather than emitting Long.MAX_VALUE for subtasks without partitions or using a 
global watermark service, can we not just have subtasks without partitions (or, 
in general, subtasks that are not making progress) emit a special watermark value 
that means "do not consider this subtask when calculating the current watermark 
downstream"?

The benefit, of course, is that this doesn't require a central service or 
coordination.  Can't this achieve the same thing?  When a partition comes online 
after this, of course, all of its data will be considered late -- but 
basically, what else could you do?
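
Purely as an illustration of the idea (this is not Flink code), a downstream 
operator combining per-channel watermarks could skip inputs that advertise such 
an "idle" sentinel when taking the minimum:

{code:java}
// Sketch only: IDLE is a hypothetical sentinel meaning "ignore this input when
// computing the combined watermark"; it is not part of the Flink API.
class WatermarkCombiner {

    static final long IDLE = Long.MIN_VALUE;

    /** Returns the combined watermark, or IDLE if every input is idle. */
    static long combine(long[] perChannelWatermarks) {
        long min = Long.MAX_VALUE;
        boolean sawActive = false;
        for (long wm : perChannelWatermarks) {
            if (wm == IDLE) {
                continue;  // idle channels do not hold back the watermark
            }
            sawActive = true;
            min = Math.min(min, wm);
        }
        return sawActive ? min : IDLE;
    }
}
{code}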

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, when the 
> subtasks get the new partitions they'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629091#comment-15629091
 ] 

Jamie Grier commented on FLINK-4022:


I also think this would be a great feature and a few Flink users have asked 
about this -- both dynamic partition discover within one topic and also dynamic 
topic discovery.

Any progress on this?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, when the 
> subtasks get the new partitions they'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation

2016-11-01 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626962#comment-15626962
 ] 

Jamie Grier commented on FLINK-3659:


I would like to suggest that rather than adding a new API method, 
connectWithBroadcast(), we just enable this functionality via the current API 
and check access to state variants at runtime.

In other words all of the following will work:

{code:java}
DataStream stream = new DataStream();
DataStream keyedStream = new DataStream().keyBy("...");
DataStream broadcastStream = new DataStream().broadcast();

stream.connect(stream);
stream.connect(keyedStream);
stream.connect(broadcastStream);

keyedStream.connect(stream);
keyedStream.connect(keyedStream);
keyedStream.connect(broadcastStream);

broadcastStream.connect(stream);
broadcastStream.connect(keyedStream);
broadcastStream.connect(broadcastStream);
{code}

... and based on the actual input types to the LHS and RHS of these connected 
streams we check at runtime what they can do.  For example:

{code:java}
 keyedStream.connect(broadcastStream).flatMap(...)
{code}

In the above the user can access the keyed and broadcast state from his 
flatMap1() method (LHS), and can access only broadcast state from his 
flatMap2() method (RHS).

The reason I suggest this is that it keeps the API simpler and more intuitive, 
and there aren't any new APIs to learn -- other than for the new broadcast 
state access itself.  People are already building things exactly this way -- 
they are just being forced to use Checkpointed to make their state 
fault-tolerant.  This allows the same API as before, just with some additional 
capabilities, and it will work properly with re-scalable state.

In a future version of Flink (2.0+) maybe we can start to think about 
annotation-based APIs, more like the current Beam approach, which I think is 
very nice.  It allows both flexible and dynamic API evolution as well as 
"static" verification.  Anyway, maybe in the future we could do something more 
like this:

{code:java}
class MyCoFlatMap { // doesn't even need to extend anything
   @FlatMap(input=KeyedStream)
   void dataFunc(@BroadcastState("name") String s, @KeyedState("name") Integer 
i) { … }

   @FlatMap(input=BroadcastStream)
   void controlFunc(@BroadcastState("name") String s) { … }
}
{code}

> Add ConnectWithBroadcast Operation
> --
>
> Key: FLINK-3659
> URL: https://issues.apache.org/jira/browse/FLINK-3659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but 
> doesn't have to be) and a second input that is always broadcast. This is 
> similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to 
> be keyed or non-keyed.
> This builds on FLINK-4940 which aims at adding broadcast/global state. When 
> processing an element from the broadcast input only access to broadcast state 
> is allowed. When processing an element from the main input access both the 
> regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will 
> probably take a while until we add support for side-inputs in the API. This 
> new operation would allow expressing new patterns that cannot be expressed 
> with the currently expressed operations.
> This is the new proposed API (names are non-final): 
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}/
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
> functions.
> Sketch of the user function:
> {code}
> interface BroadcastFlatMapFunction {
>   public void flatMap(IN in, Collector out);
>   public void processBroadcastInput(BIN in);
> }
> {code}
> The API names, function names are a bit verbose and we have to add two new 
> different ones but I don't see a way around this with the current way the 
> Flink API works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4980) Include example source code in Flink binary distribution

2016-10-31 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4980:
--

 Summary: Include example source code in Flink binary distribution
 Key: FLINK-4980
 URL: https://issues.apache.org/jira/browse/FLINK-4980
 Project: Flink
  Issue Type: Improvement
Reporter: Jamie Grier


I think we should include the Flink examples source code in the binary 
distribution of Flink.  This would allow people to download Flink and run 
examples (as now), but also play around with and modify the examples.

Right now they would have to actually get the Flink source distribution if they 
wanted the examples source -- which I think is onerous.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613064#comment-15613064
 ] 

Jamie Grier commented on FLINK-4948:


Makes sense.  Maybe a scheme where we can verify that the checkpoint is at 
least self-consistent -- using only data stored in the checkpoint itself.
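
A minimal illustration of the kind of self-consistency check being discussed: 
store a checksum next to the payload and verify it on read.  This is just a 
sketch of the idea, not how Flink actually writes checkpoints:

{code:java}
import java.util.zip.CRC32;

// Sketch: keep a CRC32 alongside the serialized checkpoint bytes and verify it
// before restoring, so a corrupted or truncated blob is detected up front.
public class ChecksummedBlob {

    static long checksum(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    /** True if the stored checksum matches the payload, i.e. the blob is self-consistent. */
    static boolean verify(byte[] payload, long storedChecksum) {
        return checksum(payload) == storedChecksum;
    }
}
{code}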

> Consider using checksums or similar to detect bad checkpoints
> -
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> We should consider proactively checking to verify that checkpoints are valid 
> when reading (and maybe writing).  This should help prevent any possible 
> state corruption issues that might otherwise go undetected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4948:
--

 Summary: Consider using checksums or similar to detect bad 
checkpoints
 Key: FLINK-4948
 URL: https://issues.apache.org/jira/browse/FLINK-4948
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier
 Fix For: 1.2.0


We should consider proactively checking to verify that checkpoints are valid 
when reading (and maybe writing).  This should help prevent any possible state 
corruption issues that might otherwise go undetected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2016-10-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4947:
--

 Summary: Make all configuration possible via flink-conf.yaml and 
CLI.
 Key: FLINK-4947
 URL: https://issues.apache.org/jira/browse/FLINK-4947
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier
 Fix For: 1.2.0


I think it's important to make all configuration possible via the 
flink-conf.yaml and the command line.

As an example:  To enable "externalizedCheckpoints" you must actually call the 
StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your 
Flink program.

Another example of this would be configuring the RocksDB state backend.

I think it is important to make deployment flexible and easy to build tools 
around.  For example, the infrastructure teams that make these configuration 
decisions and provide tools for deploying Flink apps will be different from 
the teams deploying the apps.  The teams writing apps should not have to set all 
of this lower-level configuration up in their programs.
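
For reference, this is roughly what the programmatic path looks like with the 
1.2-era API (method locations from memory, so treat the snippet as approximate; 
the checkpoint URI is just an example).  The point of this issue is that the 
same choices should also be expressible in flink-conf.yaml or on the CLI:

{code:java}
// Approximate 1.2-era programmatic configuration -- exactly the kind of setup
// this issue argues should also be possible via flink-conf.yaml / the CLI.
public class ProgrammaticConfig {
    public static StreamExecutionEnvironment configure() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }
}
{code}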





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-08-17 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425292#comment-15425292
 ] 

Jamie Grier commented on FLINK-4391:


[~maguowei] Do you have any sense of when you might be able to contribute this? 
 Is it in a publicly accessible fork anywhere so I could take a look?  There 
are a few others in the community that I know would benefit from this 
immediately.

> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then 
> emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4391) Provide support for asynchronous operations over streams

2016-08-13 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4391:
--

 Summary: Provide support for asynchronous operations over streams
 Key: FLINK-4391
 URL: https://issues.apache.org/jira/browse/FLINK-4391
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: Jamie Grier


Many Flink users need to do asynchronous processing driven by data from a 
DataStream.  The classic example would be joining against an external database 
in order to enrich a stream with extra information.

It would be nice to add general support for this type of operation in the Flink 
API.  Ideally this could simply take the form of a new operator that manages 
async operations, keeps a bounded number of them in flight, and then emits 
results to downstream operators as the async operations complete.
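
As a sketch of the pattern being asked for (plain Java, not a proposed Flink 
API): keep a bounded number of asynchronous lookups in flight and hand each 
result downstream as its future completes.  AsyncLookup stands in for whatever 
async client (e.g. a database) the user would plug in:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Sketch only: bounds the number of in-flight async lookups and hands each
// result to a downstream consumer as it completes.  Not a Flink operator.
public class BoundedAsyncEnricher<IN, OUT> {

    public interface AsyncLookup<IN, OUT> {
        CompletableFuture<OUT> apply(IN input);
    }

    private final Semaphore inFlight;
    private final AsyncLookup<IN, OUT> lookup;
    private final Consumer<OUT> downstream;

    public BoundedAsyncEnricher(int maxInFlight, AsyncLookup<IN, OUT> lookup,
                                Consumer<OUT> downstream) {
        this.inFlight = new Semaphore(maxInFlight);
        this.lookup = lookup;
        this.downstream = downstream;
    }

    public void process(IN input) throws InterruptedException {
        inFlight.acquire();  // back-pressure: at most maxInFlight pending lookups
        lookup.apply(input).whenComplete((result, error) -> {
            try {
                if (error == null) {
                    downstream.accept(result);  // emit as soon as the async call completes
                }
            } finally {
                inFlight.release();
            }
        });
    }
}
{code}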



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15224692#comment-15224692
 ] 

Jamie Grier edited comment on FLINK-3679 at 4/4/16 6:12 PM:


I'm not sure about the locking and operator chaining issues, so I would say if 
that's unduly complicated because of this change, maybe it's not worth it.  
However, a DeserializationSchema with more flatMap()-like semantics would 
certainly be the better API given that bad data issues are a reality.  It also 
seems we could provide this without breaking existing code, but certainly it 
would add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue by making a special "sentinel" 
value and dealing with all of this in a chained flatMap() operator.  I 
imagine that's exactly the approach people are already using.
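
For completeness, here is a rough sketch of that workaround under the current 
one-record-per-input contract.  MyEvent and its parse() method are assumed 
placeholder names, and SENTINEL is the special marker value described above:

{code:java}
// Sketch of the workaround: never fail the job on bad bytes -- emit a sentinel
// instead and drop it in a chained flatMap().  MyEvent/parse() are placeholders.
public class TolerantSchema extends AbstractDeserializationSchema<MyEvent> {

    public static final MyEvent SENTINEL = new MyEvent();

    @Override
    public MyEvent deserialize(byte[] message) {
        try {
            return MyEvent.parse(message);  // assumed parser
        } catch (Exception e) {
            return SENTINEL;                // bad record: swallow instead of failing the job
        }
    }
}

// Chained directly after the source so the sentinel never travels far:
// stream.flatMap((MyEvent e, Collector<MyEvent> out) -> {
//     if (e != TolerantSchema.SENTINEL) {
//         out.collect(e);
//     }
// });
{code}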




was (Author: jgrier):
I'm not sure about the locking and operator chaining issues so I would say if 
that's unduly complicated because of this change maybe it's not worth it.  
However, a DeserializationSchema with more flatMap() like semantics would 
certainly the better API given that bad data issues are a reality.  It also 
seems we could provide this without breaking existing code, but certainly it 
would add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue my making a special "sentinel" 
value and dealing with all of this is in a chained flatMap() operator.  I 
imagine that's exactly the approach that people are already using.



> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15224692#comment-15224692
 ] 

Jamie Grier commented on FLINK-3679:


I'm not sure about the locking and operator chaining issues so I would say if 
that's unduly complicated because of this change maybe it's not worth it.  
However, a DeserializationSchema with more flatMap() like semantics would 
certainly the better API given that bad data issues are a reality.  It also 
seems we could provide this without breaking existing code, but certainly it 
would add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue my making a special "sentinel" 
value and dealing with all of this is in a chained flatMap() operator.  I 
imagine that's exactly the approach that people are already using.



> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-03-29 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-3680:
---
Attachment: Screen Shot 2016-03-29 at 8.12.17 PM.png
Screen Shot 2016-03-29 at 8.13.12 PM.png

> Remove or improve (not set) text in the Job Plan UI
> ---
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Jamie Grier
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 
> 2016-03-29 at 8.13.12 PM.png
>
>
> When running streaming jobs the UI display (not set) in the UI in a few 
> different places.  This is not the case for batch jobs.
> To illustrate I've included screen shots of the UI for the batch and 
> streaming WordCount examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-03-29 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3680:
--

 Summary: Remove or improve (not set) text in the Job Plan UI
 Key: FLINK-3680
 URL: https://issues.apache.org/jira/browse/FLINK-3680
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Reporter: Jamie Grier


When running streaming jobs the UI display (not set) in the UI in a few 
different places.  This is not the case for batch jobs.

To illustrate I've included screen shots of the UI for the batch and streaming 
WordCount examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-03-29 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3679:
--

 Summary: DeserializationSchema should handle zero or more outputs 
for every input
 Key: FLINK-3679
 URL: https://issues.apache.org/jira/browse/FLINK-3679
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Jamie Grier


There are a couple of issues with the DeserializationSchema API that I think 
should be improved.  This request has come to me via an existing Flink user.

The main issue is simply that the API assumes that there is a one-to-one 
mapping between input and outputs.  In reality there are scenarios where one 
input message (say from Kafka) might actually map to zero or more logical 
elements in the pipeline.

Particularly important here is the case where you receive a message from a 
source (such as Kafka) and say the raw bytes don't deserialize properly.  Right 
now the only recourse is to throw IOException and therefore fail the job.  

This is definitely not good since bad data is a reality and failing the job is 
not the right option.  If the job fails we'll just end up replaying the bad 
data and the whole thing will start again.

Instead in this case it would be best if the user could just return the empty 
set.

The other case is where one input message should logically be multiple output 
messages.  This case is probably less important since there are other ways to 
do this but in general it might be good to make the 
DeserializationSchema.deserialize() method return a collection rather than a 
single element.

Maybe we need to support a DeserializationSchema variant that has semantics 
more like that of FlatMap.
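
One possible shape for such a variant -- purely a sketch of the proposal, not an 
existing interface:

{code:java}
// Hypothetical interface sketch for a flatMap-style deserialization schema:
// emit zero results for bad input, one or many for valid input.
public interface FlatMapDeserializationSchema<T> extends java.io.Serializable {

    /** Emit zero or more elements for the raw message; emit nothing to skip bad data. */
    void deserialize(byte[] message, Collector<T> out);

    TypeInformation<T> getProducedType();
}
{code}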







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3627) Task stuck on lock in StreamSource when cancelling

2016-03-18 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3627:
--

 Summary: Task stuck on lock in StreamSource when cancelling
 Key: FLINK-3627
 URL: https://issues.apache.org/jira/browse/FLINK-3627
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Jamie Grier


I've seen this occur a couple of times when the # of network buffers is set too 
low.  The job fails with an appropriate message indicating that the user 
should increase the # of network buffers.  However, some of the task threads 
then hang with a stack trace similar to the following.

2016-03-16 13:38:54,017 WARN  org.apache.flink.runtime.taskmanager.Task 
- Task 'Source: EventGenerator -> (Flat Map, blah -> Filter -> 
Projection -> Flat Map -> Timestamps/Watermarks -> Map) (46/144)' did not react 
to cancelling signal, but is stuck in method:
 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:317)
flink.benchmark.generator.LoadGeneratorSource.run(LoadGeneratorSource.java:38)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field

2016-03-15 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3617:
--

 Summary: NPE from CaseClassSerializer when dealing with null 
Option field
 Key: FLINK-3617
 URL: https://issues.apache.org/jira/browse/FLINK-3617
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.0.0
Reporter: Jamie Grier


This error occurs when serializing a Scala case class with a field of Option[] 
type where the value is not Some or None, but null.

If this is not supported we should have a good error message.

java.lang.RuntimeException: ConsumerThread threw an exception: null
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
... 3 more
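
For reference, a minimal sketch of the kind of value that triggers this (the 
case class and field names are made up for illustration):

{code:scala}
// Illustrative only.  An Option field that is null (rather than Some(...) or
// None) compiles and runs fine on its own; the failure only shows up when
// Flink's CaseClassSerializer tries to serialize such a record.
case class Event(id: Long, comment: Option[String])

object NullOptionExample {
  def main(args: Array[String]): Unit = {
    val ok  = Event(1L, None)   // serializes fine
    val bad = Event(2L, null)   // the shape of record behind the NPE above
    println(ok)
    println(bad)
  }
}
{code}

If null Options can't be supported, a clear error message pointing at the 
offending field would already be a big improvement over the bare NPE.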



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-22 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158090#comment-15158090
 ] 

Jamie Grier edited comment on FLINK-1502 at 2/23/16 1:30 AM:
-

[~eastcirclek] Let's define our terms to  make sure we're talking about the 
same thing.

*Session*: A single instance of a Job Manager and some # of TaskManagers 
working together.   A session can be created "on-the-fly" for a single job or 
it can be a long-running thing.  Multiple jobs can start, run, and finish in 
the same session.  Think of the "yarn-session.sh" command.  This creates a 
session outside of any particular job.  This is also what I've meant when I've 
said "cluster".  A Yarn session is a "cluster" that we've spun up for some 
length of time on Yarn.  Another example of a cluster would be a standalone 
install of Flink on some # of machines.

*Job*: A single batch or streaming job that runs on a Flink cluster.

In the above scenario, and if your definition of sessions is in agreement with 
mine, you would instead have the following.  Note that I've named the cluster 
according to the "session" name you've given, because in this case each session 
is really a different (ad-hoc) cluster.  When you run a job directly using just 
"flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job.

After Session 1 is finished, Node 1 would have the following metrics:

- cluster.session1.taskmanager.1.gc_time

After session 2 is finished, Node 1 would have the following metrics:

- cluster.session1.taskmanager.1.gc_time 
- cluster.session2.taskmanager.2.gc_time
- cluster.session3.taskmanager.3.gc_time

There are many metrics in this case because that's exactly what you want.  
These are JVM scope metrics we are talking about and those are 3 different 
JVMs, not the same one, so it makes total sense for them to have these different 
names/scopes.  These metrics have nothing to do with each other and it doesn't 
matter which host they are from.  They are scoped to the cluster (or session) 
and logical TaskManager index, not the host.

The above should not be confused with any host level metrics we want to report. 
 Host level metrics would be scoped simply by the hostname so they wouldn't 
grow either.

One more example, hopefully to clarify.  Let's say I spun up a long-running 
cluster (or session) using yarn-session.sh -tm 3.  Now we have a Flink cluster 
running on YARN with no jobs running and three TaskManagers.  We then run three 
different jobs one after another on this cluster.  The metrics would still 
simply be:

- cluster.yarn-session.taskmanager.1.gc_time
- cluster.yarn-session.taskmanager.2.gc_time
- cluster.yarn-session.taskmanager.3.gc_time

No matter how many jobs you ran this list would not grow, which is natural 
because there have only been 3 TaskManagers.  Now if one of these TaskManagers 
were to fail and be restarted it would assume the same name -- that's the point 
of using "logical" indexes so the set of metrics name in that case still would 
not be larger than the above.

In the initial case you describe above, if you didn't want lots of different 
metrics over time, you could also just give all of your sessions the same name.  
Your metrics are growing because you're spinning up many different clusters 
(sessions) over time with different names each time.  If you used the same name 
for the cluster (session) every time this metrics namespace growth would not 
occur.

I hope any of that made sense ;)  This is getting a bit hard to describe this 
way.  We could also sync via Hangouts or something if that is easier.




was (Author: jgrier):
[~eastcirclek] Let's define our terms to  make sure we're talking about the 
same thing.

*Session*: A single instance of a Job Manager and some # of TaskManagers 
working together.   A session can be created "on-the-fly" for a single job or 
it can be a long-running thing.  Multiple jobs can start, run, and finish in 
the same session.  Think of the "yarn-session.sh" command.  This creates a 
session outside of any particular job.  This is also what I've meant when I've 
said "cluster".  A Yarn session is a "cluster" that we've spun up for some 
length of time on Yarn.  Another example of a cluster would be a standalone 
install of Flink on some # of machines.

*Job*: A single batch or streaming job that runs on a Flink cluster.

In the above scenario, and if your definition of sessions is in agreement with 
mine.  You would instead have the following.  Note that I've named the cluster 
according to the "session" name you've given, because in this case each session 
is really a different (ad-hoc) cluster.  When you run a job directly using just 
"flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job.

After Session 1 is finished, Node 1 would have the following metrics:

- cluster.session1.taskmanager.1.gc_time

After 

[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-22 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158090#comment-15158090
 ] 

Jamie Grier commented on FLINK-1502:


[~eastcirclek] Let's define our terms to  make sure we're talking about the 
same thing.

*Session*: A single instance of a Job Manager and some # of TaskManagers 
working together.   A session can be created "on-the-fly" for a single job or 
it can be a long-running thing.  Multiple jobs can start, run, and finish in 
the same session.  Think of the "yarn-session.sh" command.  This creates a 
session outside of any particular job.  This is also what I've meant when I've 
said "cluster".  A Yarn session is a "cluster" that we've spun up for some 
length of time on Yarn.  Another example of a cluster would be a standalone 
install of Flink on some # of machines.

*Job*: A single batch or streaming job that runs on a Flink cluster.

In the above scenario, and if your definition of sessions is in agreement with 
mine, you would instead have the following.  Note that I've named the cluster 
according to the "session" name you've given, because in this case each session 
is really a different (ad-hoc) cluster.  When you run a job directly using just 
"flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job.

After Session 1 is finished, Node 1 would have the following metrics:

- cluster.session1.taskmanager.1.gc_time

After session 2 is finished, Node 1 would have the following metrics:

- cluster.session1.taskmanager.1.gc_time 
- cluster.session2.taskmanager.2.gc_time
- cluster.session3.taskmanager.3.gc_time

There are many metrics in this case because that's exactly what you want.  
These are JVM scope metrics we are talking about and those are 3 different 
JVMs, not the same one, so it makes total sense for them to have these different 
names/scopes.  These metrics have nothing to do with each other and it doesn't 
matter which host they are from.  They are scoped to the cluster (or session) 
and logical TaskManager index, not the host.

The above should not be confused with any host level metrics we want to report. 
 Host level metrics would be scoped simply by the hostname so they wouldn't 
grow either.

One more example, hopefully to clarify.  Let's say I spun up a long-running 
cluster (or session) using yarn-session.sh -tm 3.  Now we have a Flink cluster 
running on YARN with no jobs running and three TaskManagers.  We then run three 
different jobs one after another on this cluster.  The metrics would still 
simply be:

- cluster.yarn-session.taskmanager.1.gc_time
- cluster.yarn-session.taskmanager.2.gc_time
- cluster.yarn-session.taskmanager.3.gc_time

No matter how many jobs you ran this list would not grow, which is natural 
because there have only been 3 TaskManagers.  Now if one of these TaskManagers 
were to fail and be restarted it would assume the same name -- that's the point 
of using "logical" indexes so the set of metrics name in that case still would 
not be larger than the above.

In the initial case you describe above, if you didn't want lots of different 
metrics over time, you could also just give all of your sessions the same name.  
Your metrics are growing because you're spinning up many different clusters 
(sessions) over time with different names each time.  If you used the same name 
for the cluster (session) every time this metrics namespace growth would not 
occur.

I hope any of that made sense ;)  This is getting a bit hard to describe this 
way.  We could also sync via Hangouts or something if that is easier.



> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-18 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15152917#comment-15152917
 ] 

Jamie Grier edited comment on FLINK-1502 at 2/18/16 7:21 PM:
-

To be clear what I meant here is to have the indexes assigned to the 
TaskManagers scoped to the *entire* cluster.  Not a particular host like what 
you're describing here.  So, for example, if you spun up a Flink cluster with 
10 TaskManagers running on 10 different hosts, the TaskManagers would be given 
a unique *index* on the *cluster*.  Literally, TaskManager[1-10].  Use this to 
scope the metrics, e.g.:

cluster.MyCluster.taskmanager.1.gc_time
cluster.MyCluster.taskmanager.2.gc_time
...
...
cluster.MyCluster.taskmanager.10.gc_time

It doesn't matter which hosts they are on.  These are 10 unique JVMs on some 
set of hosts.




was (Author: jgrier):
To be clear what I meant here is to have the indexes assigned to the 
TaskManagers scoped to the *entire* cluster.  Not a particular host like what 
you're describing here.  So, for example, if you spun up a Flink cluster with 
10 TaskManagers running on 10 different hosts the TaskManager's would be given 
a unique INDEX on the cluster.  Literally, TaskManager[1-10].  Use this to 
scope the metrics, e.g.:

cluster.MyCluster.taskmanager.1.gc_time
cluster.MyCluster.taskmanager.2.gc_time
...
...
cluster.MyCluster.taskmanager.10.gc_time

It doesn't matter which hosts they are on.  These are 10 unique JVMS on some 
set of hosts.



> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-18 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15152917#comment-15152917
 ] 

Jamie Grier commented on FLINK-1502:


To be clear what I meant here is to have the indexes assigned to the 
TaskManagers scoped to the *entire* cluster.  Not a particular host like what 
you're describing here.  So, for example, if you spun up a Flink cluster with 
10 TaskManagers running on 10 different hosts, the TaskManagers would be given 
a unique INDEX on the cluster.  Literally, TaskManager[1-10].  Use this to 
scope the metrics, e.g.:

cluster.MyCluster.taskmanager.1.gc_time
cluster.MyCluster.taskmanager.2.gc_time
...
...
cluster.MyCluster.taskmanager.10.gc_time

It doesn't matter which hosts they are on.  These are 10 unique JVMs on some 
set of hosts.



> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-18 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15152895#comment-15152895
 ] 

Jamie Grier commented on FLINK-1502:


I'm suggesting that we use the Dropwizard Metrics library and expose those 
metrics via JMX as the minimum/default, but also, via optional configuration, 
let users report metrics via any of the Metrics library's available Reporter 
classes.  Ganglia and Graphite are both supported via the built-in 
GangliaReporter and GraphiteReporter, but there are integrations with other 
systems as well.  Of particular interest to people running in production would 
be StatsD, Librato, InfluxDB, etc.

https://dropwizard.github.io/metrics/3.1.0/manual/third-party/

What I'm suggesting is that we should expose the ability for people to 
choose/configure which Reporters to use, but we should default to JMX.  Many 
3rd party tools will be able to consume/route these metrics if they're 
available via JMX so that should be the default.
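
As a rough sketch of what that wiring could look like with the Dropwizard 
Metrics 3.x API (the host, port, prefix, and metric name below are 
placeholders, and this is not Flink code):

{code:scala}
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{JmxReporter, MetricRegistry}
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}

object ReporterWiringSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry

    // Default: expose everything in the registry via JMX.
    val jmx = JmxReporter.forRegistry(registry).build()
    jmx.start()

    // Optional, config-driven: also push to Graphite (Ganglia, StatsD, etc.
    // would be wired up the same way with their respective reporters).
    val graphite = new Graphite(new InetSocketAddress("graphite.example.com", 2003))
    val graphiteReporter = GraphiteReporter.forRegistry(registry)
      .prefixedWith("cluster.MyCluster.taskmanager.1") // logical scope, as discussed in this thread
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build(graphite)
    graphiteReporter.start(10, TimeUnit.SECONDS)

    // Counters are reported raw; rate-of-change is left to the metrics backend.
    registry.counter("gc_time").inc()
  }
}
{code}

Which reporters actually get started would then be a matter of configuration, 
with the JMX reporter always on by default.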


> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-18 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15152873#comment-15152873
 ] 

Jamie Grier commented on FLINK-1502:


[~eastcirclek] Yes, I believe it does.  It's implicit in the metric type that 
gets reported to Ganglia.  I believe what we want is Slope.POSITIVE for 
counters.  I imagine the Dropwizard metrics library would already do this 
correctly for metrics with type Counter (as opposed to gauge) -- but maybe not.

See here:  
http://codeblog.majakorpi.net/post/16281432462/ganglia-xml-slope-attribute

Also, is there no query language in Ganglia when building a graph that allows 
you to graph the rate of change rather than the actual metric?  I'm not too 
familiar with Ganglia.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-17 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151004#comment-15151004
 ] 

Jamie Grier commented on FLINK-1502:


I understand [~eastcirclek]'s points about using the InstanceID.  This is a 
unique ID that is automatically generated (I believe).  As such if you use it 
to namespace the metrics you will see new metric names whenever new 
TaskManagers are created.  Over time this means the total # of metrics will grow 
and grow.  From my experience it would be better to have a "logical" ID for 
each TaskManager in the cluster.  Literally like (1, 2, 3, 4, etc) and use this 
value to namespace the metrics.  This will provide better continuity over time 
as TaskManagers come up and down.  However, I don't know if this concept 
actually exists inside Flink at the moment.  Does it?

I would suggest we use logical ids/indexes for TaskManager level metrics, as 
well as task level metrics, etc, as opposed to UUIDs.

So rather than:

taskmanager.<uuid-1>.gc_time
taskmanager.<uuid-2>.gc_time

and

task.<uuid-1>.flatMap.messagesReceived
task.<uuid-2>.flatMap.messagesReceived

I would suggest something like

cluster.<CLUSTER_NAME>.taskmanager.1.gc_time
cluster.<CLUSTER_NAME>.taskmanager.2.gc_time

and

cluster.<CLUSTER_NAME>.task.1.flatMap.messagesReceived
cluster.<CLUSTER_NAME>.task.2.flatMap.messagesReceived

I hope that makes sense.  The main point is to use logical IDs wherever 
possible, especially for things that change; otherwise there will be a lack of 
continuity in the metrics.  Also I don't know that we actually have the 
CLUSTER_NAME concept right now either but we might need this.  This would be 
unique for any given YarnSession if running on YARN for example.  Basically we 
just need some way to group a set of TaskManagers uniquely.  I guess this could 
also be done by using the UUID of the JobManager.

Comments?
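
To make the naming concrete, a tiny sketch of how such logical scopes could be 
assembled (the helper object and names are purely illustrative):

{code:scala}
// Purely illustrative: scope metrics by a cluster/session name plus a stable,
// logical index instead of a freshly generated InstanceID/UUID.
object MetricScopes {
  def taskManagerScope(clusterName: String, tmIndex: Int): String =
    s"cluster.$clusterName.taskmanager.$tmIndex"

  def taskScope(clusterName: String, taskIndex: Int, operatorName: String): String =
    s"cluster.$clusterName.task.$taskIndex.$operatorName"

  def main(args: Array[String]): Unit = {
    println(taskManagerScope("MyCluster", 1) + ".gc_time")
    // cluster.MyCluster.taskmanager.1.gc_time
    println(taskScope("MyCluster", 2, "flatMap") + ".messagesReceived")
    // cluster.MyCluster.task.2.flatMap.messagesReceived
  }
}
{code}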

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149109#comment-15149109
 ] 

Jamie Grier edited comment on FLINK-1502 at 2/16/16 7:09 PM:
-

Is there no way to refer to a TaskManager by index in order to solve this 
problem?  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing, but wouldn't just having a 
logical index for each TaskManager solve this problem?  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more and give users more insight into how Flink is running.

Maybe we can collaborate on this.  I want a general way to instrument both 
Flink code and user code and make those metrics available easily via JMX at a 
minimum and maybe directly in Graphite and Ganglia.  Once available in JMX 
there are many tools to integrate with other metrics and alerting systems.


was (Author: jgrier):
Is there no way to refer to a TaskManager by index in order to solve this 
problem.  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing but would just having a 
logical index for each TaskManager solve this problem.  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149109#comment-15149109
 ] 

Jamie Grier commented on FLINK-1502:


Is there no way to refer to a TaskManager by index in order to solve this 
problem?  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing, but would just having a 
logical index for each TaskManager solve this problem?  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149099#comment-15149099
 ] 

Jamie Grier commented on FLINK-1502:


[~eastcirclek] You shouldn't need to do this with counters.  Typically you just 
want to report the value of the counter as is to the metrics system.  The 
metrics system (e.g. Graphite or Ganglia) should have built-in tools for 
turning counters into other types of graphs.  For example, what you really want 
here is a "rate": how many GC invocations per second, for example (the first 
derivative of the counter).  Ganglia and any decent metrics tool should already 
have this function built in.  I think we should just report the raw counters.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)