[jira] [Commented] (FLINK-22761) Cannot remove POJO fields

2022-04-26 Thread Scott Kidder (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17528451#comment-17528451
 ] 

Scott Kidder commented on FLINK-22761:
--

This is a duplicate of FLINK-21752

> Cannot remove POJO fields
> -
>
> Key: FLINK-22761
> URL: https://issues.apache.org/jira/browse/FLINK-22761
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.12.1
>Reporter: Ygor Allan de Fraga
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I tested schema evolution on state that uses a POJO type: adding a new field 
> caused no problem and the job ran just fine. That same field was then removed 
> from the POJO, since it was only a test, but afterwards the application could 
> not restore the state due to an error.
>  
> Here is the error:
> {code:java}
> 2021-05-24 13:05:31,958 WARN  org.apache.flink.runtime.taskmanager.Task       
>              [] - Co-Flat Map -> Map (3/3)#464 
> (e0e6d41a18214eab0a1d3c089d8672de) switched from RUNNING to FAILED.2021-05-24 
> 13:05:31,958 WARN  org.apache.flink.runtime.taskmanager.Task                  
>   [] - Co-Flat Map -> Map (3/3)#464 (e0e6d41a18214eab0a1d3c089d8672de) 
> switched from RUNNING to FAILED.java.lang.Exception: Exception while creating 
> StreamOperatorStateContext. at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
> [zdata-flink-streams.jar:0.1] at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
> [zdata-flink-streams.jar:0.1] at java.lang.Thread.run(Thread.java:748) 
> [?:1.8.0_282]
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for CoStreamFlatMap_b101f370952ea85c2104e98dd54bf7f9_(3/3) from 
> any of the 1 provided restore options. at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[zdata-flink-streams.jar:0.1] ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception. at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[zdata-flink-streams.jar:0.1] at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[zdata-flink-streams.jar:0.1] ... 9 more
> Caused by: 
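For reference, a minimal sketch of the POJO change being described above; the class and field names are hypothetical, not taken from the reporter's job:

{code:java}
// Hypothetical POJO used as keyed state (illustrative names only).
// Flink treats it as a POJO: public no-arg constructor and public fields.
public class SensorReading {
    public String sensorId;
    public long timestamp;

    // Step 1: this field was added and the job restored fine from the previous
    // checkpoint (the new field starts out with its default value).
    // Step 2: the same field was removed again; restoring a checkpoint taken
    // while it existed then failed with the exception quoted above, even though
    // POJO schema evolution documents removed fields as supported
    // (see FLINK-21752).
    public Double temperature;

    public SensorReading() {}
}
{code}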

[jira] [Commented] (FLINK-13111) FlinkKinesisConsumer fails with endpoint and region used together

2019-08-21 Thread Scott Kidder (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912792#comment-16912792
 ] 

Scott Kidder commented on FLINK-13111:
--

This has been addressed by a PR I submitted earlier this year that has not yet 
been reviewed: [https://github.com/apache/flink/pull/8444]

I rely on this capability in production since we access Kinesis through a VPC 
endpoint, and the region name is still needed to sign requests sent to that 
endpoint.
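As an illustration, a sketch of the consumer configuration this capability enables. Imports are omitted to match the snippet quoted below; {{env}} is an existing StreamExecutionEnvironment, the stream name is a placeholder, and the endpoint URL is a made-up VPC endpoint:

{code:java}
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // used for request signing
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT,
        "https://vpce-0123456789abcdef0.kinesis.us-east-1.vpce.amazonaws.com");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

// Without the linked PR, the consumer rejects this combination with the
// "either AWS region or AWS endpoint" error quoted in the issue below.
DataStream<String> stream = env.addSource(
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));
{code}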

> FlinkKinesisConsumer fails with endpoint and region used together
> -
>
> Key: FLINK-13111
> URL: https://issues.apache.org/jira/browse/FLINK-13111
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Jack Tuck
>Assignee: Yu Li
>Priority: Major
>
> So far I have followed the instructions documented for Flink's Kinesis 
> connector to use a local Kinesis. (Using Flink 1.8 and Kinesis connector 1.8)
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#using-non-aws-kinesis-endpoints-for-testing]
> {code:java}
> Properties producerConfig = new Properties();
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> "aws_secret_access_key");
> producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, 
> "http://localhost:4567");{code}
> With a Flink producer, these instructions work with a local Kinesis (I use 
> Kinesalite).
> However, with a Flink consumer, I get an exception that `aws.region` and 
> `aws.endpoint` are not *both* allowed.
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or 
> AWS endpoint ('aws.endpoint') must be set in the config.{noformat}
> Is this a bug in the connector? I found the PR which fixed this, but perhaps 
> only for the producer: [https://github.com/apache/flink/pull/6045].
> I also found a [workaround on Flink's mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-the-FlinkKinesisProducer-to-work-against-Kinesalite-for-tests-td23438.html],
>  but that thread concerns the producer rather than the consumer (or perhaps 
> they had it the other way around); it is odd that the consumer and producer 
> use two different codepaths for this check.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-9127) Filesystem State Backend logged incorrectly

2018-04-06 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-9127.
---
Resolution: Won't Fix

A comment from Bowen Li indicated that this log statement is in fact correct. 
The statement seems non-intuitive, but oh well. Closing as won't fix.

> Filesystem State Backend logged incorrectly
> ---
>
> Key: FLINK-9127
> URL: https://issues.apache.org/jira/browse/FLINK-9127
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.2, 1.4.2
>Reporter: Scott Kidder
>Priority: Trivial
>
> When using a filesystem backend, the 
> '[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
>  class produces a log message stating: "State backend is set to heap memory". 
> Example:
> {{2018-04-04 00:45:49,591 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend 
> is set to heap memory (checkpoints to filesystem 
> "hdfs://hdfs:8020/flink/checkpoints")}}
> It looks like this resulted from some copy-pasta of the previous 
> case-statement that matches on the memory backend. This bug is also present 
> in earlier releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' 
> class.
> This log statement should be corrected to indicate that a filesystem backend 
> is in use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9127) Filesystem State Backend logged incorrectly

2018-04-03 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-9127:
---

 Summary: Filesystem State Backend logged incorrectly
 Key: FLINK-9127
 URL: https://issues.apache.org/jira/browse/FLINK-9127
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2, 1.3.2
Reporter: Scott Kidder


When using a filesystem backend, the 
'[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
 class produces a log message stating: "State backend is set to heap memory". 

Example:

{{2018-04-04 00:45:49,591 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend 
is set to heap memory (checkpoints to filesystem 
"hdfs://hdfs:8020/flink/checkpoints")}}

It looks like this resulted from some copy-pasta of the previous case-statement 
that matches on the memory backend. This bug is also present in earlier 
releases (1.3.2, 1.4.0) of Flink in the 'AbstractStateBackend' class.

This log statement should be corrected to indicate that a filesystem backend is 
in use.
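A sketch of the kind of correction proposed here; this is illustrative only, not the actual StateBackendLoader source, and {{backendName}}, {{checkpointDir}}, and {{LOG}} (an SLF4J logger) are assumed local names. Note that the ticket was later closed as Won't Fix (see the message above), since FsStateBackend does keep its working state on the heap and only writes checkpoints to the filesystem.

{code:java}
// Illustrative sketch only -- not the actual StateBackendLoader source.
private static StateBackend loadBackend(String backendName, String checkpointDir) {
    switch (backendName.toLowerCase()) {
        case "jobmanager":
            LOG.info("State backend is set to heap memory (checkpoints to JobManager)");
            return new MemoryStateBackend();
        case "filesystem":
            // Proposed wording: name the filesystem backend instead of reusing
            // the "heap memory" text from the case above.
            LOG.info("State backend is set to filesystem (checkpoints to \"{}\")", checkpointDir);
            return new FsStateBackend(checkpointDir);
        default:
            throw new IllegalArgumentException("Unknown state backend: " + backendName);
    }
}
{code}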



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-07-05 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder reassigned FLINK-7021:
---

Assignee: Scott Kidder

> Flink Task Manager hangs on startup if one Zookeeper node is unresolvable
> -
>
> Key: FLINK-7021
> URL: https://issues.apache.org/jira/browse/FLINK-7021
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Environment: Kubernetes cluster running:
> * Flink 1.3.0 Job Manager & Task Manager on Java 8u131
> * Zookeeper 3.4.10 cluster with 3 nodes
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> h2. Problem
> Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
> the Zookeeper connection string is unresolvable.
> h2. Expected Behavior
> Flink should retry name resolution & connection to Zookeeper nodes with 
> exponential back-off.
> h2. Environment Details
> We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in 
> a configuration that automatically detects and applies operating system 
> updates. We have a Zookeeper node running on the same CoreOS instance as 
> Flink. It's possible that the Zookeeper node will not yet be started when the 
> Flink components are started. This could cause hostname resolution of the 
> Zookeeper nodes to fail.
> h3. Flink Task Manager Logs
> {noformat}
> 2017-06-27 15:38:51,713 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Using 
> configured hostname/address for TaskManager: 10.2.45.11
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor system at 10.2.45.11:6122.
> 2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger
>   - Slf4jLogger started
> 2017-06-27 15:38:53,079 INFO  Remoting
>   - Starting remoting
> 2017-06-27 15:38:53,573 INFO  Remoting
>   - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.2.45.11:6122]
> 2017-06-27 15:38:53,576 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor
> 2017-06-27 15:38:53,660 INFO  
> org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
> [server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
> segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
> (manual), number of client threads: 2 (manual), server connect backlog: 0 
> (use Netty's default), client connect timeout (sec): 120, send/receive buffer 
> size (bytes): 0 (use Netty's default)]
> 2017-06-27 15:38:53,682 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages 
> have a max timeout of 1 ms
> 2017-06-27 15:38:53,688 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary 
> file directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
> 2017-06-27 15:38:54,071 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 
> MB for network buffer pool (number of memory segments: 3095, bytes per 
> segment: 32768).
> 2017-06-27 15:38:54,564 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
> network environment and its components.
> 2017-06-27 15:38:54,576 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
> initialization (took 4 ms).
> 2017-06-27 15:38:54,677 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
> initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
> 2017-06-27 15:38:54,981 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
> managed memory to 0.7 of the currently free heap space (612 MB), memory will 
> be allocated lazily.
> 2017-06-27 15:38:55,050 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
> files.
> 2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>   - Configuring StatsDReporter with {interval=10 SECONDS, 
> port=8125, host=localhost, 
> class=org.apache.flink.metrics.statsd.StatsDReporter}.
> 2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter  
>   - Configured StatsDReporter with {host:localhost, port:8125}
> 2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>   - Periodically 

[jira] [Commented] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-28 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067189#comment-16067189
 ] 

Scott Kidder commented on FLINK-7021:
-

Logs from Flink Task Manager during startup when an unresolvable Zookeeper 
hostname is given:

{noformat}
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.quorum, foo.bar:2181
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.storageDir, 
hdfs://hdfs:8020/flink/recovery
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.jobmanager.port, 6123
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.rpc.port, 6122
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.data.port, 6121
2017-06-28 20:28:22,674 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.hostname, 10.2.45.10
2017-06-28 20:28:24,249 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore 
- Creating highly available BLOB storage directory at 
hdfs://hdfs:8020/flink/recovery//default/blob
2017-06-28 20:28:24,380 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 
'recovery.zookeeper.quorum' instead of proper key 
'high-availability.zookeeper.quorum'
2017-06-28 20:28:24,448 INFO  org.apache.flink.runtime.util.ZooKeeperUtils  
- Enforcing default ACL for ZK connections
2017-06-28 20:28:24,449 INFO  org.apache.flink.runtime.util.ZooKeeperUtils  
- Using '/flink/default' as Zookeeper namespace.

==> 
/usr/local/flink/log/flink--taskmanager-0-flink-taskmanager-3923888361-c6fdw.out
 <==
tail: unrecognized file system type 0x794c7630 for 
‘/usr/local/flink/log/flink--taskmanager-0-flink-taskmanager-3923888361-c6fdw.log’.
 please report this to bug-coreut...@gnu.org. reverting to polling
tail: unrecognized file system type 0x794c7630 for 
‘/usr/local/flink/log/flink--taskmanager-0-flink-taskmanager-3923888361-c6fdw.out’.
 please report this to bug-coreut...@gnu.org. reverting to polling

==> 
/usr/local/flink/log/flink--taskmanager-0-flink-taskmanager-3923888361-c6fdw.log
 <==
2017-06-28 20:28:24,564 INFO  
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl  
- Starting
2017-06-28 20:28:24,569 INFO  org.apache.zookeeper.ZooKeeper
- Client 
environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, 
built on 03/23/2017 10:13 GMT
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:host.name=flink-taskmanager-3923888361-c6fdw
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:java.version=1.8.0_131
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:java.vendor=Oracle Corporation
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client 
environment:java.class.path=/usr/local/flink-1.3.1/lib/egads-0.1.jar:/usr/local/flink-1.3.1/lib/flink-connector-kinesis_2.11-1.3.1.jar:/usr/local/flink-1.3.1/lib/flink-connector-rabbitmq_2.11-1.3.1.jar:/usr/local/flink-1.3.1/lib/flink-metrics-statsd-1.3.1.jar:/usr/local/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/usr/local/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/usr/local/flink-1.3.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/usr/local/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client 
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:java.io.tmpdir=/tmp
2017-06-28 20:28:24,570 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:java.compiler=<NA>
2017-06-28 20:28:24,571 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:os.name=Linux
2017-06-28 20:28:24,571 INFO  org.apache.zookeeper.ZooKeeper
- Client environment:os.arch=amd64
2017-06-28 

[jira] [Commented] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-28 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066590#comment-16066590
 ] 

Scott Kidder commented on FLINK-7021:
-

Related to this Curator client library bug:
https://issues.apache.org/jira/browse/CURATOR-229

> Flink Task Manager hangs on startup if one Zookeeper node is unresolvable
> -
>
> Key: FLINK-7021
> URL: https://issues.apache.org/jira/browse/FLINK-7021
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Environment: Kubernetes cluster running:
> * Flink 1.3.0 Job Manager & Task Manager on Java 8u131
> * Zookeeper 3.4.10 cluster with 3 nodes
>Reporter: Scott Kidder
>
> h2. Problem
> Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
> the Zookeeper connection string is unresolvable.
> h2. Expected Behavior
> Flink should retry name resolution & connection to Zookeeper nodes with 
> exponential back-off.
> h2. Environment Details
> We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in 
> a configuration that automatically detects and applies operating system 
> updates. We have a Zookeeper node running on the same CoreOS instance as 
> Flink. It's possible that the Zookeeper node will not yet be started when the 
> Flink components are started. This could cause hostname resolution of the 
> Zookeeper nodes to fail.
> h3. Flink Task Manager Logs
> {noformat}
> 2017-06-27 15:38:51,713 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Using 
> configured hostname/address for TaskManager: 10.2.45.11
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor system at 10.2.45.11:6122.
> 2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger
>   - Slf4jLogger started
> 2017-06-27 15:38:53,079 INFO  Remoting
>   - Starting remoting
> 2017-06-27 15:38:53,573 INFO  Remoting
>   - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.2.45.11:6122]
> 2017-06-27 15:38:53,576 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor
> 2017-06-27 15:38:53,660 INFO  
> org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
> [server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
> segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
> (manual), number of client threads: 2 (manual), server connect backlog: 0 
> (use Netty's default), client connect timeout (sec): 120, send/receive buffer 
> size (bytes): 0 (use Netty's default)]
> 2017-06-27 15:38:53,682 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages 
> have a max timeout of 1 ms
> 2017-06-27 15:38:53,688 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary 
> file directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
> 2017-06-27 15:38:54,071 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 
> MB for network buffer pool (number of memory segments: 3095, bytes per 
> segment: 32768).
> 2017-06-27 15:38:54,564 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
> network environment and its components.
> 2017-06-27 15:38:54,576 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
> initialization (took 4 ms).
> 2017-06-27 15:38:54,677 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
> initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
> 2017-06-27 15:38:54,981 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
> managed memory to 0.7 of the currently free heap space (612 MB), memory will 
> be allocated lazily.
> 2017-06-27 15:38:55,050 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
> files.
> 2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>   - Configuring StatsDReporter with {interval=10 SECONDS, 
> port=8125, host=localhost, 
> class=org.apache.flink.metrics.statsd.StatsDReporter}.
> 2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter  
>   - Configured StatsDReporter with {host:localhost, port:8125}
> 2017-06-27 15:38:55,065 INFO  

[jira] [Updated] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-28 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-7021:

Affects Version/s: 1.3.1

> Flink Task Manager hangs on startup if one Zookeeper node is unresolvable
> -
>
> Key: FLINK-7021
> URL: https://issues.apache.org/jira/browse/FLINK-7021
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Environment: Kubernetes cluster running:
> * Flink 1.3.0 Job Manager & Task Manager on Java 8u131
> * Zookeeper 3.4.10 cluster with 3 nodes
>Reporter: Scott Kidder
>
> h2. Problem
> Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
> the Zookeeper connection string is unresolvable.
> h2. Expected Behavior
> Flink should retry name resolution & connection to Zookeeper nodes with 
> exponential back-off.
> h2. Environment Details
> We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in 
> a configuration that automatically detects and applies operating system 
> updates. We have a Zookeeper node running on the same CoreOS instance as 
> Flink. It's possible that the Zookeeper node will not yet be started when the 
> Flink components are started. This could cause hostname resolution of the 
> Zookeeper nodes to fail.
> h3. Flink Task Manager Logs
> {noformat}
> 2017-06-27 15:38:51,713 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Using 
> configured hostname/address for TaskManager: 10.2.45.11
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager
> 2017-06-27 15:38:51,714 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor system at 10.2.45.11:6122.
> 2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger
>   - Slf4jLogger started
> 2017-06-27 15:38:53,079 INFO  Remoting
>   - Starting remoting
> 2017-06-27 15:38:53,573 INFO  Remoting
>   - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.2.45.11:6122]
> 2017-06-27 15:38:53,576 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Starting 
> TaskManager actor
> 2017-06-27 15:38:53,660 INFO  
> org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
> [server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
> segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
> (manual), number of client threads: 2 (manual), server connect backlog: 0 
> (use Netty's default), client connect timeout (sec): 120, send/receive buffer 
> size (bytes): 0 (use Netty's default)]
> 2017-06-27 15:38:53,682 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages 
> have a max timeout of 1 ms
> 2017-06-27 15:38:53,688 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary 
> file directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
> 2017-06-27 15:38:54,071 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 
> MB for network buffer pool (number of memory segments: 3095, bytes per 
> segment: 32768).
> 2017-06-27 15:38:54,564 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
> network environment and its components.
> 2017-06-27 15:38:54,576 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
> initialization (took 4 ms).
> 2017-06-27 15:38:54,677 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
> initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
> 2017-06-27 15:38:54,981 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
> managed memory to 0.7 of the currently free heap space (612 MB), memory will 
> be allocated lazily.
> 2017-06-27 15:38:55,050 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
> files.
> 2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>   - Configuring StatsDReporter with {interval=10 SECONDS, 
> port=8125, host=localhost, 
> class=org.apache.flink.metrics.statsd.StatsDReporter}.
> 2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter  
>   - Configured StatsDReporter with {host:localhost, port:8125}
> 2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>   - Periodically reporting metrics in intervals of 10 SECONDS 

[jira] [Created] (FLINK-7022) Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7022:
---

 Summary: Flink Job Manager Scheduler & Web Frontend out of sync 
when Zookeeper is unavailable on startup
 Key: FLINK-7022
 URL: https://issues.apache.org/jira/browse/FLINK-7022
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Job Manager web frontend is permanently unavailable if one or more 
Zookeeper nodes are unresolvable during startup. The job scheduler eventually 
recovers and assigns jobs to task managers, but the web frontend continues to 
respond with an HTTP 503 and the following message:
{noformat}Service temporarily unavailable due to an ongoing leader election. 
Please refresh.{noformat}

h2. Expected Behavior
Once Flink is able to interact with Zookeeper successfully, all aspects of the 
Job Manager (job scheduling & the web frontend) should be available.

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.host, localhost
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.port, 8125
2017-06-27 15:38:47,162 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.interval, 10 SECONDS
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend, filesystem
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend.fs.checkpointdir, 
hdfs://hdfs:8020/flink/checkpoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.savepoints.dir, hdfs://hdfs:8020/flink/savepoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.mode, zookeeper
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.quorum, 
zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.storageDir, 
hdfs://hdfs:8020/flink/recovery
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.jobmanager.port, 6123
2017-06-27 15:38:47,257 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: blob.server.port, 41479
2017-06-27 15:38:47,357 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 'recovery.mode' instead 
of proper key 'high-availability'
2017-06-27 15:38:47,366 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager with high-availability
2017-06-27 15:38:47,366 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 
'recovery.jobmanager.port' instead of proper key 
'high-availability.jobmanager.port'
2017-06-27 15:38:47,452 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager on flink:6123 with execution mode CLUSTER
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, flink
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration  

[jira] [Created] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7021:
---

 Summary: Flink Task Manager hangs on startup if one Zookeeper node 
is unresolvable
 Key: FLINK-7021
 URL: https://issues.apache.org/jira/browse/FLINK-7021
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
the Zookeeper connection string is unresolvable.

h2. Expected Behavior
Flink should retry name resolution & connection to Zookeeper nodes with 
exponential back-off.
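A minimal sketch of the requested retry behaviour in plain Java; this is not Flink or Curator code, and the host name, delays, and attempt limit are arbitrary:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class ZkHostResolver {
    // Retry hostname resolution with exponential back-off instead of giving
    // up (or hanging) on the first UnknownHostException.
    public static InetAddress resolveWithBackoff(String host, int maxAttempts)
            throws InterruptedException {
        long delayMs = 500;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                if (attempt == maxAttempts) {
                    throw new IllegalStateException("Could not resolve " + host, e);
                }
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, 30_000); // cap the back-off at 30s
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
{code}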

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:51,713 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Using configured hostname/address for TaskManager: 10.2.45.11
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor system at 10.2.45.11:6122.
2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger  
- Slf4jLogger started
2017-06-27 15:38:53,079 INFO  Remoting  
- Starting remoting
2017-06-27 15:38:53,573 INFO  Remoting  
- Remoting started; listening on addresses 
:[akka.tcp://flink@10.2.45.11:6122]
2017-06-27 15:38:53,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor
2017-06-27 15:38:53,660 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
[server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
(manual), number of client threads: 2 (manual), server connect backlog: 0 (use 
Netty's default), client connect timeout (sec): 120, send/receive buffer size 
(bytes): 0 (use Netty's default)]
2017-06-27 15:38:53,682 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 1 ms
2017-06-27 15:38:53,688 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file 
directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
2017-06-27 15:38:54,071 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 MB 
for network buffer pool (number of memory segments: 3095, bytes per segment: 
32768).
2017-06-27 15:38:54,564 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
network environment and its components.
2017-06-27 15:38:54,576 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
initialization (took 4 ms).
2017-06-27 15:38:54,677 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
2017-06-27 15:38:54,981 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
managed memory to 0.7 of the currently free heap space (612 MB), memory will be 
allocated lazily.
2017-06-27 15:38:55,050 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
files.
2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Configuring StatsDReporter with {interval=10 SECONDS, port=8125, 
host=localhost, class=org.apache.flink.metrics.statsd.StatsDReporter}.
2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter
- Configured StatsDReporter with {host:localhost, port:8125}
2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Periodically reporting metrics in intervals of 10 SECONDS for 
reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
2017-06-27 15:38:55,175 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-e4c5bcc5-7513-40d9-a665-0d33c80a36ba
2017-06-27 15:38:55,187 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses 

[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017653#comment-16017653
 ] 

Scott Kidder commented on FLINK-5898:
-

I created a new build of Flink that uses KPL {{0.12.4}} and AWS SDK 
{{1.11.128}}. I had a job that was unable to restore from an earlier checkpoint 
made with my patched KPL {{0.12.3}} and AWS SDK {{1.11.86}}:

{noformat}
java.io.InvalidClassException: com.amazonaws.services.kinesis.model.Shard; 
local class incompatible: stream classdesc serialVersionUID = 
206186249602915, local class serialVersionUID = 5010840014163691006
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at java.util.HashMap.readObject(HashMap.java:1402)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:307)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:166)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreStreamCheckpointed(AbstractStreamOperator.java:240)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:654)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:641)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:247)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
{noformat}

So, there are incompatible changes in the Kinesis {{Shard}} class included in 
the AWS SDK release referenced directly by KPL {{0.12.4}}. Just something to be 
aware of when upgrading the KPL.
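For context, a small sketch of why this surfaces as an {{InvalidClassException}}: Java serialization compares the serialVersionUID recorded in the checkpoint bytes against the one declared on (or computed for) the class on the classpath, and a mismatch aborts deserialization. The class below is hypothetical, standing in for {{com.amazonaws.services.kinesis.model.Shard}}:

{code:java}
import java.io.Serializable;

// Hypothetical stand-in for com.amazonaws.services.kinesis.model.Shard.
// Without an explicit serialVersionUID, the JVM derives one from the class
// structure, so structural changes between SDK versions change the ID and
// previously serialized instances can no longer be read back.
public class ShardLike implements Serializable {
    // Declaring the UID explicitly keeps old serialized data readable
    // across compatible structural changes.
    private static final long serialVersionUID = 1L;

    private String shardId;
    private String parentShardId;
}
{code}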

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> 

[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017473#comment-16017473
 ] 

Scott Kidder commented on FLINK-5898:
-

Hi Gordon! I think this issue and FLINK-5946 warrant upgrading the default KPL 
dependency to 0.12.4.

Also included in KPL 0.12.4 is a change to the AWS SDK core library dependency. 
In previous versions of the KPL, this dependency was expressed as a range, but 
now it's pinned to a specific version: `1.11.128`.

I've been using `1.11.86` with my patched KPL, but would like to test 
`1.11.128` before suggesting we upgrade the KPL dependency to 0.12.4.

What do you think?

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}
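The in-process mutex mentioned in the description cannot coordinate separate processes; preventing the extraction race generally requires an OS-level guard such as a file lock. A generic sketch of that technique follows (this is not the KPL's actual implementation; the class and method names are made up):

{code:java}
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class BinaryExtractor {
    // Extract a native binary at most once per machine, even when several
    // processes race: FileLock is held at the OS level, unlike a JVM mutex.
    public static void extractOnce(Path target, byte[] binary) throws Exception {
        Path lockFile = Paths.get(target.toString() + ".lock");
        try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw");
             FileChannel channel = raf.getChannel();
             FileLock lock = channel.lock()) {          // blocks until the lock is free
            if (!Files.exists(target)) {
                Files.write(target, binary);            // only the lock holder writes
            }
        }
    }
}
{code}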



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5946) Kinesis Producer uses KPL that orphans threads that consume 100% CPU

2017-05-19 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-5946.
---
Resolution: Workaround

The fix for this issue is included in release `0.12.4` of the AWS KPL, released 
2 days ago (May 17, 2017). Anyone affected by this issue can use version 
`0.12.4` of the KPL. Marking this issue as closed.

> Kinesis Producer uses KPL that orphans threads that consume 100% CPU
> 
>
> Key: FLINK-5946
> URL: https://issues.apache.org/jira/browse/FLINK-5946
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> It's possible for the Amazon Kinesis Producer Library (KPL) to leave orphaned 
> threads running after the producer has been instructed to shut down via the 
> `destroy()` method. These threads run in a very tight infinite loop that can 
> push CPU usage to 100%. I've seen this happen on several occasions, though it 
> does not happen all of the time. Once these threads are orphaned, the only 
> solution to bring CPU utilization back down is to restart the Flink Task 
> Manager.
> When a KPL producer is instantiated, it creates several threads: one to 
> execute and monitor the native sender process, and two threads to monitor the 
> process' stdout and stderr output. It's possible for the process-monitor 
> thread to stop in such a way that leaves the output monitor threads orphaned.
> I've submitted a Github issue and pull-request against the KPL project:
> https://github.com/awslabs/amazon-kinesis-producer/issues/93
> https://github.com/awslabs/amazon-kinesis-producer/pull/94
> This issue is rooted in the Amazon Kinesis Producer Library (KPL) that the 
> Flink Kinesis streaming connector depends upon. It ought to be fixed in the 
> KPL, but I want to document it on the Flink project. The Flink KPL dependency 
> should be updated once the KPL has been fixed.
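A generic sketch of the shutdown pattern the description implies: whatever stops the process-monitor thread should also stop the stdout/stderr reader threads so they cannot be orphaned. This is not the KPL's code, just an illustration of the pattern:

{code:java}
// Illustrative only: a process monitor that guarantees its output-reader
// threads are stopped even if monitoring ends unexpectedly.
public final class ProcessMonitor {
    private final Process process;
    private final Thread stdoutReader;
    private final Thread stderrReader;

    public ProcessMonitor(Process process, Runnable stdoutTask, Runnable stderrTask) {
        this.process = process;
        this.stdoutReader = new Thread(stdoutTask, "stdout-reader");
        this.stderrReader = new Thread(stderrTask, "stderr-reader");
        // Daemon threads never keep the JVM alive on their own.
        this.stdoutReader.setDaemon(true);
        this.stderrReader.setDaemon(true);
    }

    public void run() {
        stdoutReader.start();
        stderrReader.start();
        try {
            process.waitFor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Whatever happens above, make sure the readers are told to stop.
            stdoutReader.interrupt();
            stderrReader.interrupt();
        }
    }
}
{code}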



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-5898.
---
Resolution: Workaround

Issue fixed in KPL version 0.12.4.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017444#comment-16017444
 ] 

Scott Kidder commented on FLINK-5898:
-

The fix for this issue is included in release `0.12.4` of the AWS KPL, released 
2 days ago (May 17, 2017). Anyone affected by this issue can use version 
`0.12.4` of the KPL. Marking this issue as closed.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5946) Kinesis Producer uses KPL that orphans threads that consume 100% CPU

2017-05-15 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011273#comment-16011273
 ] 

Scott Kidder commented on FLINK-5946:
-

FYI: the pull-request against the Kinesis Producer Library (KPL) was accepted 
and merged into the master branch of the KPL project repo. I imagine it'll be 
present in the next release of the KPL.

> Kinesis Producer uses KPL that orphans threads that consume 100% CPU
> 
>
> Key: FLINK-5946
> URL: https://issues.apache.org/jira/browse/FLINK-5946
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> It's possible for the Amazon Kinesis Producer Library (KPL) to leave orphaned 
> threads running after the producer has been instructed to shut down via the 
> `destroy()` method. These threads run in a very tight infinite loop that can 
> push CPU usage to 100%. I've seen this happen on several occasions, though it 
> does not happen all of the time. Once these threads are orphaned, the only 
> solution to bring CPU utilization back down is to restart the Flink Task 
> Manager.
> When a KPL producer is instantiated, it creates several threads: one to 
> execute and monitor the native sender process, and two threads to monitor the 
> process' stdout and stderr output. It's possible for the process-monitor 
> thread to stop in such a way that leaves the output monitor threads orphaned.
> I've submitted a Github issue and pull-request against the KPL project:
> https://github.com/awslabs/amazon-kinesis-producer/issues/93
> https://github.com/awslabs/amazon-kinesis-producer/pull/94
> This issue is rooted in the Amazon Kinesis Producer Library (KPL) that the 
> Flink Kinesis streaming connector depends upon. It ought to be fixed in the 
> KPL, but I want to document it on the Flink project. The Flink KPL dependency 
> should be updated once the KPL has been fixed.





[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-15 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011272#comment-16011272
 ] 

Scott Kidder commented on FLINK-5898:
-

FYI: the pull-request against the Kinesis Producer Library (KPL) was accepted 
and merged into the master branch of the KPL project repo. I imagine it'll be 
present in the next release of the KPL.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}





[jira] [Commented] (FLINK-6176) Add JARs to CLASSPATH deterministically

2017-03-23 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938990#comment-15938990
 ] 

Scott Kidder commented on FLINK-6176:
-

The ordering of {{FLINK_CLASSPATH}} entries affected classloader 
prioritization, which is the reason for the {{NoClassDefFoundError}} I hit. 
I'll provide some more specifics.

*Old hosts* that *work* have the following profile:

|Base OS|Ubuntu 14.04.3 LTS (GNU/Linux 3.13.0-74-generic x86_64)|
|Kernel|Linux ip-10-55-2-175 3.13.0-74-generic #118-Ubuntu SMP Thu Dec 17 
22:52:10 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux|
|Docker Version| Docker version 1.11.1, build 5604cbe|
|Calculated 
{{FLINK_CLASSPATH}}|/usr/local/flink-1.2.0/lib/egads-0.1.jar:/usr/local/flink-1.2.0/lib/flink-metrics-statsd-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/log4j-1.2.17.jar:/usr/local/flink-1.2.0/lib/flink-python_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/slf4j-log4j12-1.7.7.jar:/usr/local/flink-1.2.0/lib/flink-connector-rabbitmq_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/flink-connector-kinesis_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/flink-dist_2.11-1.2-SNAPSHOT.jar:::|

*New Hosts* that *do not work* have the following profile:

|Base OS|Ubuntu 14.04.5 LTS (GNU/Linux 3.13.0-112-generic x86_64)|
|Kernel|Linux ip-10-55-3-137 3.13.0-112-generic #159-Ubuntu SMP Fri Mar 3 
15:26:07 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux|
|Docker Version| Docker version 17.03.0-ce, build 3a232c8|
|Calculated 
{{FLINK_CLASSPATH}}|/usr/local/flink-1.2.0/lib/egads-0.1.jar:/usr/local/flink-1.2.0/lib/log4j-1.2.17.jar:/usr/local/flink-1.2.0/lib/flink-connector-rabbitmq_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/flink-python_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/flink-dist_2.11-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/flink-metrics-statsd-1.2-SNAPSHOT.jar:/usr/local/flink-1.2.0/lib/slf4j-log4j12-1.7.7.jar:/usr/local/flink-1.2.0/lib/flink-connector-kinesis_2.11-1.2-SNAPSHOT.jar:::|

The sizes & timestamps for all JARs were identical. But note the difference in 
ordering of Classpath entries. The Kinesis JAR file contains shaded 
dependencies, including a newer version of Apache HTTP Client than what's 
included in the Flink distribution JAR.

The *new host* produced a {{FLINK_CLASSPATH}} with the `flink-dist` JAR in the 
middle of the classpath, ahead of the Kinesis JAR. This caused the older HTTP 
Client bundled with the Flink distribution JAR to take precedence, leaving the 
AWS SDK classes unable to initialize against it. This difference in ordering 
led to the following exception when running my application, which uses the 
Flink Kinesis Streaming Connector:

{code}
java.lang.NoClassDefFoundError: Could not initialize class 
com.amazonaws.http.conn.ssl.SdkTLSSocketFactory
 at 
com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.getPreferredSocketFactory(ApacheConnectionManagerFactory.java:87)
 at 
com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:65)
 at 
com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:58)
 at 
com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:51)
 at 
com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:39)
 at 
com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:319)
 at 
com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:303)
 at 
com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:165)
 at 
com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:154)
 at 
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:243)
 at 
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:218)
 at 
org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.createKinesisClient(AWSUtil.java:56)
 at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:121)
 at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:179)
 at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:188)
 at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:198)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
{code}
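
As a sanity check, the following is a hypothetical way (assuming {{unzip}} is 
available on the host and that the HTTP Client classes are bundled under their 
original package names, as the observed conflict suggests) to list which JARs 
in the lib directory carry Apache HTTP Client classes and therefore compete on 
the classpath:

{code}
# List every JAR under the Flink lib directory that bundles Apache HTTP Client
# classes; the Kinesis connector (with its shaded AWS SDK dependencies) and
# flink-dist should both appear, which is why their relative order matters.
for jar in /usr/local/flink-1.2.0/lib/*.jar; do
    if unzip -l "$jar" | grep -q 'org/apache/http/conn/ssl/'; then
        echo "$jar"
    fi
done
{code}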
   

[jira] [Updated] (FLINK-6176) Add JARs to CLASSPATH deterministically

2017-03-23 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-6176:

Description: 
The {{config.sh}} script uses the following shell-script function to build the 
{{FLINK_CLASSPATH}} variable from a listing of JAR files in the 
{{$FLINK_LIB_DIR}} directory:

{code}
constructFlinkClassPath() {

while read -d '' -r jarfile ; do
if [[ $FLINK_CLASSPATH = "" ]]; then
FLINK_CLASSPATH="$jarfile";
else
FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
fi
done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)

echo $FLINK_CLASSPATH
}
{code}

The {{find}} command as specified will return files in directory-order, which 
varies by OS and filesystem.

The inconsistent ordering of directory contents caused problems for me when 
installing a Flink Docker image onto a new machine with a newer version of 
Docker and a different filesystem (UFS). The differences in the Docker 
filesystem implementation led to a different ordering of the directory 
contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very 
puzzling {{NoClassDefFoundError}} errors when running my Flink application.

This should be addressed by deterministically ordering JAR files added to the 
{{FLINK_CLASSPATH}}.
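
One way to do this (a sketch of the idea, not necessarily the final patch) is 
to pipe the {{find}} output through a NUL-aware {{sort}} so that the iteration 
order no longer depends on the filesystem:

{code}
constructFlinkClassPath() {

    while read -d '' -r jarfile ; do
        if [[ $FLINK_CLASSPATH = "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    # sort -z consumes the NUL-delimited list and yields a fixed,
    # locale-independent order regardless of directory-entry order
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | LC_ALL=C sort -z)

    echo $FLINK_CLASSPATH
}
{code}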

  was:
The Flink 1.2.0 {{config.sh}} script uses the following shell-script function 
to build the CLASSPATH variable from a listing of JAR files in the 
{{$FLINK_HOME/lib}} directory:

{code}
constructFlinkClassPath() {

while read -d '' -r jarfile ; do
if [[ $FLINK_CLASSPATH = "" ]]; then
FLINK_CLASSPATH="$jarfile";
else
FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
fi
done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)

echo $FLINK_CLASSPATH
}
{code}

The {{find}} command as specified will return files in directory-order, which 
varies by OS and filesystem.

The inconsistent ordering caused problems for me when installing a Flink Docker 
image I built on a new machine with a newer version of Docker. The differences 
in the Docker filesystem implementation led to a different ordering of the 
directory contents, which led to a different order of the CLASSPATH elements 
and very puzzling {{ClassNotFoundException}} errors when running my application.

This should be addressed by adding some explicit ordering to the JAR files 
added to the CLASSPATH used by Flink.


> Add JARs to CLASSPATH deterministically
> ---
>
> Key: FLINK-6176
> URL: https://issues.apache.org/jira/browse/FLINK-6176
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The {{config.sh}} script uses the following shell-script function to build 
> the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the 
> {{$FLINK_LIB_DIR}} directory:
> {code}
> constructFlinkClassPath() {
> while read -d '' -r jarfile ; do
> if [[ $FLINK_CLASSPATH = "" ]]; then
> FLINK_CLASSPATH="$jarfile";
> else
> FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
> fi
> done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
> echo $FLINK_CLASSPATH
> }
> {code}
> The {{find}} command as specified will return files in directory-order, which 
> varies by OS and filesystem.
> The inconsistent ordering of directory contents caused problems for me when 
> installing a Flink Docker image onto a new machine with a newer version of 
> Docker and a different filesystem (UFS). The differences in the Docker 
> filesystem implementation led to a different ordering of the directory 
> contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very 
> puzzling {{NoClassDefFoundError}} errors when running my Flink 
> application.
> This should be addressed by deterministically ordering JAR files added to the 
> {{FLINK_CLASSPATH}}.





[jira] [Created] (FLINK-6176) Add JARs to CLASSPATH deterministically

2017-03-23 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-6176:
---

 Summary: Add JARs to CLASSPATH deterministically
 Key: FLINK-6176
 URL: https://issues.apache.org/jira/browse/FLINK-6176
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Scott Kidder


The Flink 1.2.0 {{config.sh}} script uses the following shell-script function 
to build the CLASSPATH variable from a listing of JAR files in the 
{{$FLINK_HOME/lib}} directory:

{code}
constructFlinkClassPath() {

while read -d '' -r jarfile ; do
if [[ $FLINK_CLASSPATH = "" ]]; then
FLINK_CLASSPATH="$jarfile";
else
FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
fi
done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)

echo $FLINK_CLASSPATH
}
{code}

The {{find}} command as specified will return files in directory-order, which 
varies by OS and filesystem.

The inconsistent ordering caused problems for me when installing a Flink Docker 
image I built on a new machine with a newer version of Docker. The differences 
in the Docker filesystem implementation led to a different ordering of the 
directory contents, which led to a different order of the CLASSPATH elements 
and very puzzling {{ClassNotFoundException}} errors when running my application.

This should be addressed by adding some explicit ordering to the JAR files 
added to the CLASSPATH used by Flink.





[jira] [Created] (FLINK-5946) Kinesis Producer uses KPL that orphans threads that consume 100% CPU

2017-03-01 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5946:
---

 Summary: Kinesis Producer uses KPL that orphans threads that 
consume 100% CPU
 Key: FLINK-5946
 URL: https://issues.apache.org/jira/browse/FLINK-5946
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.2.0
Reporter: Scott Kidder


It's possible for the Amazon Kinesis Producer Library (KPL) to leave orphaned 
threads running after the producer has been instructed to shutdown via the 
`destroy()` method. These threads run in a very tight infinite loop that can 
push CPU usage to 100%. I've seen this happen on several occasions, though it 
does not happen all of the time. Once these threads are orphaned, the only 
solution to bring CPU utilization back down is to restart the Flink Task 
Manager.

When a KPL producer is instantiated, it creates several threads: one to execute 
and monitor the native sender process, and two threads to monitor the process' 
stdout and stderr output. It's possible for the process-monitor thread to stop 
in such a way that leaves the output monitor threads orphaned.
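
For anyone trying to confirm this state on a live task-manager, a rough 
diagnostic along these lines can help (hypothetical commands; assumes a Linux 
host with {{pgrep}}, {{top}} and {{jstack}} available, and the standalone 
TaskManager entry-point class name):

{code}
# Find the TaskManager JVM, show its busiest threads, and dump their stacks to
# see whether they sit in the KPL's stdout/stderr monitoring code.
TM_PID=$(pgrep -f org.apache.flink.runtime.taskmanager.TaskManager | head -n 1)

top -b -H -n 1 -p "$TM_PID" | head -n 25   # per-thread CPU usage within the JVM
jstack "$TM_PID" | grep -i -A 8 'com.amazonaws.services.kinesis.producer'
{code}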

I've submitted a Github issue and pull-request against the KPL project:
https://github.com/awslabs/amazon-kinesis-producer/issues/93
https://github.com/awslabs/amazon-kinesis-producer/pull/94

This issue is rooted in the Amazon Kinesis Producer Library (KPL) that the 
Flink Kinesis streaming connector depends upon. It ought to be fixed in the 
KPL, but I want to document it on the Flink project. The Flink KPL dependency 
should be updated once the KPL has been fixed.





[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-24 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15883238#comment-15883238
 ] 

Scott Kidder commented on FLINK-5898:
-

Thanks Gordon & Rob, I've opened a pull-request against the KPL:
https://github.com/awslabs/amazon-kinesis-producer/pull/92

Testing with a 4-core Flink cluster (2 task-managers with 2 cores each) looks 
good; the separate lock-file prevents the race-condition that can occur when 
reading/writing the native binary. I'll update this issue if the situation 
changes or the KPL pull-request is accepted.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}





[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881967#comment-15881967
 ] 

Scott Kidder commented on FLINK-5898:
-

Hi [~tzulitai],

I'll look into fixing this in the KPL. I noticed that the method that installs 
the KPL binary uses a shared lock, which would allow multiple processes to 
obtain overlapping locks and write to the same file simultaneously:
https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java#L815

I'll try patching the KPL to obtain an exclusive lock. I'll also file a Github 
issue against the KPL to see what the KPL authors think.
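
As a rough illustration of the idea (the actual change has to be made in the 
KPL's Java code, so this is only a shell sketch under that caveat): an 
exclusive lock on a dedicated lock file serializes the extract-and-verify step 
across processes, whereas a shared lock lets them overlap:

{code}
# Hypothetical sketch: take an exclusive flock on a dedicated lock file so that
# only one process at a time extracts and verifies the native binary.
LOCK_FILE=/tmp/amazon-kinesis-producer-native-binaries.lock

(
    flock -x 200    # blocks until the exclusive lock on fd 200 is granted
    # ... extract the native binary and verify its checksum here ...
) 200>"$LOCK_FILE"
{code}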

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}





[jira] [Created] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5898:
---

 Summary: Race-Condition with Amazon Kinesis KPL
 Key: FLINK-5898
 URL: https://issues.apache.org/jira/browse/FLINK-5898
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.2.0
Reporter: Scott Kidder


The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer Library 
(KPL) to send messages to Kinesis streams. The KPL relies on a native binary 
client to send messages to achieve better performance.

When a Kinesis Producer is instantiated, the KPL will extract the native binary 
to a sub-directory of `/tmp` (or whatever the platform-specific temporary 
directory happens to be).

The KPL tries to prevent multiple processes from extracting the binary at the 
same time by wrapping the operation in a mutex. Unfortunately, this does not 
prevent multiple Flink cores from trying to perform this operation at the same 
time. If two or more processes attempt to do this at the same time, then the 
native binary in /tmp will be corrupted.

The authors of the KPL are aware of this possibility and suggest that users of 
the KPL  not do that ... (sigh):
https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897

I encountered this in my production environment when bringing up a new Flink 
task-manager with multiple cores and restoring from an earlier savepoint, 
resulting in the instantiation of a KPL client on each core at roughly the same 
time.

A stack-trace follows:

{noformat}
java.lang.RuntimeException: Could not copy native binaries to temp directory 
/tmp/amazon-kinesis-producer-native-binaries
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.SecurityException: The contents of the binary 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
 is not what it's expected to be.
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
... 8 more
{noformat}





[jira] [Closed] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-26 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-5648.
---
Resolution: Not A Problem

Thank you for trying to reproduce this issue, Robert. I found the sequence that 
leads to the problem; it seems unlikely to be a blocker.

I had a browser window open to a 1.2.0 rc0 Job Manager installation. I then 
upgraded that installation to 1.2.0 rc2 without refreshing the browser window. 
I suspect that the changes introduced in FLINK-5382 invalidated earlier 
assumptions about how the task-manager identifier is populated in the logs URL.

Simply refreshing the browser window after performing the upgrade to 1.2.0 rc2 
led to the logs link being generated correctly.

> Task Manager ID missing from logs link in Job Manager UI
> 
>
> Key: FLINK-5648
> URL: https://issues.apache.org/jira/browse/FLINK-5648
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> This appears to be a regression introduced in Flink 1.2.0 rc2. I've been 
> using 1.2.0 rc0 without this issue being present.
> The link from the Job Manager to download logs for a specific Task Manager 
> instance does not include the ID of the Task Manager. The following 
> screenshot shows the link:
> !http://imgur.com/dLhxALT.png!
> The following exception appears in the Job Manager logs after trying to 
> retrieve the logs for a Task Manager (without the ID of the Task Manager 
> given):
> {noformat}
> 2017-01-25 23:34:44,915 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - 
> Fetching TaskManager log failed.
> java.lang.IllegalArgumentException: Argument bytes must by an array of 16 
> bytes
>   at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
>   at 
> org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>   at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>   at 
> {noformat}

[jira] [Closed] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-26 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-5649.
---
Resolution: Fixed

Yes, we can close the issue since there's no expectation of compatibility 
across release-candidates.

> KryoException when starting job from Flink 1.2.0 rc0 savepoint
> --
>
> Key: FLINK-5649
> URL: https://issues.apache.org/jira/browse/FLINK-5649
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
> encountered the following error on Flink 1.2.0 rc2, leading to the job being 
> cancelled:
> {noformat}
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
> class ID: 92
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
>   ... 6 more
> {noformat}
> Submitting the same job without a savepoint works fine (except that there's 
> no state, of course).
> Might be related to FLINK-5484 pull-request 
> https://github.com/apache/flink/pull/3152





[jira] [Updated] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-25 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5649:

Description: 
I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error on Flink 1.2.0 rc2, leading to the job being 
cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 
https://github.com/apache/flink/pull/3152

  was:
I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error, leading to the job being cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 
https://github.com/apache/flink/pull/3152

[jira] [Created] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-25 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5649:
---

 Summary: KryoException when starting job from Flink 1.2.0 rc0 
savepoint
 Key: FLINK-5649
 URL: https://issues.apache.org/jira/browse/FLINK-5649
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Scott Kidder


I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error, leading to the job being cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 
https://github.com/apache/flink/pull/3152





[jira] [Commented] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838860#comment-15838860
 ] 

Scott Kidder commented on FLINK-5648:
-

May have been introduced in FLINK-5382 pull-request 
https://github.com/apache/flink/pull/3055

> Task Manager ID missing from logs link in Job Manager UI
> 
>
> Key: FLINK-5648
> URL: https://issues.apache.org/jira/browse/FLINK-5648
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> This appears to be a regression introduced in Flink 1.2.0 rc2. I've been 
> using 1.2.0 rc0 without this issue being present.
> The link from the Job Manager to download logs for a specific Task Manager 
> instance does not include the ID of the Task Manager. The following 
> screenshot shows the link:
> !http://imgur.com/dLhxALT.png!
> The following exception appears in the Job Manager logs after trying to 
> retrieve the logs for a Task Manager (without the ID of the Task Manager 
> given):
> {noformat}
> 2017-01-25 23:34:44,915 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - 
> Fetching TaskManager log failed.
> java.lang.IllegalArgumentException: Argument bytes must by an array of 16 
> bytes
>   at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
>   at 
> org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>   at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> {noformat}

[jira] [Updated] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5648:

Description: 
This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 
retrieve the logs for a Task Manager (without the ID of the Task Manager given):

{noformat}
2017-01-25 23:34:44,915 ERROR 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - Fetching 
TaskManager log failed.
java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes
at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
at 
org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
at 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
{noformat}

  was:
This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 

[jira] [Created] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5648:
---

 Summary: Task Manager ID missing from logs link in Job Manager UI
 Key: FLINK-5648
 URL: https://issues.apache.org/jira/browse/FLINK-5648
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.0
Reporter: Scott Kidder


This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 
retrieve the logs for a Task Manager (without the ID of the Task Manager given):

{noformat}
2017-01-25 23:34:44,915 ERROR 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - Fetching 
TaskManager log failed.
java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes
at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
at 
org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
at 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
{noformat}





[jira] [Commented] (FLINK-4651) Re-register processing time timers at the WindowOperator upon recovery.

2017-01-09 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812288#comment-15812288
 ] 

Scott Kidder commented on FLINK-4651:
-

I can confirm that this issue appears to be fixed in the release-1.2.0-rc0 
branch (commit f3c59cedae7a508825d8032a8aa9f5af6d177555).

> Re-register processing time timers at the WindowOperator upon recovery.
> ---
>
> Key: FLINK-4651
> URL: https://issues.apache.org/jira/browse/FLINK-4651
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>  Labels: windows
> Fix For: 1.2.0, 1.1.5
>
>
> Currently the {{WindowOperator}} checkpoints the processing time timers, but 
> upon recovery it does not re-register them with the {{TimeServiceProvider}}. 
> To actually reprocess them it relies on another element arriving and 
> registering a new timer for a future point in time. Although this is a 
> realistic assumption in long-running jobs, we can remove this assumption by 
> re-registering the restored timers with the {{TimeServiceProvider}} in the 
> {{open()}} method of the {{WindowOperator}}.
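
A minimal sketch of that re-registration, where the field and class names are 
assumptions for illustration rather than the actual WindowOperator internals:

{code}
@Override
public void open() throws Exception {
    super.open();
    // Re-register every processing-time timer restored from the checkpoint so
    // that firing no longer depends on a new element registering a fresh timer.
    for (Timer<K, W> restoredTimer : restoredProcessingTimeTimers) {
        timeServiceProvider.registerTimer(restoredTimer.timestamp, this);
    }
}
{code}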



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2016-12-16 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5355:

Affects Version/s: (was: 2.0.0)
   1.2.0

> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.
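
A minimal sketch of such a backoff-and-retry loop, assuming a plain AWS SDK 
client held in a field named kinesisClient; the constants and method name are 
illustrative only:

{code}
private GetRecordsResult getRecordsWithBackoff(GetRecordsRequest request)
        throws InterruptedException {
    long backoffMillis = 100L;   // initial backoff (assumed value)
    final int maxAttempts = 5;   // finite retry budget (assumed value)
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return kinesisClient.getRecords(request);
        } catch (AmazonKinesisException e) {
            if (attempt == maxAttempts) {
                throw e;             // give up after the final attempt
            }
            Thread.sleep(backoffMillis);
            backoffMillis *= 2;      // exponential backoff
        }
    }
    throw new IllegalStateException("unreachable");
}
{code}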



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2016-12-16 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5355:

Description: 
My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.

  was:
My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at 

[jira] [Updated] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2016-12-16 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5355:

Affects Version/s: 2.0.0

> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 2.0.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException, which extends AmazonKinesisException: perform 
> an exponential backoff and retry a finite number of times before throwing an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2016-12-16 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5355:
---

 Summary: Handle AmazonKinesisException gracefully in Kinesis 
Streaming Connector
 Key: FLINK-5355
 URL: https://issues.apache.org/jira/browse/FLINK-5355
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.1.3
Reporter: Scott Kidder
Assignee: Scott Kidder


My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException, which extends AmazonKinesisException: perform 
an exponential backoff and retry a finite number of times before throwing an 
exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4622) CLI help message should include 'savepoint' action

2016-09-15 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4622:
---

 Summary: CLI help message should include 'savepoint' action
 Key: FLINK-4622
 URL: https://issues.apache.org/jira/browse/FLINK-4622
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.1.2
Reporter: Scott Kidder
Assignee: Scott Kidder
Priority: Trivial


The Flink CLI help message should include the 'savepoint' action in the list of 
available actions. It currently looks like:

{code}
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3442) Expose savepoint button on web ui

2016-09-15 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493485#comment-15493485
 ] 

Scott Kidder commented on FLINK-3442:
-

This is duplicated by FLINK-4336

> Expose savepoint button on web ui
> -
>
> Key: FLINK-3442
> URL: https://issues.apache.org/jira/browse/FLINK-3442
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Gyula Fora
>Priority: Minor
>
> Similarly to Cancel there should be a Savepoint button to initiate a 
> savepoint for streaming jobs.
> These 2 buttons should NOT be next to each other :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4536) Possible thread leak in Task Manager

2016-09-02 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-4536.
---
Resolution: Not A Bug

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  

[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456481#comment-15456481
 ] 

Scott Kidder commented on FLINK-4536:
-

On second thought, I'll leave the issue open and allow the Flink team to decide 
whether to update the version of the AWS SDK and KCL referenced in the Kinesis 
connector pom.xml.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2016-08-30 14:52:50,544 

[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456462#comment-15456462
 ] 

Scott Kidder edited comment on FLINK-4536 at 9/1/16 8:04 PM:
-

Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {{close()}} function in the InfluxDBSink to ensure that 
the OkHttpClient used by the InfluxDB client is shut down. The documentation for 
the OkHttp library suggests that this isn't necessary, but, hey, I'll do it 
anyway:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.okHttpClient = new OkHttpClient();
    this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"),
            System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"),
            new OkClient(okHttpClient));
    this.database = System.getenv("INFLUXDB_DATABASE");
    this.retentionPolicy = System.getenv("INFLUXDB_RP");

    // Flush every 2000 points, at least every 100 ms
    this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
    if (this.okHttpClient != null) {
        this.okHttpClient.getDispatcher().getExecutorService().shutdown();
        this.okHttpClient.getConnectionPool().evictAll();
    }
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The change to the AWS SDK dependency was 
deployed at ~11:50:
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.


was (Author: skidder):
Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {code}code(){code} function in the InfluxDBSink to 
ensure that the {code}OkHttpClient{code} used by the InfluxDB client is 
shutdown. The documentation for the OkHttp library suggests that this isn't 
necessary, but, hey, I'll do it anyways:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.okHttpClient = new OkHttpClient();
    this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"),
            System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"),
            new OkClient(okHttpClient));
    this.database = System.getenv("INFLUXDB_DATABASE");
    this.retentionPolicy = System.getenv("INFLUXDB_RP");

    // Flush every 2000 points, at least every 100 ms
    this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
    if (this.okHttpClient != null) {
        this.okHttpClient.getDispatcher().getExecutorService().shutdown();
        this.okHttpClient.getConnectionPool().evictAll();
    }
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The changes to the AWS SDK dependency was 
deployed at ~11:50
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> 

[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456462#comment-15456462
 ] 

Scott Kidder commented on FLINK-4536:
-

Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {code}code(){code} function in the InfluxDBSink to 
ensure that the {code}OkHttpClient{code} used by the InfluxDB client is 
shutdown. The documentation for the OkHttp library suggests that this isn't 
necessary, but, hey, I'll do it anyways:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.okHttpClient = new OkHttpClient();
    this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"),
            System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"),
            new OkClient(okHttpClient));
    this.database = System.getenv("INFLUXDB_DATABASE");
    this.retentionPolicy = System.getenv("INFLUXDB_RP");

    // Flush every 2000 points, at least every 100 ms
    this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
    if (this.okHttpClient != null) {
        this.okHttpClient.getDispatcher().getExecutorService().shutdown();
        this.okHttpClient.getConnectionPool().evictAll();
    }
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The changes to the AWS SDK dependency was 
deployed at ~11:50
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO 

[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-08-31 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453388#comment-15453388
 ] 

Scott Kidder commented on FLINK-4536:
-

Here's a chart of the thread count for each Task Manager during the time period 
that corresponds to the jstack output I provided a moment ago:
http://imgur.com/a/iBoZP

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2016-08-30 14:52:50,544 INFO  
> 

[jira] [Created] (FLINK-4536) Possible thread leak in Task Manager

2016-08-30 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4536:
---

 Summary: Possible thread leak in Task Manager
 Key: FLINK-4536
 URL: https://issues.apache.org/jira/browse/FLINK-4536
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.1.0
Reporter: Scott Kidder


Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
Job Manager
2 x Task Manager (2 CPU cores on each Task Manager)

I've also updated the Kinesis source to use the latest AWS Java SDK, release 
1.11.29.

I've got a single Flink application using all 4 slots. It consumes from a 
Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
parallelism of 2 as a workaround for FLINK-4341.

Occasionally the Kinesis consumer fails because of provisioned-throughput 
limits being hit. The application automatically restarts, and resumes 
processing with the checkpoint stored on the Job Manager with no outward 
indication of problems.

I recently enabled the StatsD metrics reporter in Flink and noticed that the 
number of threads running on each Task Manager increases by about 20 threads 
each time the application restarts. Over the course of a day the application 
might hit provisioned-throughput limits 20 times or so (this is not fully 
production yet, so hitting these limits is acceptable for now). But the number 
of threads continues to grow unbounded with no increase in workload on the Task 
Managers.

The following link includes charts for the overall Flink cluster performance & 
Task Manager JVM threads over the course of 12 hours:
http://imgur.com/a/K59hz

Each decrease and subsequent spike in threads corresponds to the job being 
restarted due to an AWS Kinesis source error.

Here are the logs from one of the Task Manager instances on startup:

{code}
2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 

2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Starting TaskManager (Version: 1.1.1, Rev:61bfb36, 
Date:09.08.2016 @ 12:09:08 UTC)
2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Current user: root
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
1.8/25.92-b14
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Maximum heap size: 2048 MiBytes
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Hadoop version: 2.7.2
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM Options:
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:+UseG1GC
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xms2048M
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xmx2048M
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:MaxDirectMemorySize=8388607T
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Program Arguments:
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- --configDir
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- /usr/local/flink-1.1.1/conf
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Classpath: 
/usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
2016-08-30 14:52:50,544 INFO  

[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly

2016-08-19 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428584#comment-15428584
 ] 

Scott Kidder commented on FLINK-4341:
-

FYI, I've confirmed that explicitly setting the parallelism on the Kinesis 
source to equal the number of shards (e.g. 2) allows the remaining application 
stages to run at larger parallelism values (e.g. 4) successfully.
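
A minimal sketch of that workaround, where the stream name, the kinesisConfig 
properties, and env are assumptions and only the explicit setParallelism() call 
on the source matters:

{code}
DataStream<String> kinesisRecords = env
        .addSource(new FlinkKinesisConsumer<>("my-stream",
                new SimpleStringSchema(), kinesisConfig))
        .setParallelism(2);   // match the number of Kinesis shards
// Downstream operators keep the job-wide parallelism (e.g. 4).
{code}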

> Kinesis connector does not emit maximum watermark properly
> --
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Scott Kidder
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task 
> parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 
> 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I 
> 

[jira] [Updated] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-17 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-4341:

Affects Version/s: 1.1.1

> Checkpoint state size grows unbounded when task parallelism not uniform
> ---
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Scott Kidder
>Assignee: Robert Metzger
>Priority: Critical
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 
> 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I 
> suspect there was a regression introduced relating to assumptions about the 
> number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a 
> value ranging from 1-4). This is currently preventing me from using all 
> available Task Manager slots.
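
A minimal sketch of the workaround hinted at by the exception message, switching 
from the memory-backed state backend to the file-system backend; the checkpoint 
URI is a placeholder and the snippet assumes an enclosing main() that declares 
throws Exception:

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);   // 10-second checkpoint interval, as in the report
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
{code}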



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-17 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425408#comment-15425408
 ] 

Scott Kidder commented on FLINK-4341:
-

I repeated my tests using Flink 1.1.1 and continue to have problems with the 
checkpoint growing in size until it reaches the Akka message size limit (I 
override it to allow up to 30 MB) or heap memory is exhausted on the task 
managers. It's as though old data that's outside the window continues to occupy 
memory.

> Checkpoint state size grows unbounded when task parallelism not uniform
> ---
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Scott Kidder
>Assignee: Robert Metzger
>Priority: Critical
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 
> 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I 
> suspect there was a regression 

[jira] [Commented] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-09 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414599#comment-15414599
 ] 

Scott Kidder commented on FLINK-4341:
-

Unfortunately I don't have the time to develop a test application that 
demonstrates this issue without Kinesis. It might be reproducible with Kafka 
when the number of shards (or Kafka partitions, the equivalent) is less than 
the parallelism specified for the job.
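
A minimal sketch of what such a Kafka-based reproduction might look like: a 
topic with only 2 partitions feeding a job that otherwise runs at parallelism 
4, with checkpointing on the memory state backend. The broker address, topic 
name, and group id are placeholders, not taken from any real setup:

{code:java}
import java.util.Properties;

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch of a possible Kafka-based repro: the source parallelism is pinned to
// the partition count (2) while the rest of the job runs at parallelism 4.
public class KafkaReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                          // job-wide parallelism
        env.enableCheckpointing(10000);                 // 10-second checkpoints, as in the report
        env.setStateBackend(new MemoryStateBackend());  // memory-backed checkpoint data

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaProps.setProperty("group.id", "repro");                   // placeholder group id

        env.addSource(new FlinkKafkaConsumer09<String>(
                        "two-partition-topic", new SimpleStringSchema(), kafkaProps))
           .setParallelism(2)   // mirrors the 2-shard Kinesis source
           .rebalance()         // fan out to the 4 downstream sub-tasks
           .print();

        env.execute("kafka repro sketch");
    }
}
{code}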

Also, I noticed that the 1.1.0 binaries available for download were created on 
August 4 and don't include the latest commits on the release-1.1 branch in Git. 
Do you know when they'll be updated? I deploy Flink as a Docker container and 
use the Flink tar-gzip binary to build the Docker image.



> Checkpoint state size grows unbounded when task parallelism not uniform
> ---
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>Priority: Critical
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>  at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>  at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>  at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>  ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> 

[jira] [Commented] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-09 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414301#comment-15414301
 ] 

Scott Kidder commented on FLINK-4341:
-

I also noticed that when checkpointing is enabled and I'm using a parallelism 
of 2, the processing speed is extremely slow compared to Flink at commit 
18995c8. I disabled checkpointing altogether and the speed returned to previous 
levels.
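
For context, the only difference between the two runs compared here was whether 
checkpointing was enabled on the environment; a sketch of the toggled line, 
using the 10-second interval from the issue (env is the job's 
StreamExecutionEnvironment):

{code:java}
// Enabled for the slow run; removing this call restored the earlier throughput.
env.enableCheckpointing(10000); // checkpoint every 10 seconds
{code}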

I'm currently building Flink from source to pull in hotfixes added to the 
release-1.1 branch since commit 45f7825. I'll update this issue with my 
findings.

> Checkpoint state size grows unbounded when task parallelism not uniform
> ---
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>  at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>  at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>  at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>  ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 
> 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I 
> suspect there was a regression introduced relating to 

[jira] [Created] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-09 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4341:
---

 Summary: Checkpoint state size grows unbounded when task 
parallelism not uniform
 Key: FLINK-4341
 URL: https://issues.apache.org/jira/browse/FLINK-4341
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.0
Reporter: Scott Kidder


This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
was previously using a 1.1.0 snapshot (commit 18995c8), which performed as 
expected. The issue was introduced somewhere between those commits.

I've got a Flink application that uses the Kinesis Stream Consumer to read from 
a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, 
providing a total of 4 slots.  When running the application with a parallelism 
of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for 
subsequent tasks that process the Kinesis stream data. I use an in-memory store 
for checkpoint data.

Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
states were growing unbounded when running with a parallelism of 4, checkpoint 
interval of 10 seconds:

{code}
ID  State Size
1   11.3 MB
2   20.9 MB
3   30.6 MB
4   41.4 MB
5   52.6 MB
6   62.5 MB
7   71.5 MB
8   83.3 MB
9   93.5 MB
{code}

The first 4 checkpoints generally succeed, but then fail with an exception like 
the following:

{code}
java.lang.RuntimeException: Error triggering a checkpoint as the result of 
receiving checkpoint barrier at 
org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
 at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
 at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) 
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Size of the state is larger than the maximum 
permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
using a different state backend, like the File System State backend.
 at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
 at 
org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
 at 
org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
 at 
org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
 at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
 ... 8 more
{code}

Or:

{code}
2016-08-09 17:44:43,626 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
checkpointed state to task Fold: property_id, player -> 10-minute 
Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- Transient 
association error (association remains live) 
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed 
size 10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 
bytes.
{code}

This can be fixed by simply submitting the job with a parallelism of 2. I 
suspect there was a regression introduced relating to assumptions about the 
number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a 
value ranging from 1-4). This is currently preventing me from using all 
available Task Manager slots.
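
A hedged sketch of the setup described above: a 2-shard Kinesis source capped 
at parallelism 2, the rest of the job at parallelism 4, 10-second checkpoints 
on the memory state backend. The stream name, credential values, key selection, 
window slide, and fold logic are placeholders rather than the actual 
application code, and the property keys are written out literally; consult the 
connector's configuration constants for the exact names. The workaround 
mentioned above amounts to calling env.setParallelism(2) (or submitting with 
flink run -p 2) so that every stage has the same number of sub-tasks as the 
source.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: mirrors the topology described in this issue, not the actual job.
public class CheckpointGrowthSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                          // 2 task managers x 2 slots
        env.enableCheckpointing(10000);                 // 10-second checkpoint interval
        env.setStateBackend(new MemoryStateBackend());  // in-memory checkpoint data

        Properties kinesisProps = new Properties();
        kinesisProps.setProperty("aws.region", "us-east-1");                                 // placeholder region
        kinesisProps.setProperty("aws.credentials.provider.basic.accesskeyid", "accessKey"); // placeholder
        kinesisProps.setProperty("aws.credentials.provider.basic.secretkey", "secretKey");   // placeholder

        // 2-shard stream: only 2 of the source sub-tasks ever receive data.
        DataStream<String> events = env
                .addSource(new FlinkKinesisConsumer<String>(
                        "two-shard-stream", new SimpleStringSchema(), kinesisProps))
                .setParallelism(2);

        events
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value; // placeholder for the property_id/player key
                    }
                })
                // 10-minute sliding window; the slide interval here is a placeholder
                .timeWindow(Time.minutes(10), Time.seconds(30))
                .fold("", new FoldFunction<String, String>() {
                    @Override
                    public String fold(String accumulator, String value) {
                        return accumulator; // placeholder aggregation
                    }
                })
                .print(); // stand-in for the InfluxDB sink

        env.execute("checkpoint growth sketch");
    }
}
{code}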



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config

2016-07-11 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4197:
---

 Summary: Allow Kinesis Endpoint to be Overridden via Config
 Key: FLINK-4197
 URL: https://issues.apache.org/jira/browse/FLINK-4197
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.0.3
Reporter: Scott Kidder
Priority: Minor
 Fix For: 1.0.4


I perform local testing of my application stack with Flink configured as a 
consumer on a Kinesis stream provided by Kinesalite, an implementation of 
Kinesis built on LevelDB. This requires me to override the AWS endpoint so that 
it refers to my local Kinesalite server rather than the real AWS endpoint. I'd 
like to add a configuration property to the Kinesis streaming connector that 
allows the AWS endpoint to be specified explicitly.

This should be a fairly small change and would provide a lot of flexibility to 
people looking to integrate Flink with Kinesis in a non-production setup.
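
A sketch of how the proposed override might be used from an application, with 
the consumer pointed at a local Kinesalite server (default port 4567). The 
endpoint property key is the hypothetical new setting proposed by this issue, 
and the other property keys are written from memory of the connector's 
configuration; the values are placeholders:

{code:java}
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: illustrates the proposed endpoint override, not a final API.
public class LocalKinesaliteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1");                                // still required
        consumerConfig.setProperty("aws.credentials.provider.basic.accesskeyid", "fakeKey");  // Kinesalite ignores these
        consumerConfig.setProperty("aws.credentials.provider.basic.secretkey", "fakeSecret");
        // Hypothetical property proposed by this issue: point the consumer at a
        // local Kinesalite server instead of the real AWS endpoint.
        consumerConfig.setProperty("aws.endpoint", "http://localhost:4567");

        DataStream<String> stream = env.addSource(new FlinkKinesisConsumer<String>(
                "test-stream", new SimpleStringSchema(), consumerConfig));

        stream.print();
        env.execute("local kinesalite sketch");
    }
}
{code}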



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)