[jira] [Created] (FLINK-9127) Filesystem State Backend logged incorrectly

2018-04-03 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-9127:
---

 Summary: Filesystem State Backend logged incorrectly
 Key: FLINK-9127
 URL: https://issues.apache.org/jira/browse/FLINK-9127
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2, 1.3.2
Reporter: Scott Kidder


When using a filesystem backend, the 
'[StateBackendLoader|https://github.com/apache/flink/blob/1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L123]'
 class produces a log message stating: "State backend is set to heap memory". 

Example:

{noformat}
2018-04-04 00:45:49,591 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "hdfs://hdfs:8020/flink/checkpoints")
{noformat}

It looks like this resulted from copy-pasting the previous case statement, which matches on the memory backend. The same bug is present in earlier Flink releases (1.3.2, 1.4.0) in the 'AbstractStateBackend' class.

This log statement should be corrected to indicate that a filesystem backend is in use.
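
For illustration, a corrected branch could log something like the following (a minimal sketch only; the class and method layout is approximate and not the actual 'StateBackendLoader' code):

{code}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only -- not the actual StateBackendLoader implementation.
public class FsBackendLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(FsBackendLoggingSketch.class);

    static FsStateBackend loadFilesystemBackend(String checkpointDirUri) {
        FsStateBackend backend = new FsStateBackend(checkpointDirUri);
        // Name the filesystem backend instead of the "heap memory" text that was
        // copied from the memory-backend case.
        LOG.info("State backend is set to filesystem (checkpoints to \"{}\")",
                backend.getBasePath());
        return backend;
    }
}
{code}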





[jira] [Created] (FLINK-7022) Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7022:
---

 Summary: Flink Job Manager Scheduler & Web Frontend out of sync 
when Zookeeper is unavailable on startup
 Key: FLINK-7022
 URL: https://issues.apache.org/jira/browse/FLINK-7022
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
The Flink Job Manager web frontend is permanently unavailable if one or more Zookeeper nodes are unresolvable during startup. The job scheduler eventually recovers and assigns jobs to task managers, but the web frontend continues to respond with an HTTP 503 and the following message:
{noformat}Service temporarily unavailable due to an ongoing leader election. 
Please refresh.{noformat}

h2. Expected Behavior
Once Flink is able to interact with Zookeeper successfully, all aspects of the 
Job Manager (job scheduling & the web frontend) should be available.

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.host, localhost
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.port, 8125
2017-06-27 15:38:47,162 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.interval, 10 SECONDS
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend, filesystem
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend.fs.checkpointdir, 
hdfs://hdfs:8020/flink/checkpoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.savepoints.dir, hdfs://hdfs:8020/flink/savepoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.mode, zookeeper
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.quorum, 
zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.storageDir, 
hdfs://hdfs:8020/flink/recovery
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.jobmanager.port, 6123
2017-06-27 15:38:47,257 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: blob.server.port, 41479
2017-06-27 15:38:47,357 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 'recovery.mode' instead 
of proper key 'high-availability'
2017-06-27 15:38:47,366 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager with high-availability
2017-06-27 15:38:47,366 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 
'recovery.jobmanager.port' instead of proper key 
'high-availability.jobmanager.port'
2017-06-27 15:38:47,452 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager on flink:6123 with execution mode CLUSTER
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, flink
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration
{noformat}

[jira] [Created] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7021:
---

 Summary: Flink Task Manager hangs on startup if one Zookeeper node 
is unresolvable
 Key: FLINK-7021
 URL: https://issues.apache.org/jira/browse/FLINK-7021
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
the Zookeeper connection string is unresolvable.

h2. Expected Behavior
Flink should retry name resolution & connection to Zookeeper nodes with 
exponential back-off.
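
For illustration, the retry could look roughly like this (a sketch of the suggested behavior, not existing Flink code; the class and method names are hypothetical):

{code}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: retry hostname resolution with exponential back-off instead
// of hanging or failing permanently when a Zookeeper node is not yet resolvable.
public class ZookeeperResolveRetrySketch {

    static InetAddress resolveWithBackoff(String host, int maxAttempts)
            throws UnknownHostException, InterruptedException {
        long delayMs = 1_000;
        for (int attempt = 1; ; attempt++) {
            try {
                return InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up after a finite number of attempts
                }
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, 60_000); // double the delay, cap at 60 s
            }
        }
    }
}
{code}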

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:51,713 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Using configured hostname/address for TaskManager: 10.2.45.11
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor system at 10.2.45.11:6122.
2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger  
- Slf4jLogger started
2017-06-27 15:38:53,079 INFO  Remoting  
- Starting remoting
2017-06-27 15:38:53,573 INFO  Remoting  
- Remoting started; listening on addresses 
:[akka.tcp://flink@10.2.45.11:6122]
2017-06-27 15:38:53,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor
2017-06-27 15:38:53,660 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
[server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
(manual), number of client threads: 2 (manual), server connect backlog: 0 (use 
Netty's default), client connect timeout (sec): 120, send/receive buffer size 
(bytes): 0 (use Netty's default)]
2017-06-27 15:38:53,682 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 1 ms
2017-06-27 15:38:53,688 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file 
directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
2017-06-27 15:38:54,071 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 MB 
for network buffer pool (number of memory segments: 3095, bytes per segment: 
32768).
2017-06-27 15:38:54,564 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
network environment and its components.
2017-06-27 15:38:54,576 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
initialization (took 4 ms).
2017-06-27 15:38:54,677 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
2017-06-27 15:38:54,981 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
managed memory to 0.7 of the currently free heap space (612 MB), memory will be 
allocated lazily.
2017-06-27 15:38:55,050 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
files.
2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Configuring StatsDReporter with {interval=10 SECONDS, port=8125, 
host=localhost, class=org.apache.flink.metrics.statsd.StatsDReporter}.
2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter
- Configured StatsDReporter with {host:localhost, port:8125}
2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Periodically reporting metrics in intervals of 10 SECONDS for 
reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
2017-06-27 15:38:55,175 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-e4c5bcc5-7513-40d9-a665-0d33c80a36ba
2017-06-27 15:38:55,187 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses
{noformat}

[jira] [Created] (FLINK-6176) Add JARs to CLASSPATH deterministically

2017-03-23 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-6176:
---

 Summary: Add JARs to CLASSPATH deterministically
 Key: FLINK-6176
 URL: https://issues.apache.org/jira/browse/FLINK-6176
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Scott Kidder


The Flink 1.2.0 {{config.sh}} script uses the following shell-script function 
to build the CLASSPATH variable from a listing of JAR files in the 
{{$FLINK_HOME/lib}} directory:

{code}
constructFlinkClassPath() {

    while read -d '' -r jarfile ; do
        if [[ $FLINK_CLASSPATH = "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)

    echo $FLINK_CLASSPATH
}
{code}

The {{find}} command as specified returns files in directory order, which varies by OS and filesystem.

The inconsistent ordering caused problems for me when installing a Flink Docker image I had built on a new machine with a newer version of Docker. The difference in the Docker filesystem implementation led to a different ordering of the directory contents, which in turn led to a different ordering of the CLASSPATH elements and very puzzling {{ClassNotFoundException}} errors when running my application.

This should be addressed by adding some explicit ordering to the JAR files 
added to the CLASSPATH used by Flink.
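
For illustration, one way to make the ordering deterministic (a sketch of the general idea, not necessarily the exact patch) is to sort the NUL-delimited output of {{find}} before building the variable:

{code}
constructFlinkClassPath() {
    local FLINK_CLASSPATH=""

    # Sorting the NUL-delimited file list (GNU sort -z) makes the CLASSPATH
    # order stable across operating systems and filesystem implementations.
    while read -d '' -r jarfile ; do
        if [[ "$FLINK_CLASSPATH" == "" ]]; then
            FLINK_CLASSPATH="$jarfile"
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH:$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    echo "$FLINK_CLASSPATH"
}
{code}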





[jira] [Created] (FLINK-5946) Kinesis Producer uses KPL that orphans threads that consume 100% CPU

2017-03-01 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5946:
---

 Summary: Kinesis Producer uses KPL that orphans threads that 
consume 100% CPU
 Key: FLINK-5946
 URL: https://issues.apache.org/jira/browse/FLINK-5946
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.2.0
Reporter: Scott Kidder


It's possible for the Amazon Kinesis Producer Library (KPL) to leave orphaned threads running after the producer has been instructed to shut down via the {{destroy()}} method. These threads run in a very tight infinite loop that can push CPU usage to 100%. I've seen this happen on several occasions, though it does not happen every time. Once these threads are orphaned, the only way to bring CPU utilization back down is to restart the Flink Task Manager.

When a KPL producer is instantiated, it creates several threads: one to execute 
and monitor the native sender process, and two threads to monitor the process' 
stdout and stderr output. It's possible for the process-monitor thread to stop 
in such a way that leaves the output monitor threads orphaned.

I've submitted a GitHub issue and pull request against the KPL project:
https://github.com/awslabs/amazon-kinesis-producer/issues/93
https://github.com/awslabs/amazon-kinesis-producer/pull/94

This issue is rooted in the Amazon Kinesis Producer Library (KPL) that the 
Flink Kinesis streaming connector depends upon. It ought to be fixed in the 
KPL, but I want to document it on the Flink project. The Flink KPL dependency 
should be updated once the KPL has been fixed.





[jira] [Created] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-25 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5649:
---

 Summary: KryoException when starting job from Flink 1.2.0 rc0 
savepoint
 Key: FLINK-5649
 URL: https://issues.apache.org/jira/browse/FLINK-5649
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Scott Kidder


I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error, leading to the job being cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

This might be related to FLINK-5484 and its pull request:
https://github.com/apache/flink/pull/3152





[jira] [Created] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2016-12-16 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5355:
---

 Summary: Handle AmazonKinesisException gracefully in Kinesis 
Streaming Connector
 Key: FLINK-5355
 URL: https://issues.apache.org/jira/browse/FLINK-5355
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.1.3
Reporter: Scott Kidder
Assignee: Scott Kidder


My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a ProvisionedThroughputExceededException, which also extends AmazonKinesisException: perform an exponential backoff and retry a finite number of times before throwing an exception.
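
For illustration, the handling could look roughly like this (a sketch only; the wrapper method is hypothetical, and the real change would live in the connector's KinesisProxy):

{code}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.AmazonKinesisException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

// Sketch of the proposed behavior: retry transient AmazonKinesisException failures
// with exponential back-off, a finite number of times, before giving up.
public class GetRecordsRetrySketch {

    static GetRecordsResult getRecordsWithRetry(AmazonKinesis client, GetRecordsRequest request,
                                                int maxRetries) throws InterruptedException {
        long backoffMs = 500;
        for (int attempt = 0; ; attempt++) {
            try {
                return client.getRecords(request);
            } catch (AmazonKinesisException e) {
                // Only retry service-side (HTTP 5xx) failures, and only a finite number of times.
                if (e.getStatusCode() < 500 || attempt >= maxRetries) {
                    throw e;
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 30_000);
            }
        }
    }
}
{code}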





[jira] [Created] (FLINK-4622) CLI help message should include 'savepoint' action

2016-09-15 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4622:
---

 Summary: CLI help message should include 'savepoint' action
 Key: FLINK-4622
 URL: https://issues.apache.org/jira/browse/FLINK-4622
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.1.2
Reporter: Scott Kidder
Assignee: Scott Kidder
Priority: Trivial


The Flink CLI help message should include the 'savepoint' action in the list of 
available actions. It currently looks like:

{code}
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.
{code}
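
With 'savepoint' included, the message would read something like (suggested wording only):

{code}
Valid actions are "run", "list", "info", "stop", "cancel", or "savepoint".
{code}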





[jira] [Created] (FLINK-4536) Possible thread leak in Task Manager

2016-08-30 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4536:
---

 Summary: Possible thread leak in Task Manager
 Key: FLINK-4536
 URL: https://issues.apache.org/jira/browse/FLINK-4536
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.1.0
Reporter: Scott Kidder


Running Flink release 1.1.1 (commit 61bfb36) in the following configuration:
* Job Manager
* 2 x Task Manager (2 CPU cores on each Task Manager)

I've also updated the Kinesis source to use the latest AWS Java SDK, release 
1.11.29.

I've got a single Flink application using all 4 slots. It consumes from a 
Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
parallelism of 2 as a workaround for FLINK-4341.

Occasionally the Kinesis consumer fails because of provisioned-throughput 
limits being hit. The application automatically restarts, and resumes 
processing with the checkpoint stored on the Job Manager with no outward 
indication of problems.

I recently enabled the StatsD metrics reporter in Flink and noticed that the 
number of threads running on each Task Manager increases by about 20 threads 
each time the application restarts. Over the course of a day the application 
might hit provisioned-throughput limits 20 times or so (this is not fully 
production yet, so hitting these limits is acceptable for now). But the number 
of threads continues to grow unbounded with no increase in workload on the Task 
Managers.

The following link includes charts for the overall Flink cluster performance & 
Task Manager JVM threads over the course of 12 hours:
http://imgur.com/a/K59hz

Each decrease and subsequent spike in threads corresponds to the job being 
restarted due to an AWS Kinesis source error.

Here are the logs from one of the Task Manager instances on startup:

{code}
2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 

2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Starting TaskManager (Version: 1.1.1, Rev:61bfb36, 
Date:09.08.2016 @ 12:09:08 UTC)
2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Current user: root
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
1.8/25.92-b14
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Maximum heap size: 2048 MiBytes
2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Hadoop version: 2.7.2
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM Options:
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:+UseG1GC
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xms2048M
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xmx2048M
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:MaxDirectMemorySize=8388607T
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Program Arguments:
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- --configDir
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- /usr/local/flink-1.1.1/conf
2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Classpath: 
/usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
2016-08-30 14:52:50,544 INFO
{code}

[jira] [Created] (FLINK-4341) Checkpoint state size grows unbounded when task parallelism not uniform

2016-08-09 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4341:
---

 Summary: Checkpoint state size grows unbounded when task 
parallelism not uniform
 Key: FLINK-4341
 URL: https://issues.apache.org/jira/browse/FLINK-4341
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.0
Reporter: Scott Kidder


This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
expected.  This issue was introduced somewhere between those commits.

I've got a Flink application that uses the Kinesis Stream Consumer to read from 
a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, 
providing a total of 4 slots.  When running the application with a parallelism 
of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for 
subsequent tasks that process the Kinesis stream data. I use an in-memory store 
for checkpoint data.

Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
states were growing unbounded when running with a parallelism of 4, checkpoint 
interval of 10 seconds:

{code}
ID  State Size
1   11.3 MB
2   20.9 MB
3   30.6 MB
4   41.4 MB
5   52.6 MB
6   62.5 MB
7   71.5 MB
8   83.3 MB
9   93.5 MB
{code}

The first 4 checkpoints generally succeed, but subsequent checkpoints fail with an exception like the following:

{code}
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407, maxSize=5242880. Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
    at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
    ... 8 more
{code}

Or:

{code}
2016-08-09 17:44:43,626 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
checkpointed state to task Fold: property_id, player -> 10-minute 
Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- Transient 
association error (association remains live) 
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed 
size 10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 
bytes.
{code}

This can be worked around by submitting the job with a parallelism of 2. I suspect a regression was introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1-4). This is currently preventing me from using all available Task Manager slots.
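
Independent of the parallelism regression, the first exception above suggests moving off the in-memory state backend; a filesystem-backed configuration in flink-conf.yaml would look roughly like this (the checkpoint path is illustrative):

{noformat}
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://hdfs:8020/flink/checkpoints
{noformat}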





[jira] [Created] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config

2016-07-11 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4197:
---

 Summary: Allow Kinesis Endpoint to be Overridden via Config
 Key: FLINK-4197
 URL: https://issues.apache.org/jira/browse/FLINK-4197
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.0.3
Reporter: Scott Kidder
Priority: Minor
 Fix For: 1.0.4


I perform local testing of my application stack with Flink configured as a consumer of a Kinesis stream provided by Kinesalite, an implementation of Kinesis built on LevelDB. This requires me to override the AWS endpoint to point at my local Kinesalite server rather than the real AWS endpoint. I'd like to add a configuration property to the Kinesis streaming connector that allows the AWS endpoint to be specified explicitly.

This should be a fairly small change and provide a lot of flexibility to people 
looking to integrate Flink with Kinesis in a non-production setup.
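
For illustration, using such a property from a consumer might look like this (a sketch; the "aws.endpoint" key is the proposal here, the other key names are illustrative, and credential settings are omitted):

{code}
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: points the Kinesis consumer at a local Kinesalite server.
public class LocalKinesaliteSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");
        // Proposed override: use the local Kinesalite endpoint instead of the real AWS one.
        config.setProperty("aws.endpoint", "http://localhost:4567");

        env.addSource(new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), config))
           .print();

        env.execute("Local Kinesalite test");
    }
}
{code}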


