[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-24 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022593#comment-16022593
 ] 

Andrey commented on FLINK-6613:
---

Hey!

Looks like the "fetch.max.bytes" option is what we need. I will try it.

> OOM during reading big messages from Kafka
> --
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Andrey
>
> Steps to reproduce:
> 1) Set up a Task Manager with a 2 GB heap.
> 2) Set up a job that reads messages from Kafka 0.10 (i.e. FlinkKafkaConsumer010).
> 3) Send 3300 messages of 635 KB each, so the total size is ~2 GB.
> 4) OOM in the Task Manager.
> According to the heap dump:
> 1) KafkaConsumerThread read messages with a total size of ~1 GB.
> 2) Passed them to the next operator using 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.
> 3) Then began to read another batch of messages.
> 4) The Task Manager was able to read another ~500 MB of messages before the OOM.
> Expected:
> 1) Either have a constraint like "number of messages in-flight", OR
> 2) Read the next batch of messages only when the previous batch has been 
> processed, OR
> 3) Any other option which will solve the OOM.





[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6613:
---

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) will hold at most 2 
{{ConsumerRecords}} batches: one in the {{Handover}} waiting to be processed, and 
another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read the next batch of messages only when the previous batch has been 
processed" --> this is already what happens; the {{Handover}} has a buffer of 
size 1. Also, I don't think it would address the root cause of your OOM.
"1) KafkaConsumerThread read messages with a total size of ~1 GB." --> as Robert 
mentioned, you should be able to configure the Kafka client directly for that, 
which will likely solve your problem.
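
To illustrate the pattern, here is a minimal sketch of a size-1 handover between 
the consumer thread and the processing thread. Note that this is only a rough 
approximation for discussion, not the actual 
{{org.apache.flink.streaming.connectors.kafka.internal.Handover}} implementation 
(which also handles wakeups and error reporting):
{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Sketch only: a capacity-1 blocking slot between the Kafka consumer thread and
 * the processing thread. At any time at most two batches are reachable on the
 * heap: the one sitting in the slot and the one currently being fetched.
 */
public class HandoverSketch<T> {

    private final BlockingQueue<T> slot = new ArrayBlockingQueue<>(1);

    /** Called by the consumer thread; blocks until the previous batch was taken. */
    public void produce(T batch) throws InterruptedException {
        slot.put(batch);
    }

    /** Called by the processing thread; blocks until a batch is available. */
    public T pollNext() throws InterruptedException {
        return slot.take();
    }
}
{code}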



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-23 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021747#comment-16021747
 ] 

Robert Metzger commented on FLINK-6613:
---

There are some options in the Kafka consumer (which you can simply pass through 
the configuration properties of Flink's consumer) that influence the fetch behavior.
For example, {{max.poll.records}} limits the number of records per poll and 
{{fetch.max.bytes}} limits the bytes per fetch. I would play around with these 
settings to see if they solve the problem.
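
For reference, a minimal sketch of how such options could be passed via the 
consumer properties (the topic, servers and concrete values below are placeholders 
to be tuned to your heap, not recommendations):
{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class BoundedFetchExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // placeholder

        // Bound how much data a single poll can pull onto the heap.
        props.setProperty("max.poll.records", "50");
        props.setProperty("fetch.max.bytes", "33554432");          // ~32 MB per fetch
        props.setProperty("max.partition.fetch.bytes", "8388608"); // ~8 MB per partition

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        // env.addSource(consumer) ... as usual
    }
}
{code}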

Flink itself won't buffer any records directly on the heap. The only thing that 
buffers in Flink is the network stack, but that is strictly bounded by a 
configuration parameter.



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-18 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015715#comment-16015715
 ] 

Andrey commented on FLINK-6613:
---

Hi Dmytro,

The problem is that Flink creates too many objects :)

When I analysed the heap dump, I saw that Flink had created 2 ConsumerRecords 
objects, both reachable from a GC root (meaning they were not eligible for GC). 
These 2 ConsumerRecords objects contained 1.5 GB of data, so GC settings won't 
help in this case.

Even if I set a 4 GB heap, Flink might read 2 ConsumerRecords objects of 3 GB and 
1 GB each, which would cause OOM again.



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-18 Thread Dmytro Shkvyra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015569#comment-16015569
 ] 

Dmytro Shkvyra commented on FLINK-6613:
---

Hi [~dernasherbrezon], first of all, the root cause of this issue is using 
ParallelGC. An OOM is normal behavior for a JVM with ParallelGC if the application 
creates too many objects (please look into the ParallelGC algorithm). 
-XX:-UseGCOverheadLimit just hides the problem of insufficient memory.
{quote}
3) If you recommend G1, then the default startup scripts should be changed.
{quote}
We don't need to change the startup scripts. You can {{export JVM_ARGS="$JVM_ARGS 
-XX:+UseG1GC"}}; you can also pass other JVM options that way (except memory size 
options).
The JobManager and TaskManager use the same options from {{JVM_ARGS}}.



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014647#comment-16014647
 ] 

Andrey commented on FLINK-6613:
---

Hi Dmytro.

Several problems still exist:
1) I want to fail fast and kill the JVM if it spends too much time doing GC, so we 
just can't remove the "UseGCOverheadLimit" option.
2) Even if I remove this option, the root cause will still exist. And the root 
cause is: the Kafka integration handles large messages incorrectly. It reads as 
many messages as it can from Kafka, and that leads to OOM. GC settings are 
irrelevant here, because these messages should not and will not be eligible for 
GC.
3) If you recommend G1, then the default startup scripts should be changed.

Have you tried to reproduce the issue?



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Dmytro Shkvyra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014357#comment-16014357
 ] 

Dmytro Shkvyra commented on FLINK-6613:
---

Hi [~dernasherbrezon], I assumed you were using ParallelGC.
Please see 
https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/parallel.html
{quote}
The parallel collector throws an OutOfMemoryError if too much time is being 
spent in garbage collection (GC): If more than 98% of the total time is spent 
in garbage collection and less than 2% of the heap is recovered, then an 
OutOfMemoryError is thrown. This feature is designed to prevent applications 
from running for an extended period of time while making little or no progress 
because the heap is too small. If necessary, this feature can be disabled by 
adding the option -XX:-UseGCOverheadLimit to the command line.
{quote}
and if
{quote}
3) Send 3300 messages of 635 KB each, so the total size is ~2 GB.
{quote}
then ParallelGC can't collect all the garbage in time.
BTW, there are two parallel GC algorithms 
(http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html), 
and the old one also cleans the old generation.
I think we can close this ticket, because it can be solved by using G1 and is out 
of scope for Flink.



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014307#comment-16014307
 ] 

Andrey commented on FLINK-6613:
---

-Xmx2g and ParallelGC, the defaults for OpenJDK.



[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Dmytro Shkvyra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014155#comment-16014155
 ] 

Dmytro Shkvyra commented on FLINK-6613:
---

[~dernasherbrezon] What JVM options and GC do you use?
