We’re running Apache Beam pipelines on Google Cloud Dataflow that write to Confluent Cloud Kafka, and we’re running into scaling issues related to Kafka producer connection explosion when autoscaling kicks in.
After investigating with Confluent support, the root cause appears to be that as Dataflow workers scale out, the total number of Kafka connections grows very quickly, eventually exceeding cluster-side connection guidelines and leading to increased producer-side latency and timeouts. Short-term mitigations like capping max workers help, but clearly do not scale as we add more Kafka topics.
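For reference, that cap is just the standard Dataflow autoscaling ceiling, along these lines (illustrative value, not our exact setting):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Illustrative only: bounding autoscaling keeps total Kafka connections under
// cluster guidance, but the ceiling has to keep shrinking as we add topics.
DataflowPipelineOptions options =
    PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setMaxNumWorkers(50);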
This led us to question whether our current usage of KafkaIO.write() aligns with best practices for producer lifecycle and connection reuse in Beam.
Current pattern
Conceptually, our write transform looks like this (simplified):
@Override
public PDone expand(PCollection<V> input) {
  return input
      // Key each element with its Kafka partition key, then hand the KVs to KafkaIO.
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}
Operationally, we observe that:
- Each Dataflow worker ends up creating multiple Kafka producers
- Each producer maintains multiple TCP connections to brokers
- When autoscaling occurs, total connections spike rapidly into the tens of thousands (rough arithmetic below)
- This correlates strongly with Kafka producer timeouts and lag alerts
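(Back-of-the-envelope, with purely illustrative numbers rather than our exact figures: total connections grow roughly as workers x producers-per-worker x brokers, e.g. 400 workers x 4 producers x 12 brokers is about 19,000 connections, which matches the order of magnitude we see once autoscaling ramps up.)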
Questions for the community
We’d really appreciate guidance on the following:
1. What is the intended Kafka producer lifecycle in KafkaIO.write()?

2. Is “one Kafka producer per worker” a supported or recommended pattern? Conceptually, this seems like the most scalable model, given that:
   - All producers connect to the same Kafka cluster
   - Topic count continues to grow over time
   - Kafka producers are thread-safe and designed for multiplexing

3. Are there existing patterns or extension points for sharing producers across topics? For example:
   - Reusing a producer instance across multiple KafkaIO.write() transforms
   - Custom sink implementations that manage producer lifecycle explicitly
   - Runner-specific hooks (Dataflow) for long-lived shared resources

4. Has the Beam community seen similar issues at scale? If so, how were they addressed?
We’re currently experimenting with a prototype that explicitly enforces a single producer per worker, but before going further we’d like to understand whether this aligns with Beam’s execution model and long-term direction.
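For concreteness, the prototype is roughly along these lines (heavily simplified sketch with placeholder names and byte[] values, not our production code):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: all DoFn instances on a worker JVM share one KafkaProducer instead of
// each instance opening its own. KafkaProducer is thread-safe, so concurrent
// bundles can send through the same instance.
class SharedProducerWriteFn extends DoFn<KV<String, byte[]>, Void> {

  // One producer per worker JVM, created lazily and shared across instances.
  private static volatile KafkaProducer<String, byte[]> sharedProducer;

  private final Map<String, Object> producerConfig; // must be serializable
  private final String topic;

  // Tracks async send failures from the current bundle on this DoFn instance.
  private transient volatile Exception sendException;

  SharedProducerWriteFn(Map<String, Object> producerConfig, String topic) {
    this.producerConfig = producerConfig;
    this.topic = topic;
  }

  private KafkaProducer<String, byte[]> producer() {
    if (sharedProducer == null) {
      synchronized (SharedProducerWriteFn.class) {
        if (sharedProducer == null) {
          sharedProducer =
              new KafkaProducer<>(producerConfig, new StringSerializer(), new ByteArraySerializer());
        }
      }
    }
    return sharedProducer;
  }

  @StartBundle
  public void startBundle() {
    sendException = null;
  }

  @ProcessElement
  public void processElement(@Element KV<String, byte[]> element) {
    producer().send(
        new ProducerRecord<>(topic, element.getKey(), element.getValue()),
        (metadata, exception) -> {
          if (exception != null) {
            sendException = exception;
          }
        });
  }

  @FinishBundle
  public void finishBundle() throws Exception {
    // Block until buffered records are sent, then surface any async failure so the
    // bundle is retried rather than committed with dropped records.
    producer().flush();
    if (sendException != null) {
      throw sendException;
    }
  }

  // Open question: we deliberately never close the shared producer in @Teardown,
  // since other DoFn instances on the same worker may still be using it.
}

The open questions are exactly the ones above: whether sharing a producer this way fights the DoFn setup/teardown lifecycle, and whether there is a more idiomatic extension point in KafkaIO for it.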
Any insights, prior art, or pointers to discussions would be extremely helpful.
Thanks in advance, and cc’ing folks who may have relevant context.
Best regards,