Hi Utkarsh, we are currently using Apache Beam 2.65.0.

Hi Reuven, upstream of the Kafka write we Flatten multiple PCollections and
do not explicitly control bundling or parallelism at that stage. As a
result, the Kafka write likely inherits the parallelism of the preceding
transforms, as you pointed out.

Adding a Reshuffle before the Kafka write sounds like a good idea; we will
give it a try.
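Roughly what we have in mind (a sketch only; "sourceA", "sourceB",
"kafkaWriteTransform", and the bucket count are placeholders for our actual
transforms and tuning):

// Flatten as today, then cap the key space feeding the Kafka write so the
// write stage's parallelism no longer tracks the upstream transforms.
PCollection<V> flattened =
    PCollectionList.of(sourceA).and(sourceB).apply(Flatten.pCollections());

flattened
    .apply(
        "Bound write parallelism",
        // withNumBuckets limits the number of distinct reshuffle keys;
        // 32 is a placeholder we would still need to tune.
        Reshuffle.<V>viaRandomKey().withNumBuckets(32))
    // kafkaWriteTransform is our existing composite transform
    // (WithKeys + KafkaIO.write(), quoted further down in this thread).
    .apply("Write to Kafka", kafkaWriteTransform);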
Thanks for the suggestion.

Best,
YongGang

On Wed, Dec 17, 2025 at 11:22 AM Utkarsh Parekh <[email protected]> wrote:

> Which beam version are you using?
>
> Sent from my iPhone
>
> On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]> wrote:
>
> What is the parallelism of the input here? The Kafka write probably
> inherits the parallelism of the previous transform - you can add a
> Reshuffle to explicitly reduce the parallelism before writing to Kafka.
>
> On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user
> <[email protected]> wrote:
>
>> Hi Beam community,
>>
>> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that
>> write to *Confluent Cloud Kafka*, and we’re running into scaling issues
>> related to *Kafka producer connection explosion* when autoscaling kicks
>> in.
>>
>> After investigating with Confluent support, the root cause appears to be
>> that as Dataflow workers scale out, the total number of Kafka connections
>> grows very quickly, eventually exceeding cluster-side connection
>> guidelines and leading to increased producer-side latency and timeouts.
>> Short-term mitigations like capping max workers help, but clearly do not
>> scale as we add more Kafka topics.
>>
>> This led us to question whether our current usage of KafkaIO.write()
>> aligns with best practices for *producer lifecycle and connection reuse*
>> in Beam.
>>
>> *Current pattern*
>>
>> Conceptually, our write transform looks like this (simplified):
>>
>> @Override
>> public PDone expand(PCollection<V> input) {
>>   return input
>>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>>       .apply(
>>           KafkaIO.<String, V>write()
>>               .withBootstrapServers(bootstrapServers)
>>               .withTopic(topicName)
>>               .withKeySerializer(StringSerializer.class)
>>               .withValueSerializer(valueSerializer)
>>               .withProducerConfigUpdates(
>>                   Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
>> }
>>
>> Operationally, we observe that:
>>
>> - Each Dataflow worker ends up creating *multiple Kafka producers*
>> - Each producer maintains *multiple TCP connections* to brokers
>> - When autoscaling occurs, total connections spike rapidly (tens of
>>   thousands)
>> - This correlates strongly with Kafka producer timeouts and lag alerts
>>
>> *Questions for the community*
>>
>> We’d really appreciate guidance on the following:
>>
>> 1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>>    - Is it expected that KafkaIO creates a producer per bundle / per
>>      transform instance?
>>    - Are there guarantees (or non-guarantees) around producer reuse
>>      within a worker?
>>
>> 2. *Is “one Kafka producer per worker” a supported or recommended
>>    pattern?*
>>    Conceptually, this seems like the most scalable model given that:
>>    - All producers connect to the same Kafka cluster
>>    - Topic count continues to grow over time
>>    - Kafka producers are thread-safe and designed for multiplexing
>>
>> 3. *Are there existing patterns or extension points to share producers
>>    across topics?*
>>    For example:
>>    - Reusing a producer instance across multiple KafkaIO.write()
>>      transforms
>>    - Custom sink implementations that manage producer lifecycle
>>      explicitly
>>    - Runner-specific hooks (Dataflow) for long-lived shared resources
>>
>> 4. *Has the Beam community seen similar issues at scale?*
>>    If so:
>>    - How was it mitigated?
>>    - Were there changes upstream in Beam, or purely application-level
>>      workarounds?
>>
>> We’re currently experimenting with a prototype that explicitly enforces
>> *a single producer per worker*, but before going further we’d like to
>> understand whether this aligns with Beam’s execution model and long-term
>> direction.
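>> Conceptually, the prototype looks something like the sketch below
>> (simplified; class and field names are illustrative, not our actual
>> code):
>>
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> // One KafkaProducer per worker JVM, shared across bundles and DoFn
>> // instances through a static holder. KafkaProducer is thread-safe, so
>> // concurrently executing bundles can share the single instance.
>> class SharedProducerWriteFn extends DoFn<KV<String, byte[]>, Void> {
>>   private static volatile KafkaProducer<String, byte[]> producer;
>>
>>   // Serializable config; key.serializer / value.serializer are set here.
>>   private final Map<String, Object> producerConfig;
>>   private final String topic;
>>
>>   SharedProducerWriteFn(Map<String, Object> producerConfig, String topic) {
>>     this.producerConfig = producerConfig;
>>     this.topic = topic;
>>   }
>>
>>   private KafkaProducer<String, byte[]> producer() {
>>     if (producer == null) {
>>       synchronized (SharedProducerWriteFn.class) {
>>         if (producer == null) {
>>           // Created once per JVM and intentionally never closed, so its
>>           // broker connections live for the lifetime of the worker.
>>           producer = new KafkaProducer<>(producerConfig);
>>         }
>>       }
>>     }
>>     return producer;
>>   }
>>
>>   @ProcessElement
>>   public void processElement(@Element KV<String, byte[]> element) {
>>     producer().send(
>>         new ProducerRecord<>(topic, element.getKey(), element.getValue()));
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle() {
>>     // Flush before the bundle commits so retries cannot drop records.
>>     producer().flush();
>>   }
>> }
>>
>> The part we’re least sure about is whether holding the producer in a
>> JVM-level static like this is safe under Beam’s execution model, hence
>> the questions above.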
>>
>> Any insights, prior art, or pointers to discussions would be extremely
>> helpful.
>>
>> Thanks in advance, and cc’ing folks who may have relevant context.
>>
>> Best regards,
>>
>> --
>> YongGang Che
>> Sr. Software Engineer, Data Platform
>> <https://www.dialpad.com>

--
YongGang Che
Sr. Software Engineer, Data Platform
(604) 305-4576
<https://www.dialpad.com>