Hi Utkarsh, we are currently using Apache Beam 2.65.0.

Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections and
do not explicitly control bundling or parallelism at that stage. As a
result, the Kafka write likely inherits the parallelism of the preceding
transforms, as you pointed out.

Adding a Reshuffle before the Kafka write sounds like a good idea; we'll give
it a try. Thanks for the suggestion.
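
For reference, here is roughly what we plan to try on top of the write
transform quoted below. This is only a sketch: the shard count of 50 is an
arbitrary placeholder, and whether capping the key space this way is the right
lever on write parallelism is exactly what we want to validate. (It uses
Reshuffle, Values, WithKeys and TypeDescriptors from the Beam SDK plus
java.util.concurrent.ThreadLocalRandom.)

@Override
public PDone expand(PCollection<V> input) {
  // Placeholder shard count; not tuned yet.
  final int numWriteShards = 50;

  return input
      // Spread elements over a bounded key space and Reshuffle, so the Kafka
      // write is no longer fused with the upstream Flatten and its parallelism
      // is bounded by the number of shard keys rather than inherited.
      .apply(
          "Assign write shard",
          WithKeys.<Integer, V>of(e -> ThreadLocalRandom.current().nextInt(numWriteShards))
              .withKeyType(TypeDescriptors.integers()))
      .apply("Reshuffle before Kafka write", Reshuffle.of())
      .apply("Drop shard key", Values.create())
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}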

Best,
YongGang

On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <[email protected]>
wrote:

> Which Beam version are you using?
> Sent from my iPhone
>
> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]>
> wrote:
>
>
> What is the parallelism of the input here? The Kafka write probably
> inherits the parallelism of the previous transform - you can add a
> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>
> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <
> [email protected]> wrote:
>
>> Hi Beam community,
>>
>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that
>> write to *Confluent Cloud Kafka*, and we’re running into scaling issues
>> related to *Kafka producer connection explosion* when autoscaling kicks
>> in.
>>
>> After investigating with Confluent support, we believe the root cause is
>> that as Dataflow workers scale out, the total number of Kafka connections
>> grows very quickly, eventually exceeding cluster-side connection guidelines
>> and leading to increased producer-side latency and timeouts. Short-term
>> mitigations like capping max workers help, but clearly do not scale as we
>> add more Kafka topics.
>>
>> This led us to question whether our current usage
>> of KafkaIO.write() aligns with best practices for *producer lifecycle
>> and connection reuse* in Beam.
>>
>> *Current pattern*
>>
>> Conceptually, our write transform looks like this (simplified):
>>
>> @Override
>> public PDone expand(PCollection<V> input) {
>>   return input
>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>       .apply(
>>           KafkaIO.<String, V>write()
>>               .withBootstrapServers(bootstrapServers)
>>               .withTopic(topicName)
>>               .withKeySerializer(StringSerializer.class)
>>               .withValueSerializer(valueSerializer)
>>               .withProducerConfigUpdates(
>>                   Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
>> }
>>
>> Operationally, we observe that:
>>
>>    - Each Dataflow worker ends up creating *multiple Kafka producers*
>>    - Each producer maintains *multiple TCP connections* to brokers
>>    - When autoscaling occurs, total connections spike rapidly (tens of
>>      thousands)
>>    - This correlates strongly with Kafka producer timeouts and lag alerts
>>
>> *Questions for the community*
>>
>> We’d really appreciate guidance on the following:
>>
>>    1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>
>>       - Is it expected that KafkaIO creates a producer per bundle / per
>>         transform instance?
>>       - Are there guarantees (or non-guarantees) around producer reuse
>>         within a worker?
>>
>>    2. *Is “one Kafka producer per worker” a supported or recommended
>>       pattern?*
>>
>>       Conceptually, this seems like the most scalable model given that:
>>
>>       - All producers connect to the same Kafka cluster
>>       - Topic count continues to grow over time
>>       - Kafka producers are thread-safe and designed for multiplexing
>>
>>    3. *Are there existing patterns or extension points to share producers
>>       across topics?*
>>
>>       For example:
>>
>>       - Reusing a producer instance across multiple KafkaIO.write()
>>         transforms
>>       - Custom sink implementations that manage producer lifecycle
>>         explicitly
>>       - Runner-specific hooks (Dataflow) for long-lived shared resources
>>
>>    4. *Has the Beam community seen similar issues at scale?*
>>
>>       If so:
>>
>>       - How was it mitigated?
>>       - Were there changes upstream in Beam, or purely application-level
>>         workarounds?
>>
>> We’re currently experimenting with a prototype that explicitly enforces *a
>> single producer per worker*, but before going further we’d like to
>> understand whether this aligns with Beam’s execution model and long-term
>> direction.
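>>
>> For concreteness, the prototype is roughly the following. This is only a
>> simplified sketch of our own code, not a KafkaIO API: the class name,
>> producer config, and the use of pre-serialized byte[] values are ours, and
>> it assumes all writes on a worker target the same Kafka cluster.
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.Producer;
>> import org.apache.kafka.clients.producer.ProducerConfig;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.common.serialization.ByteArraySerializer;
>> import org.apache.kafka.common.serialization.StringSerializer;
>>
>> /** Sketch only: one Kafka producer per worker JVM, shared across bundles and threads. */
>> class SharedProducerSinkFn extends DoFn<KV<String, byte[]>, Void> {
>>
>>   // Static so every instance of this DoFn in the worker JVM reuses the same
>>   // producer; KafkaProducer is thread-safe, so concurrent bundles can share it.
>>   private static volatile Producer<String, byte[]> sharedProducer;
>>
>>   private final String bootstrapServers;
>>   private final String topic;
>>
>>   SharedProducerSinkFn(String bootstrapServers, String topic) {
>>     this.bootstrapServers = bootstrapServers;
>>     this.topic = topic;
>>   }
>>
>>   private Producer<String, byte[]> producer() {
>>     if (sharedProducer == null) {
>>       synchronized (SharedProducerSinkFn.class) {
>>         if (sharedProducer == null) {
>>           // The first caller on this worker creates the producer; later callers
>>           // reuse it, which assumes a single Kafka cluster per worker.
>>           Map<String, Object> config = new HashMap<>();
>>           config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>>           config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
>>           sharedProducer =
>>               new KafkaProducer<>(config, new StringSerializer(), new ByteArraySerializer());
>>         }
>>       }
>>     }
>>     return sharedProducer;
>>   }
>>
>>   @ProcessElement
>>   public void processElement(@Element KV<String, byte[]> element) {
>>     // The topic is per record, so one shared producer can serve many topics.
>>     // Fire-and-forget for brevity; real code would handle send errors.
>>     producer().send(new ProducerRecord<>(topic, element.getKey(), element.getValue()));
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle() {
>>     // Flush before the bundle commits so the runner does not acknowledge
>>     // elements the producer has not actually sent.
>>     producer().flush();
>>   }
>> }
>>
>> We apply it in place of KafkaIO.write(), after keying and serializing values
>> to byte[], so the only change visible to the rest of the pipeline is the sink
>> step.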
>>
>> Any insights, prior art, or pointers to discussions would be extremely
>> helpful.
>>
>> Thanks in advance, and cc’ing folks who may have relevant context.
>>
>> Best regards,
>>
>> --
>> YongGang Che
>> Sr. Software Engineer, Data Platform
>>
>

-- 
YongGang Che
Sr. Software Engineer, Data Platform
(604) 305-4576
