You can take a look at Reshuffle.viaRandomKey().withNumBuckets(), which bounds the number of keys (and therefore the parallelism) after the reshuffle.
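For example, something along these lines (an untested sketch adapted from the snippet in your original mail; the bucket count of 32 and the step name "Limit write parallelism" are just illustrations, so tune the count to your throughput):

@Override
public PDone expand(PCollection<V> input) {
  return input
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      // Reshuffle onto a fixed number of random buckets so the steps after
      // this point (the Kafka write) run with bounded parallelism, no matter
      // how many workers Dataflow scales the upstream stages to.
      .apply("Limit write parallelism",
          Reshuffle.<KV<String, V>>viaRandomKey().withNumBuckets(32))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}

Fewer buckets means fewer parallel sink instances (and so fewer producers and connections), at the cost of write throughput, so it is worth benchmarking the value.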
On Wed, Dec 17, 2025 at 3:56 PM YongGang Che <[email protected]> wrote:

> Hi Utkarsh, we are currently using Apache Beam 2.65.0.
>
> Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections
> and do not explicitly control bundling or parallelism at that stage. As a
> result, the Kafka write likely inherits the parallelism of the preceding
> transforms, as you pointed out.
>
> Adding a Reshuffle before the Kafka write sounds like a good idea; we will
> give it a try. Thanks for the suggestion.
>
> Best,
> YongGang
>
> On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <[email protected]> wrote:
>
>> Which Beam version are you using?
>>
>> Sent from my iPhone
>>
>> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]> wrote:
>>
>> What is the parallelism of the input here? The Kafka write probably
>> inherits the parallelism of the previous transform - you can add a
>> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>>
>> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <[email protected]> wrote:
>>
>>> Hi Beam community,
>>>
>>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that
>>> write to *Confluent Cloud Kafka*, and we’re running into scaling issues
>>> related to a *Kafka producer connection explosion* when autoscaling
>>> kicks in.
>>>
>>> After investigating with Confluent support, the root cause appears to be
>>> that as Dataflow workers scale out, the total number of Kafka connections
>>> grows very quickly, eventually exceeding cluster-side connection guidelines
>>> and leading to increased producer-side latency and timeouts. Short-term
>>> mitigations like capping max workers help, but clearly do not scale as we
>>> add more Kafka topics.
>>>
>>> This led us to question whether our current usage of KafkaIO.write()
>>> aligns with best practices for *producer lifecycle and connection reuse*
>>> in Beam.
>>>
>>> *Current pattern*
>>>
>>> Conceptually, our write transform looks like this (simplified):
>>>
>>> @Override
>>> public PDone expand(PCollection<V> input) {
>>>   return input
>>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>>       .apply(
>>>           KafkaIO.<String, V>write()
>>>               .withBootstrapServers(bootstrapServers)
>>>               .withTopic(topicName)
>>>               .withKeySerializer(StringSerializer.class)
>>>               .withValueSerializer(valueSerializer)
>>>               .withProducerConfigUpdates(
>>>                   Map.of(
>>>                       ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
>>> }
>>>
>>> Operationally, we observe that:
>>>
>>> - Each Dataflow worker ends up creating *multiple Kafka producers*
>>> - Each producer maintains *multiple TCP connections* to brokers
>>> - When autoscaling occurs, total connections spike rapidly (tens of
>>>   thousands)
>>> - This correlates strongly with Kafka producer timeouts and lag alerts
>>>
>>> *Questions for the community*
>>>
>>> We’d really appreciate guidance on the following:
>>>
>>> 1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>>
>>>    - Is it expected that KafkaIO creates a producer per bundle / per
>>>      transform instance?
>>>    - Are there guarantees (or non-guarantees) around producer reuse
>>>      within a worker?
>>>
>>> 2. *Is “one Kafka producer per worker” a supported or recommended
>>>    pattern?*
>>>
>>>    Conceptually, this seems like the most scalable model given that:
>>>
>>>    - All producers connect to the same Kafka cluster
>>>    - Topic count continues to grow over time
>>>    - Kafka producers are thread-safe and designed for multiplexing
>>>
>>> 3. *Are there existing patterns or extension points to share producers
>>>    across topics?*
>>>
>>>    For example:
>>>
>>>    - Reusing a producer instance across multiple KafkaIO.write() transforms
>>>    - Custom sink implementations that manage producer lifecycle explicitly
>>>    - Runner-specific hooks (Dataflow) for long-lived shared resources
>>>
>>> 4. *Has the Beam community seen similar issues at scale?*
>>>
>>>    If so:
>>>
>>>    - How was it mitigated?
>>>    - Were there changes upstream in Beam, or purely application-level
>>>      workarounds?
>>>
>>> We’re currently experimenting with a prototype that explicitly enforces
>>> *a single producer per worker*, but before going further we’d like to
>>> understand whether this aligns with Beam’s execution model and long-term
>>> direction.
>>>
>>> Any insights, prior art, or pointers to discussions would be extremely
>>> helpful.
>>>
>>> Thanks in advance, and cc’ing folks who may have relevant context.
>>>
>>> Best regards,
>>>
>>> --
>>> YongGang Che
>>> Sr. Software Engineer, Data Platform
>>> <https://www.dialpad.com>
>>>
>>
>
> --
> YongGang Che
> Sr. Software Engineer, Data Platform
> (604) 305-4576
> <https://www.dialpad.com>
>
