[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions
[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839979#comment-17839979 ] Nicolas Perrin commented on FLINK-35210:

I see your point. Unfortunately, the issue only appeared in production, in the context of a 1.15-to-1.16 migration, and we could only guess that this migration implied more memory load on the instances, leading to disk spilling and eventually unresponsive instances. We tried tweaking all the parameters that changed with the migration so the behaviour would be the same as it used to be, but nothing worked apart from reducing the local parallelism of the kafka source operators and the global parallelism to match the number of instances.

Just to be sure I understand it well: when the parallelism of a kafka source is bigger than the number of Kafka partitions, a reader task that is assigned no partition will:
* be part of a task slot in case of slot sharing, and therefore won't waste memory, as that task slot will be used by other tasks,
* have a dedicated task slot in case of no slot sharing, and therefore will waste the dedicated memory.

However, in the first case the memory footprint won't be the same across nodes, as the nodes containing a task slot that reads from an existing partition will require more memory than the others. Is my understanding correct?

> Give the option to set automatically the parallelism of the KafkaSource to
> the number of kafka partitions
> --------------------------------------------------------------------------
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Nicolas Perrin
> Priority: Minor
>
> Currently the parallelism of the `KafkaSource` Flink operator needs to be
> chosen manually, which can lead to highly skewed tasks if the developer
> doesn't do this job. To avoid this issue, I propose to:
> - retrieve the number of partitions of the topic dynamically, using
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
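The two steps in the proposal could be sketched roughly as follows. This is a non-authoritative sketch: the topic name, bootstrap servers, and deserialization setup are placeholders, and the probe consumer is only used for a metadata lookup before the job graph is built.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionAwareSourceJob {
    public static void main(String[] args) throws Exception {
        String topic = "events";            // placeholder topic
        String brokers = "localhost:9092";  // placeholder bootstrap servers

        // Step 1: query the partition count with a plain KafkaConsumer
        // (a metadata call; no group.id or subscription is needed).
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        int partitionCount;
        try (KafkaConsumer<String, String> probe =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            partitionCount = probe.partitionsFor(topic).size();
        }

        // Step 2: use that count as the parallelism of the source operator,
        // so every reader subtask owns at least one partition.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(partitionCount);
        env.execute("partition-aware-source");
    }
}
```

Note that the count is captured once at submission time, which is exactly where the dynamic-partition-discovery concern raised later in the thread comes in.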
[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839816#comment-17839816 ] Martijn Visser commented on FLINK-35210:

There isn't enough information about the used/tested versions of Flink, but I think the actual solution should be fixing bugs (if there are any) rather than finding workarounds like the one proposed in this ticket. It sounds trivial at first, but the moment you have to take into account things like new partitions being added to a source that's already in use, this becomes less of an easy fix. https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source already showed some considerations for those edge cases as well.
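For reference, partition discovery in the current `KafkaSource` is driven by a consumer property on the builder; a sketch of enabling it (the topic, broker address, and 60-second interval are placeholder values):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class DiscoveryConfig {
    static KafkaSource<String> source() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")  // placeholder
                .setTopics("events")                    // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Periodically re-check the topic metadata so newly added
                // partitions are picked up by running readers.
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }
}
```

This is what makes a submission-time partition count stale: with discovery on, the partition set can grow after the parallelism has been fixed.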
[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839756#comment-17839756 ] Nicolas Perrin commented on FLINK-35210:

That's indeed how I proceeded in our case. I'm sorry, my choice of words was a bit confusing. We are running a Flink job on EMR and we had a lot of issues where some task instances would be randomly killed. From what we understood, one of the reasons was that we used a Flink parallelism for the application equal to a multiple of the number of instances. So some of the task instances could end up reading one or more partitions while others would read nothing while still holding the dedicated resources. This behaviour was amplified by the fact that we've got several kafka sources. It appeared to us that this was a source of instability, so we ended up forcing the parallelism of the source operator to the number of kafka partitions.
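A back-of-the-envelope illustration of why a source parallelism above the partition count leaves some readers empty. This uses a simple round-robin assignment for clarity; Flink's `KafkaSource` computes split owners differently, but the resulting imbalance is the same.

```java
import java.util.ArrayList;
import java.util.List;

public class IdleReaderDemo {
    // Assign partitions 0..partitions-1 to reader subtasks round-robin
    // (illustrative only, not Flink's actual owner computation).
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> readers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            readers.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            readers.get(p % parallelism).add(p);
        }
        return readers;
    }

    public static void main(String[] args) {
        // 4 partitions spread over 6 reader subtasks: two subtasks are
        // assigned nothing, yet still occupy their share of resources.
        List<List<Integer>> readers = assign(4, 6);
        long idle = readers.stream().filter(List::isEmpty).count();
        System.out.println("idle readers: " + idle); // prints "idle readers: 2"
    }
}
```

With several sources in one job, each mismatched source contributes its own set of empty readers, which matches the amplification described above.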
[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839739#comment-17839739 ] Oleksandr Nitavskyi commented on FLINK-35210:

Thanks [~npfp] for the suggestion. I believe what you proposed is often resolved with some wrapper around KafkaSource, which can be a layer of indirection to do a lot of things, e.g. parallelism config. Meanwhile, could you please elaborate on how bad parallelism could lead to the idle tasks? Do you mean the case where the Source parallelism is higher than the number of partitions, so that you have a Source subtask which consumes nothing, and thus no watermark advancement unless [Idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources] is configured?
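The Idleness mechanism mentioned above is configured on the watermark strategy; a minimal sketch (the 5-second out-of-orderness bound and 1-minute idle timeout are placeholder values):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessExample {
    // A reader subtask that owns no partition emits no records and therefore
    // no watermarks; declaring it idle after a timeout lets downstream
    // operators advance their watermarks without waiting for it.
    static WatermarkStrategy<String> strategy() {
        return WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1));
    }
}
```

Passing such a strategy to `env.fromSource(...)` avoids the stalled-watermark symptom even when parallelism exceeds the partition count, though it does not reclaim the resources of the empty readers.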