[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-23 Thread Nicolas Perrin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839979#comment-17839979 ]

Nicolas Perrin commented on FLINK-35210:


I see your point.


Unfortunately, the issue only appeared in production, in the context of a 1.15 
to 1.16 migration, and we could only guess that the migration implied more 
memory load on the instances, leading to disk spilling and eventually 
unresponsive instances. We tried tweaking all the parameters that changed with 
the migration so the behaviour would stay the same as before, but nothing 
worked apart from reducing the local parallelism of the Kafka source operators 
and the global parallelism to match the number of instances (sketched below).
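
For illustration, a minimal sketch of that mitigation, assuming hypothetical broker, topic and parallelism values (not our actual code): the global parallelism matches the number of instances, while the Kafka source operator gets its own, lower parallelism.

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReducedSourceParallelismJob {
    public static void main(String[] args) throws Exception {
        int numberOfInstances = 8;   // hypothetical: one slot per instance
        int numberOfPartitions = 4;  // hypothetical: partitions of the topic

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Global parallelism matches the number of instances.
        env.setParallelism(numberOfInstances);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // hypothetical broker
                .setTopics("my-topic")                // hypothetical topic
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Only the source operator gets an explicit, lower parallelism (the partition count).
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(numberOfPartitions);

        stream.print();
        env.execute("reduced-source-parallelism");
    }
}
{code}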

Just to be sure I understand it well: when the parallelism of a Kafka source is 
bigger than the number of Kafka partitions, a source task with no assigned 
partition will:
 * be part of a shared task slot when slot sharing is enabled, and therefore 
won't waste memory, since that slot is also used by other tasks,
 * occupy a dedicated task slot when slot sharing is disabled, and therefore 
will waste that slot's memory.

However, even in the first case the memory footprint won't be the same across 
nodes, as the nodes whose task slots read from an actual partition will require 
more memory than the others.

Is my understanding correct?

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently, the parallelism of Flink's `KafkaSource` operator has to be chosen 
> manually, which can lead to highly skewed tasks if the developer doesn't do 
> this job. To avoid this issue, I propose to:
> - retrieve the number of partitions of the topic dynamically, using 
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.
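
For what it's worth, a minimal sketch of the lookup proposed above, assuming hypothetical broker and topic names (this is not an existing `KafkaSource` option): a short-lived `KafkaConsumer` reads the partition count, and the result is then passed to `setParallelism` on the stream built from the source.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class TopicPartitionCount {

    /** Returns the current number of partitions of the given topic. */
    public static int partitionCount(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor returns one PartitionInfo per partition of the topic.
            return consumer.partitionsFor(topic).size();
        }
    }
}
{code}

The value would then be used as, e.g., `env.fromSource(source, watermarks, "kafka").setParallelism(partitionCount(brokers, topic))`. Note that this captures the partition count only at job submission time.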



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839816#comment-17839816 ]

Martijn Visser commented on FLINK-35210:


There isn't enough information about the Flink versions used/tested, but I think 
the actual solution should be fixing bugs (if there are any) instead of adding 
workarounds like the one proposed in the ticket. It sounds trivial at first, but 
the moment you have to take into account things like new partitions being added 
to a topic that is already being consumed, it becomes less of an easy fix. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
already covered some considerations for those edge cases as well.
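
For context, the dynamic partition discovery that FLIP-288 turns on by default is configured on the `KafkaSource` builder; a minimal sketch with an arbitrary interval and hypothetical connection details:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public final class DiscoveryEnabledSource {

    /** Builds a KafkaSource that keeps checking for newly created partitions. */
    public static KafkaSource<String> build(String bootstrapServers, String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Look for new partitions every 60 s; a non-positive value disables discovery.
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }
}
{code}

Newly discovered partitions are assigned to the existing source subtasks, which is part of why a parallelism fixed to the partition count at submission time can become stale.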






[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Nicolas Perrin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839756#comment-17839756 ]

Nicolas Perrin commented on FLINK-35210:


That's indeed how I proceeded in our case.

 I'm sorry, my choice of words was a bit confusing.

We are running a Flink job on EMR and we had a lot of issues where some task 
instances would be randomly killed. From what we understood, one of the reasons 
was that we used an application-wide Flink parallelism equal to a multiple of 
the number of instances. So some of the task instances could end up reading one 
or more partitions while others would read nothing despite holding the 
dedicated resources. This behaviour was amplified by the fact that we have 
several Kafka sources.
It appeared to us that this was a source of instability, so we ended up forcing 
the parallelism of the source operator to the number of Kafka partitions (one 
way to look that count up is sketched below).
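
For illustration, one way to look up that partition count programmatically without creating a consumer is the Kafka `AdminClient`; a rough sketch (broker and topic names are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public final class PartitionLookup {

    /** Queries the number of partitions of a topic through the Kafka admin API. */
    public static int partitionCount(String bootstrapServers, String topic)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description =
                    admin.describeTopics(Collections.singletonList(topic)).all().get().get(topic);
            return description.partitions().size();
        }
    }
}
{code}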






[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Oleksandr Nitavskyi (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839739#comment-17839739 ]

Oleksandr Nitavskyi commented on FLINK-35210:

Thanks [~npfp] for the suggestion. I believe what you propose is often solved 
with some wrapper around KafkaSource, which can serve as a layer of indirection 
for a lot of things, e.g. parallelism configuration (see the sketch below).
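
A rough sketch of what such a wrapper could look like (the class and method names are hypothetical, not an existing Flink API): the source is built and its parallelism attached in one place.

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical indirection layer around KafkaSource that centralises parallelism configuration. */
public final class KafkaSourceFactory {

    public static DataStream<String> stringSource(
            StreamExecutionEnvironment env,
            String bootstrapServers,
            String topic,
            String groupId,
            int sourceParallelism) {

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Parallelism is decided here (from configuration, a partition-count lookup, ...)
        // instead of being repeated in every job.
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka: " + topic)
                .setParallelism(sourceParallelism);
    }
}
{code}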

Meanwhile, could you please elaborate on how bad parallelism leads to the idle 
tasks? Do you mean the case where the Source parallelism is higher than the 
number of partitions, so that you have Source subtasks which consume nothing 
and therefore no watermark advancement unless 
[Idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]
 is configured?
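
For reference, the idleness mechanism linked above is configured on the watermark strategy passed to the source; a minimal sketch with arbitrary timeouts. Subtasks that receive no records, such as readers without an assigned partition, are marked idle and stop holding back the watermark.

{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public final class IdlenessAwareWatermarks {

    /** Bounded out-of-orderness watermarks that mark a subtask idle after one minute without records. */
    public static WatermarkStrategy<String> strategy() {
        return WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1));
    }
}
{code}

Such a strategy would be passed as the second argument of `env.fromSource(...)`.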



