We don't have to include it in 1.1.4 since Meter's do not exist in 1.1; my bad for tagging it in JIRA for 1.1.4.

On 05.12.2016 14:18, Ufuk Celebi wrote:
Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
Hello Daniel,
I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, causing the underlying dropwizard meter update threads to
not be shutdown either.
I've opened a JIRA
and will open a PR soon.
Thank your for reporting this issue. Regards,
Chesnay
On 05.12.2016 12:05, Stefan Richter wrote:
Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that you need your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan

Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads are of 3 sources.
*
**OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

Locked ownable synchronizers:
- None
*
**Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None
*
*

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
*

"tend" - Thread t@29011
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in
RichSink .

Streaming job B runs with a parallel of 24 and runs Aerospike in
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

Batch Job runs Aerospike Client in RichFilterFunction /
RichMapFunction with open and close methods in order to open and
close the client.

Next thing I cancelled all the streaming jobs @5/12/2016 and checked
the threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB, threads go down, but some still
linger around and they are the following.

*Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being
used ) *

"metrics-meter-tick-thread-1" - Thread t@29024
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None

*
*

*tend -- 432**( Aerospike Client Thread )*


"tend" - Thread t@29011
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None


Total number threads are 1289 ( total ) / 1220 ( tend + metrics ) .
So I have 1220 threads that I believe that sould be dead and not
running, since I have no jobs running at all.

And the JVM Non-HEAP usage doesn't decreases at all, after removing
every job.


Why the hell metrics grow to no end ?

I am using the following libs for metrics :

- metrics-graphite-3.1.0.jar

- metrics-core-3.1.0.jar

- flink-metrics-dropwizard-1.1.3.jar

- flink-metrics-graphite-1.1.3.jar

And the config for reporter is :

metrics.reporters: graphite
metrics.reporter.graphite.class:
org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: CARBONRELAYHOST
metrics.reporter.graphite.port: 2003


Shouldn't also the Aerospike Client be closed ? Or am I missing
something, or doing something wrong ?


Sorry for the long post.

Best Regards,

Daniel Santos


On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
Hey Daniel!

Thanks for reporting this. Unbounded growth of non-heap memory is not expected.
What kind of Threads are you seeing being spawned/lingering around?
As a first step, could you try to disable checkpointing and see how it behaves 
afterwards?

– Ufuk

On 29 November 2016 at 17:32:32, Daniel Santos (dsan...@cryptolab.net) wrote:
Hello,

Nope I am using Hadoop HDFS, as state backend, Kafka, as source, and a
HttpClient as a Sink, also Kafka as Sink.
So it's possible that the state backend is the culprit?

Curious thing is even when no jobs are running streaming or otherwise,
the JVM Non-HEAP stays the same.
Which I find it odd.

Another curious thing is that it's proportional to an increase of JVM
thread's number.
Whenever there are more JVM threads running there is also more JVM
Non-HEAP being used, which makes sense.
But threads stick around never decreasing, too, likewise JVM Non-HEAP
memory.

These observations described are based on what flink's metrics are being
sent and recorded to our graphite's system.

Best Regards,

Daniel Santos

On 11/29/2016 04:04 PM, Cliff Resnick wrote:
Are you using the RocksDB backend in native mode? If so then the
off-heap memory may be there.

On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:

i have the same problem,but i put the flink job into yarn.
but i put the job into yarn on the computer 22,and the job can
success run,and the jobmanager is 79 and taskmanager is 69,they
three different compu345ter,
however,on computer 22,the pid=3463,which is the job that put into
yarn,is have 2.3g memory,15% of total,
the commend is : ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
-ytm 1024 ....
why in conputer 22,has occupy so much momory?the job is running
computer 79 and computer 69.
What would be the possible causes of such behavior ?
Best Regards,
----- 原始邮件 -----
发件人:Daniel Santos > > >
收件人:user@flink.apache.org
主题:JVM Non Heap Memory
日期:2016年11月29日 22点26分


Hello,
Is it common to have high usage of Non-Heap in JVM ?
I am running flink in stand-alone cluster and in docker, with each
docker bieng capped at 6G of memory.
I have been struggling to keep memory usage in check.
The non-heap increases to no end. It start with just 100MB of
usage and
after a day it reaches to 1,3GB.
Then evetually reaches to 2GB and then eventually the docker is
killed
because it has reached the memory limit.
My configuration for each flink task manager is the following :
----------- flink-conf.yaml --------------
taskmanager.heap.mb: 3072
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
taskmanager.network.numberOfBuffers: 12500
taskmanager.memory.off-heap: false
---------------------------------------------
What would be the possible causes of such behavior ?
Best Regards,
Daniel Santos





Reply via email to