What is the parallelism of the input here? The Kafka write probably
inherits the parallelism of the upstream transform; you can add a
Reshuffle to explicitly cap the parallelism before writing to Kafka.
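
A minimal sketch, reusing the names from your snippet and assuming a Beam
version whose Reshuffle supports viaRandomKey().withNumBuckets(...); the
bucket count of 32 is purely illustrative:

input
    .apply("Cap write parallelism",
        Reshuffle.<V>viaRandomKey().withNumBuckets(32))
    .apply("Convert to KV", WithKeys.of(partitionKeyFn))
    .apply(
        KafkaIO.<String, V>write()
            .withBootstrapServers(bootstrapServers)
            .withTopic(topicName)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(valueSerializer));

Fewer buckets means fewer concurrent producer instances (and fewer broker
connections), at the cost of write throughput, so it is a knob to tune
rather than a free fix.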

On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <[email protected]>
wrote:

> Hi Beam community,
>
> We’re running Apache Beam pipelines on *Google Cloud Dataflow* that write
> to *Confluent Cloud Kafka*, and we’re running into scaling issues related
> to *Kafka producer connection explosion* when autoscaling kicks in.
>
> After investigating with Confluent support, the root cause appears to be
> that as Dataflow workers scale out, the total number of Kafka connections
> grows very quickly, eventually exceeding cluster-side connection guidelines
> and leading to increased producer-side latency and timeouts. Short-term
> mitigations like capping max workers help, but clearly do not scale as we
> add more Kafka topics.
>
> This led us to question whether our current usage
> of KafkaIO.write() aligns with best practices for *producer lifecycle and
> connection reuse* in Beam.
>
> *Current pattern*
>
> Conceptually, our write transform looks like this (simplified):
>
> @Override
> public PDone expand(PCollection<V> input) {
>   return input
>       .apply("Convert to KV", WithKeys.of(partitionKeyFn))
>       .apply(
>           KafkaIO.<String, V>write()
>               .withBootstrapServers(bootstrapServers)
>               .withTopic(topicName)
>               .withKeySerializer(StringSerializer.class)
>               .withValueSerializer(valueSerializer)
>               .withProducerConfigUpdates(
>                   Map.of(
>                       ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"
>                   )));
> }
>
> Operationally, we observe that:
>
>    - Each Dataflow worker ends up creating *multiple Kafka producers*
>    - Each producer maintains *multiple TCP connections* to brokers
>    - When autoscaling occurs, total connections spike rapidly (tens of
>      thousands)
>    - This correlates strongly with Kafka producer timeouts and lag alerts
>
> *Questions for the community*
>
> We’d really appreciate guidance on the following:
>
>
>    1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*
>
>       - Is it expected that KafkaIO creates a producer per bundle / per
>         transform instance?
>       - Are there guarantees (or non-guarantees) around producer reuse
>         within a worker?
>
>    2. *Is “one Kafka producer per worker” a supported or recommended
>       pattern?*
>
>       Conceptually, this seems like the most scalable model given that:
>
>       - All producers connect to the same Kafka cluster
>       - Topic count continues to grow over time
>       - Kafka producers are thread-safe and designed for multiplexing
>
>    3. *Are there existing patterns or extension points to share producers
>       across topics?*
>
>       For example:
>
>       - Reusing a producer instance across multiple KafkaIO.write()
>         transforms
>       - Custom sink implementations that manage producer lifecycle
>         explicitly
>       - Runner-specific hooks (Dataflow) for long-lived shared resources
>
>    4. *Has the Beam community seen similar issues at scale?* If so:
>
>       - How was it mitigated?
>       - Were there changes upstream in Beam, or purely application-level
>         workarounds?
>
>
> We’re currently experimenting with a prototype that explicitly enforces *a
> single producer per worker*, but before going further we’d like to
> understand whether this aligns with Beam’s execution model and long-term
> direction.
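>
> Conceptually (simplified, and with illustrative names such as
> SharedProducer and producerConfig()), the prototype shares one producer
> per worker JVM through a static holder, relying on the Kafka producer
> being thread-safe:
>
> private static final class SharedProducer {
>   // One producer per worker JVM; KafkaProducer is thread-safe, so all
>   // DoFn instances and bundle threads can share it. It is intentionally
>   // never closed and lives for the lifetime of the worker JVM.
>   private static volatile Producer<String, byte[]> instance;
>
>   static Producer<String, byte[]> get(Map<String, Object> config) {
>     if (instance == null) {
>       synchronized (SharedProducer.class) {
>         if (instance == null) {
>           instance = new KafkaProducer<>(config);
>         }
>       }
>     }
>     return instance;
>   }
> }
>
> class SharedProducerWriteFn extends DoFn<KV<String, byte[]>, Void> {
>   private final String topicName;
>   private transient Producer<String, byte[]> producer;
>
>   SharedProducerWriteFn(String topicName) {
>     this.topicName = topicName;
>   }
>
>   @Setup
>   public void setup() {
>     // producerConfig() stands in for the usual bootstrap.servers and
>     // serializer settings shown above.
>     producer = SharedProducer.get(producerConfig());
>   }
>
>   @ProcessElement
>   public void processElement(@Element KV<String, byte[]> element) {
>     producer.send(new ProducerRecord<>(topicName, element.getKey(),
>         element.getValue()));
>   }
>
>   @FinishBundle
>   public void finishBundle() {
>     // Flush at bundle boundaries so a retried bundle does not drop
>     // records that were still buffered in the producer.
>     producer.flush();
>   }
> }
>
> Part of what we want to confirm is whether this sharing is safe with
> respect to Beam’s bundle retry semantics and Dataflow’s DoFn lifecycle.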
>
> Any insights, prior art, or pointers to discussions would be extremely
> helpful.
>
> Thanks in advance, and cc’ing folks who may have relevant context.
>
> Best regards,
>
> --
> YongGang Che
> Sr. Software Engineer, Data Platform
> https://www.dialpad.com
>
