[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022593#comment-16022593 ] Andrey commented on FLINK-6613: ---

Hey! Looks like the "fetch.max.bytes" option is what we need. I will try it.

> OOM during reading big messages from Kafka
> --
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0
> Reporter: Andrey
>
> Steps to reproduce:
> 1) Set up a task manager with a 2G heap size
> 2) Set up a job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010)
> 3) Send 3300 messages of 635Kb each, so the total size is ~2G
> 4) OOM in the task manager.
>
> According to the heap dump:
> 1) KafkaConsumerThread read messages with a total size of ~1G.
> 2) Passed them to the next operator using org.apache.flink.streaming.connectors.kafka.internal.Handover
> 3) Then began to read another batch of messages.
> 4) The task manager was able to read the next batch of ~500Mb messages until OOM.
>
> Expected:
> 1) Either have a constraint like "number of messages in-flight" OR
> 2) Read the next batch of messages only when the previous batch is processed OR
> 3) Any other option which will solve the OOM.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ] Tzu-Li (Gordon) Tai commented on FLINK-6613: ---

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) will always have at most 2 {{ConsumerRecords}}: one in the {{Handover}} awaiting processing, and another awaiting to be added to the {{Handover}}.

Regarding what you expect: "2) Read next batch of messages only when previous batch processed" --> this is already what happens, with only a size-1 buffer in the {{Handover}}. Also, I don't think it would solve the root cause of what's causing your OOM.

"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to just configure the Kafka client for that directly, and that will likely solve your problem.
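A back-of-the-envelope consequence of the two-batches-in-flight behavior described above: with at most two {{ConsumerRecords}} resident per consumer, worst-case consumer-side buffering is roughly 2 x {{fetch.max.bytes}}. A minimal sketch (the 50 MB figure is Kafka's documented default for {{fetch.max.bytes}}; substitute your own setting):

```java
public class FetchBound {
    public static void main(String[] args) {
        // Kafka's default fetch.max.bytes is 50 MB; adjust to your own configuration.
        long fetchMaxBytes = 50L * 1024 * 1024;
        // At most two ConsumerRecords batches are resident per consumer:
        // one waiting in the Handover, one being fetched.
        long worstCaseBytes = 2 * fetchMaxBytes;
        System.out.println("worst-case consumer buffering ~" + (worstCaseBytes >> 20) + " MB");
    }
}
```

So sizing the heap with roughly 2 x {{fetch.max.bytes}} of headroom per consumer (on top of everything else the job needs) should keep this particular buffering from triggering an OOM.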
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021747#comment-16021747 ] Robert Metzger commented on FLINK-6613: ---

There are some options in the Kafka consumer (which you can pass through the configuration properties of Flink's consumer) that influence the fetch behavior. For example, {{max.poll.records}} limits the number of records per poll, and {{fetch.max.bytes}} the bytes per poll. I would try playing around with these settings to see if that solves the problem.

Flink itself won't buffer any records directly on the heap. The only thing that buffers in Flink is the network stack, but that is strictly bounded by a configuration parameter.
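Passing the settings mentioned above could look like the sketch below. The property names are standard Kafka consumer settings; the broker address, group id, topic name, and the concrete limit values are illustrative, not prescriptive:

```java
import java.util.Properties;

public class ConsumerFetchConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "my-group");                // hypothetical group id
        // Cap how much data a single poll can bring back:
        props.setProperty("max.poll.records", "500");     // records per poll
        props.setProperty("fetch.max.bytes", "10485760"); // 10 MB per fetch
        // The same Properties object would then be handed to Flink's consumer, e.g.:
        // new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        System.out.println(props.getProperty("fetch.max.bytes"));
    }
}
```

Lowering {{fetch.max.bytes}} trades fetch throughput for a smaller worst-case batch held on the heap, which is exactly the knob that matters for the OOM described in this ticket.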
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015715#comment-16015715 ] Andrey commented on FLINK-6613: ---

Hi Dmytro,

The problem is that Flink creates too many objects :) When I analysed the heap dump, I saw that Flink had created 2 ConsumerRecords objects, both reachable from a GC root (meaning they were not eligible for GC). These 2 ConsumerRecords objects contained 1.5Gb of data, so GC settings won't help in this case. Even if I set a 4Gb heap, Flink might read 2 ConsumerRecords objects of 3Gb and 1Gb, which would cause an OOM again.
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015569#comment-16015569 ] Dmytro Shkvyra commented on FLINK-6613: ---

Hi [~dernasherbrezon],

First of all, the root cause of this issue is using ParallelGC. OOM is normal behavior for a JVM with ParallelGC if the application creates too many objects (please see the ParallelGC algorithm). -XX:-UseGCOverheadLimit just hides the underlying lack of memory.

{quote} 3) If you recommend G1, then default startup scripts should be changed. {quote}

We don't need to change the startup scripts. You can {{export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"}}; you can also pass other JVM options this way (except memory size options). The JobManager and TaskManager use the same options from {{JVM_ARGS}}.
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014647#comment-16014647 ] Andrey commented on FLINK-6613: ---

Hi Dmytro.

Several problems still exist:
1) I want to fail fast and kill the JVM if it spends too much time doing GC, so we can't just remove the "UseGCOverheadLimit" option.
2) Even if I remove this option, the root cause will still exist. And the root cause is: the Kafka integration deals incorrectly with large messages. It reads as many messages as it can from Kafka, and that will lead to OOM. GC settings are irrelevant here, because these messages should not and will not be eligible for GC.
3) If you recommend G1, then the default startup scripts should be changed.

Have you tried to reproduce the issue?
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014357#comment-16014357 ] Dmytro Shkvyra commented on FLINK-6613: ---

Hi [~dernasherbrezon],

I assume from what you say that you are using ParallelGC. Please see https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/parallel.html

{quote} The parallel collector throws an OutOfMemoryError if too much time is being spent in garbage collection (GC): If more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, then an OutOfMemoryError is thrown. This feature is designed to prevent applications from running for an extended period of time while making little or no progress because the heap is too small. If necessary, this feature can be disabled by adding the option -XX:-UseGCOverheadLimit to the command line. {quote}

and if

{quote} 3) Send 3300 messages each 635Kb. So total size is ~2G {quote}

then ParallelGC can't collect all the garbage in time. BTW, we have two parallel GC algorithms (http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html) and the older one also cleans the old generation.

I think we can close this ticket, because it can be solved by using G1 and is out of scope for Flink.
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014307#comment-16014307 ] Andrey commented on FLINK-6613: ---

-Xmx2g and ParallelGC. The default for OpenJDK.
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014155#comment-16014155 ] Dmytro Shkvyra commented on FLINK-6613: ---

[~dernasherbrezon] What JVM options and GC do you use?