You can take a look at Reshuffle.viaRandomKey().withNumBuckets().
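
For example, a minimal sketch (the input name "records", the downstream
"kafkaWrite" transform, and the bucket count of 32 are placeholders to adapt):

import org.apache.beam.sdk.transforms.Reshuffle;

records
    // Redistribute elements across a fixed number of random keys so the
    // Kafka write runs with bounded parallelism, no matter how many
    // workers autoscaling has added upstream.
    .apply("Cap write parallelism",
        Reshuffle.<KV<String, V>>viaRandomKey().withNumBuckets(32))
    .apply(kafkaWrite);

The bucket count bounds the key space, and hence the parallelism, of the
fused write stage.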


On Wed, Dec 17, 2025 at 3:56 PM YongGang Che <[email protected]>
wrote:

> Hi Utkarsh, we are currently using Apache Beam 2.65.0.
>
> Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections
> and do not explicitly control bundling or parallelism at that stage. As a
> result, the Kafka write likely inherits the parallelism of the preceding
> transforms, as you pointed out.
>
> Adding a Reshuffle before the Kafka write sounds like a good idea; we will
> give it a try. Thanks for the suggestion.
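>
> Concretely, we are thinking of slotting it in right after the Flatten,
> roughly like this (simplified sketch; the source names, transform labels,
> and bucket count are placeholders we would still need to tune):
>
> PCollectionList.of(sourceA).and(sourceB)
>     .apply("Flatten inputs", Flatten.pCollections())
>     .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>     // Cap the parallelism the Kafka sink sees after the fan-in.
>     .apply("Cap write parallelism",
>         Reshuffle.<KV<String, V>>viaRandomKey().withNumBuckets(16))
>     .apply(kafkaWrite);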
>
> Best,
> YongGang
>
> On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <
> [email protected]> wrote:
>
>> Which Beam version are you using?
>> Sent from my iPhone
>>
>> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]>
>> wrote:
>>
>>
>> What is the parallelism of the input here? The Kafka write probably
>> inherits the parallelism of the previous transform - you can add a
>> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>>
>> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <
>> [email protected]> wrote:
>>
>>> Hi Beam community,
>>>
>>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that
>>> write to *Confluent Cloud Kafka*, and we’re running into scaling issues
>>> related to *Kafka producer connection explosion* when autoscaling kicks
>>> in.
>>>
>>> After investigating with Confluent support, the root cause appears to be
>>> that as Dataflow workers scale out, the total number of Kafka connections
>>> grows very quickly, eventually exceeding cluster-side connection guidelines
>>> and leading to increased producer-side latency and timeouts. Short-term
>>> mitigations like capping max workers help, but clearly do not scale as we
>>> add more Kafka topics.
>>>
>>> This led us to question whether our current usage
>>> of KafkaIO.write() aligns with best practices for *producer lifecycle
>>> and connection reuse* in Beam.
>>>
>>> *Current pattern*
>>>
>>> Conceptually, our write transform looks like this (simplified):
>>>
>>> @Override
>>> public PDone expand(PCollection<V> input) {
>>>   return input
>>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>>       .apply(
>>>           KafkaIO.<String, V>write()
>>>               .withBootstrapServers(bootstrapServers)
>>>               .withTopic(topicName)
>>>               .withKeySerializer(StringSerializer.class)
>>>               .withValueSerializer(valueSerializer)
>>>               .withProducerConfigUpdates(
>>>                   Map.of(
>>>                       ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"
>>>                   )));
>>> }
>>>
>>> Operationally, we observe that:
>>>
>>>    - Each Dataflow worker ends up creating *multiple Kafka producers*
>>>    - Each producer maintains *multiple TCP connections* to brokers
>>>    - When autoscaling occurs, total connections spike rapidly (tens of
>>>    thousands)
>>>    - This correlates strongly with Kafka producer timeouts and lag alerts
>>> *Questions for the community*
>>>
>>> We’d really appreciate guidance on the following:
>>>
>>>
>>>    1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>>
>>>       - Is it expected that KafkaIO creates a producer per bundle / per
>>>       transform instance?
>>>       - Are there guarantees (or non-guarantees) around producer reuse
>>>       within a worker?
>>>
>>>    2. *Is “one Kafka producer per worker” a supported or recommended
>>>    pattern?*
>>>
>>>    Conceptually, this seems like the most scalable model given that:
>>>
>>>       - All producers connect to the same Kafka cluster
>>>       - Topic count continues to grow over time
>>>       - Kafka producers are thread-safe and designed for multiplexing
>>>
>>>    3. *Are there existing patterns or extension points to share producers
>>>    across topics?*
>>>
>>>    For example:
>>>
>>>       - Reusing a producer instance across multiple KafkaIO.write()
>>>       transforms
>>>       - Custom sink implementations that manage producer lifecycle
>>>       explicitly
>>>       - Runner-specific hooks (Dataflow) for long-lived shared resources
>>>
>>>    4. *Has the Beam community seen similar issues at scale?*
>>>
>>>    If so:
>>>
>>>       - How was it mitigated?
>>>       - Were there changes upstream in Beam, or purely application-level
>>>       workarounds?
>>>
>>> We’re currently experimenting with a prototype that explicitly enforces *a
>>> single producer per worker*, but before going further we’d like to
>>> understand whether this aligns with Beam’s execution model and long-term
>>> direction.
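>>>
>>> For concreteness, the core of the prototype looks roughly like this
>>> (simplified sketch; class and member names are placeholders, and the
>>> key/value serializers are assumed to be supplied via the config map):
>>>
>>> import java.util.Map;
>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>
>>> // One producer per worker JVM, shared across DoFn instances. Relies on
>>> // KafkaProducer being thread-safe, so all threads on the worker
>>> // multiplex over the same broker connections.
>>> public final class SharedProducerHolder {
>>>   private static volatile KafkaProducer<String, byte[]> producer;
>>>
>>>   private SharedProducerHolder() {}
>>>
>>>   public static KafkaProducer<String, byte[]> get(Map<String, Object> config) {
>>>     if (producer == null) {
>>>       synchronized (SharedProducerHolder.class) {
>>>         if (producer == null) {
>>>           producer = new KafkaProducer<>(config);
>>>         }
>>>       }
>>>     }
>>>     return producer;
>>>   }
>>> }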
>>>
>>> Any insights, prior art, or pointers to discussions would be extremely
>>> helpful.
>>>
>>> Thanks in advance, and cc’ing folks who may have relevant context.
>>>
>>> Best regards,
>>>
>>> --
>>> YongGang Che
>>> Sr. Software Engineer, Data Platform
>>>
>>
>
> --
> YongGang Che
> Sr. Software Engineer, Data Platform
> (604) 305-4576
>
