What is the parallelism of the input here? The Kafka write probably inherits the parallelism of the previous transform; you can add a Reshuffle to explicitly reduce the parallelism before writing to Kafka.
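
A minimal sketch of that reshuffle step, assuming your existing input PCollection<V> from the expand() you posted and a hypothetical shard count of 32 (tune it against your broker-side connection budget):

    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Hypothetical cap on how many parallel shards feed the Kafka write.
    final int numShards = 32;

    // Key each element into one of numShards buckets, reshuffle so the runner
    // redistributes elements across at most that many keys, then drop the
    // temporary key before the existing "Convert to KV" + KafkaIO.write() steps.
    PCollection<V> capped =
        input
            .apply("Assign shard",
                WithKeys.<Integer, V>of(v -> ThreadLocalRandom.current().nextInt(numShards))
                    .withKeyType(TypeDescriptors.integers()))
            .apply("Reshuffle into shards", Reshuffle.of())
            .apply("Drop shard key", Values.create());

Reshuffle is marked deprecated in the SDK but is still the usual way to break fusion and redistribute work on Dataflow; if your Beam version has it, Reshuffle.viaRandomKey().withNumBuckets(numShards) collapses the three steps above into one.
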
On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <[email protected]> wrote:

> Hi Beam community,
>
> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that write
> to *Confluent Cloud Kafka*, and we’re running into scaling issues related
> to *Kafka producer connection explosion* when autoscaling kicks in.
>
> After investigating with Confluent support, the root cause appears to be
> that as Dataflow workers scale out, the total number of Kafka connections
> grows very quickly, eventually exceeding cluster-side connection guidelines
> and leading to increased producer-side latency and timeouts. Short-term
> mitigations like capping max workers help, but clearly do not scale as we
> add more Kafka topics.
>
> This led us to question whether our current usage of KafkaIO.write()
> aligns with best practices for *producer lifecycle and connection reuse*
> in Beam.
>
> *Current pattern*
>
> Conceptually, our write transform looks like this (simplified):
>
>   @Override
>   public PDone expand(PCollection<V> input) {
>     return input
>         .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>         .apply(
>             KafkaIO.<String, V>write()
>                 .withBootstrapServers(bootstrapServers)
>                 .withTopic(topicName)
>                 .withKeySerializer(StringSerializer.class)
>                 .withValueSerializer(valueSerializer)
>                 .withProducerConfigUpdates(
>                     Map.of(
>                         ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
>   }
>
> Operationally, we observe that:
>
> - Each Dataflow worker ends up creating *multiple Kafka producers*
> - Each producer maintains *multiple TCP connections* to brokers
> - When autoscaling occurs, total connections spike rapidly (tens of thousands)
> - This correlates strongly with Kafka producer timeouts and lag alerts
>
> *Questions for the community*
>
> We’d really appreciate guidance on the following:
>
> 1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>    - Is it expected that KafkaIO creates a producer per bundle / per
>      transform instance?
>    - Are there guarantees (or non-guarantees) around producer reuse
>      within a worker?
>
> 2. *Is “one Kafka producer per worker” a supported or recommended pattern?*
>    Conceptually, this seems like the most scalable model given that:
>    - All producers connect to the same Kafka cluster
>    - Topic count continues to grow over time
>    - Kafka producers are thread-safe and designed for multiplexing
>
> 3. *Are there existing patterns or extension points to share producers
>    across topics?* For example:
>    - Reusing a producer instance across multiple KafkaIO.write() transforms
>    - Custom sink implementations that manage producer lifecycle explicitly
>    - Runner-specific hooks (Dataflow) for long-lived shared resources
>
> 4. *Has the Beam community seen similar issues at scale?* If so:
>    - How was it mitigated?
>    - Were there changes upstream in Beam, or purely application-level
>      workarounds?
>
> We’re currently experimenting with a prototype that explicitly enforces
> *a single producer per worker*, but before going further we’d like to
> understand whether this aligns with Beam’s execution model and long-term
> direction.
>
> Any insights, prior art, or pointers to discussions would be extremely
> helpful.
>
> Thanks in advance, and cc’ing folks who may have relevant context.
>
> Best regards,
>
> --
> YongGang Che
> Sr. Software Engineer, Data Platform
> Dialpad <https://www.dialpad.com>
