Thanks, Reuven — appreciate the pointer. We’ll take a look at
Reshuffle.viaRandomKey().withNumBuckets(...) and experiment with it.

On Wed, Dec 17, 2025 at 4:41 PM Reuven Lax <[email protected]> wrote:

> You can take a look at Reshuffle.viaRandomKey.withNumBuckets
>
>
>
> On Wed, Dec 17, 2025 at 3:56 PM YongGang Che <[email protected]>
> wrote:
>
>> Hi Utkarsh, we are currently using Apache Beam 2.65.0.
>>
>> Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections
>> and do not explicitly control bundling or parallelism at that stage. As a
>> result, the Kafka write likely inherits the parallelism of the preceding
>> transforms, as you pointed out.
>>
>> Adding reshuffle before the Kafka write sounds like a good idea, will
>> give it a try. Thanks for the suggestion.
>>
>> Best,
>> YongGang
>>
>> On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <
>> [email protected]> wrote:
>>
>>> Which beam version are you using?
>>> Sent from my iPhone
>>>
>>> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]>
>>> wrote:
>>>
>>> 
>>> What is the parallelism of the input here? The Kafka write probably
>>> inherits the parallelism of the previous transform - you can add a
>>> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>>>
>>> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <
>>> [email protected]> wrote:
>>>
>>>> Hi Beam community,
>>>>
>>>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that
>>>> write to *Confluent Cloud Kafka*, and we’re running into scaling
>>>> issues related to *Kafka producer connection explosion* when
>>>> autoscaling kicks in.
>>>>
>>>> After investigating with Confluent support, the root cause appears to
>>>> be that as Dataflow workers scale out, the total number of Kafka
>>>> connections grows very quickly, eventually exceeding cluster-side
>>>> connection guidelines and leading to increased producer-side latency and
>>>> timeouts. Short-term mitigations like capping max workers help, but clearly
>>>> do not scale as we add more Kafka topics.
>>>>
>>>> This led us to question whether our current usage
>>>> of KafkaIO.write() aligns with best practices for *producer lifecycle
>>>> and connection reuse* in Beam.
>>>>
>>>> *Current pattern*
>>>>
>>>> Conceptually, our write transform looks like this (simplified):
>>>>
>>>> @Override
>>>> public PDone expand(PCollection<V> input) {
>>>>   return input
>>>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>>>       .apply(
>>>>           KafkaIO.<String, V>write()
>>>>               .withBootstrapServers(bootstrapServers)
>>>>               .withTopic(topicName)
>>>>               .withKeySerializer(StringSerializer.class)
>>>>               .withValueSerializer(valueSerializer)
>>>>               .withProducerConfigUpdates(
>>>>                   Map.of(
>>>>                       ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"
>>>>                   )));
>>>>
>>>>      .withBootstrapServers(connectionConfig.bootstrapServer)
>>>>
>>>>      .withTopic(topicName)
>>>>      .withKeySerializer(StringSerializer.class)
>>>>      .withValueSerializer(valueSerializer));
>>>>
>>>> }
>>>>
>>>> Operationally, we observe that:
>>>>
>>>>
>>>>    -
>>>>
>>>>    Each Dataflow worker ends up creating *multiple Kafka producers*
>>>>    -
>>>>
>>>>    Each producer maintains *multiple TCP connections* to brokers
>>>>    -
>>>>
>>>>    When autoscaling occurs, total connections spike rapidly (tens of
>>>>    thousands)
>>>>    -
>>>>
>>>>    This correlates strongly with Kafka producer timeouts and lag alerts
>>>>
>>>> *Questions for the community*
>>>>
>>>> We’d really appreciate guidance on the following:
>>>>
>>>>
>>>>    1.
>>>>
>>>>    *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>>>
>>>>    -
>>>>
>>>>       Is it expected that KafkaIO creates a producer per bundle / per
>>>>       transform instance?
>>>>       -
>>>>
>>>>       Are there guarantees (or non-guarantees) around producer reuse
>>>>       within a worker?
>>>>
>>>>    2.
>>>>
>>>>    *Is “one Kafka producer per worker” a supported or recommended
>>>>    pattern?*
>>>>
>>>>    Conceptually, this seems like the most scalable model given that:
>>>>
>>>>    -
>>>>
>>>>       All producers connect to the same Kafka cluster
>>>>       -
>>>>
>>>>       Topic count continues to grow over time
>>>>       -
>>>>
>>>>       Kafka producers are thread-safe and designed for multiplexing
>>>>
>>>>    3.
>>>>
>>>>    *Are there existing patterns or extension points to share producers
>>>>    across topics?*
>>>>
>>>>    For example:
>>>>
>>>>    -
>>>>
>>>>       Reusing a producer instance across
>>>>       multiple KafkaIO.write() transforms
>>>>       -
>>>>
>>>>       Custom sink implementations that manage producer lifecycle
>>>>       explicitly
>>>>       -
>>>>
>>>>       Runner-specific hooks (Dataflow) for long-lived shared resources
>>>>
>>>>    4.
>>>>
>>>>    *Has the Beam community seen similar issues at scale?*
>>>>
>>>>    If so:
>>>>
>>>>    -
>>>>
>>>>       How was it mitigated?
>>>>       -
>>>>
>>>>       Were there changes upstream in Beam, or purely application-level
>>>>       workarounds?
>>>>
>>>>
>>>>
>>>> We’re currently experimenting with a prototype that explicitly enforces *a
>>>> single producer per worker*, but before going further we’d like to
>>>> understand whether this aligns with Beam’s execution model and long-term
>>>> direction.
>>>>
>>>> Any insights, prior art, or pointers to discussions would be extremely
>>>> helpful.
>>>>
>>>> Thanks in advance, and cc’ing folks who may have relevant context.
>>>>
>>>> Best regards,
>>>>
>>>> --
>>>> YongGang Che
>>>> Sr. Software Engineer, Data Platform
>>>> [image: dialpad] <https://www.dialpad.com>
>>>>
>>>
>>
>> --
>> YongGang Che
>> Sr. Software Engineer, Data Platform
>> (604) 305-4576
>> [image: dialpad] <https://www.dialpad.com>
>>
>

-- 
YongGang Che
Sr. Software Engineer, Data Platform
(604) 305-4576
[image: dialpad] <https://www.dialpad.com>

Reply via email to