Thanks, Reuven, appreciate the pointer. We’ll take a look at Reshuffle.viaRandomKey().withNumBuckets(...) and experiment with inserting it just before the Kafka write, along the lines of the sketch below.
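For the archives, here is a minimal sketch of what we plan to try, based on the simplified transform from my original post below. NUM_BUCKETS is a placeholder value we would tune against the cluster-side connection guidelines; partitionKeyFn, bootstrapServers, topicName, and valueSerializer are the same fields as in the original snippet:

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Placeholder: tune so the bounded write parallelism keeps total broker
// connections within Confluent's guidelines as workers autoscale.
private static final int NUM_BUCKETS = 32;

@Override
public PDone expand(PCollection<V> input) {
  return input
      // Redistribute onto a fixed number of random keys so the Kafka write
      // no longer inherits the parallelism of the upstream Flatten.
      .apply("Cap write parallelism",
          Reshuffle.<V>viaRandomKey().withNumBuckets(NUM_BUCKETS))
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}

We’ll report back with what we observe on producer and connection counts once this is running.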
On Wed, Dec 17, 2025 at 4:41 PM Reuven Lax <[email protected]> wrote:

> You can take a look at Reshuffle.viaRandomKey.withNumBuckets
>
> On Wed, Dec 17, 2025 at 3:56 PM YongGang Che <[email protected]> wrote:
>
>> Hi Utkarsh, we are currently using Apache Beam 2.65.0.
>>
>> Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections
>> and do not explicitly control bundling or parallelism at that stage. As a
>> result, the Kafka write likely inherits the parallelism of the preceding
>> transforms, as you pointed out.
>>
>> Adding a Reshuffle before the Kafka write sounds like a good idea; we'll
>> give it a try. Thanks for the suggestion.
>>
>> Best,
>> YongGang
>>
>> On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <[email protected]> wrote:
>>
>>> Which Beam version are you using?
>>> Sent from my iPhone
>>>
>>> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]> wrote:
>>>
>>> What is the parallelism of the input here? The Kafka write probably
>>> inherits the parallelism of the previous transform - you can add a
>>> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>>>
>>> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <[email protected]> wrote:
>>>
>>>> Hi Beam community,
>>>>
>>>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that write
>>>> to *Confluent Cloud Kafka*, and we’re running into scaling issues related
>>>> to a *Kafka producer connection explosion* when autoscaling kicks in.
>>>>
>>>> After investigating with Confluent support, the root cause appears to be
>>>> that as Dataflow workers scale out, the total number of Kafka connections
>>>> grows very quickly, eventually exceeding cluster-side connection
>>>> guidelines and leading to increased producer-side latency and timeouts.
>>>> Short-term mitigations like capping max workers help, but clearly do not
>>>> scale as we add more Kafka topics.
>>>>
>>>> This led us to question whether our current usage of KafkaIO.write()
>>>> aligns with best practices for *producer lifecycle and connection reuse*
>>>> in Beam.
>>>>
>>>> *Current pattern*
>>>>
>>>> Conceptually, our write transform looks like this (simplified):
>>>>
>>>> @Override
>>>> public PDone expand(PCollection<V> input) {
>>>>   return input
>>>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>>>       .apply(
>>>>           KafkaIO.<String, V>write()
>>>>               .withBootstrapServers(bootstrapServers)
>>>>               .withTopic(topicName)
>>>>               .withKeySerializer(StringSerializer.class)
>>>>               .withValueSerializer(valueSerializer)
>>>>               .withProducerConfigUpdates(
>>>>                   Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
>>>> }
>>>>
>>>> Operationally, we observe that:
>>>>
>>>> - Each Dataflow worker ends up creating *multiple Kafka producers*
>>>> - Each producer maintains *multiple TCP connections* to brokers
>>>> - When autoscaling occurs, total connections spike rapidly (tens of
>>>>   thousands)
>>>> - This correlates strongly with Kafka producer timeouts and lag alerts
>>>>
>>>> *Questions for the community*
>>>>
>>>> We’d really appreciate guidance on the following:
>>>>
>>>> 1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>>>
>>>>    - Is it expected that KafkaIO creates a producer per bundle / per
>>>>      transform instance?
>>>>    - Are there guarantees (or non-guarantees) around producer reuse
>>>>      within a worker?
>>>>
>>>> 2. *Is “one Kafka producer per worker” a supported or recommended
>>>>    pattern?*
>>>>
>>>>    Conceptually, this seems like the most scalable model given that:
>>>>
>>>>    - All producers connect to the same Kafka cluster
>>>>    - Topic count continues to grow over time
>>>>    - Kafka producers are thread-safe and designed for multiplexing
>>>>
>>>> 3. *Are there existing patterns or extension points to share producers
>>>>    across topics?*
>>>>
>>>>    For example:
>>>>
>>>>    - Reusing a producer instance across multiple KafkaIO.write()
>>>>      transforms
>>>>    - Custom sink implementations that manage producer lifecycle
>>>>      explicitly
>>>>    - Runner-specific hooks (Dataflow) for long-lived shared resources
>>>>
>>>> 4. *Has the Beam community seen similar issues at scale?* If so:
>>>>
>>>>    - How was it mitigated?
>>>>    - Were there changes upstream in Beam, or purely application-level
>>>>      workarounds?
>>>>
>>>> We’re currently experimenting with a prototype that explicitly enforces
>>>> *a single producer per worker*, but before going further we’d like to
>>>> understand whether this aligns with Beam’s execution model and long-term
>>>> direction.
>>>>
>>>> Any insights, prior art, or pointers to discussions would be extremely
>>>> helpful.
>>>>
>>>> Thanks in advance, and cc’ing folks who may have relevant context.
>>>>
>>>> Best regards,
>>>>
>>>> --
>>>> YongGang Che
>>>> Sr. Software Engineer, Data Platform
>>>> <https://www.dialpad.com>
>>>
>>
>> --
>> YongGang Che
>> Sr. Software Engineer, Data Platform
>> (604) 305-4576
>> <https://www.dialpad.com>
>

--
YongGang Che
Sr. Software Engineer, Data Platform
(604) 305-4576
<https://www.dialpad.com>
