Which Beam version are you using?
Sent from my iPhone

On Dec 17, 2025, at 10:38 AM, Reuven Lax via user <[email protected]> wrote:


What is the parallelism of the input here? The Kafka write probably inherits the parallelism of the previous transform - you can add a Reshuffle to explicitly reduce the parallelism before writing to Kafka.
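
Roughly like this (an untested sketch reusing the names from the snippet quoted below; on recent Beam versions, Reshuffle.viaRandomKey().withNumBuckets(n) can additionally cap the shard count):

input
    .apply("Convert to KV", WithKeys.of(partitionKeyFn))
    // Break fusion so the Kafka write does not inherit the upstream
    // transform's parallelism. On newer Beam releases you can also cap
    // sharding with .withNumBuckets(n) (n is illustrative).
    .apply("Reshuffle before write", Reshuffle.viaRandomKey())
    .apply(
        KafkaIO.<String, V>write()
            .withBootstrapServers(bootstrapServers)
            .withTopic(topicName)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(valueSerializer));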

On Wed, Dec 17, 2025 at 10:30 AM YongGang Che via user <[email protected]> wrote:
Hi Beam community,

We’re running Apache Beam pipelines on Google Cloud Dataflow that write to Confluent Cloud Kafka, and we’re hitting a scaling issue: the number of Kafka producer connections explodes when autoscaling kicks in.

After investigating with Confluent support, the root cause appears to be that as Dataflow workers scale out, the total number of Kafka connections grows very quickly, eventually exceeding cluster-side connection guidelines and leading to increased producer-side latency and timeouts. Short-term mitigations like capping max workers help, but clearly do not scale as we add more Kafka topics.
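
For context, the cap is just the standard Dataflow worker-pool option (the value below is illustrative):

// Standard Dataflow option; 50 is an illustrative cap, not our real setting.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(DataflowPipelineOptions.class);
options.setMaxNumWorkers(50);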

This led us to question whether our current usage of KafkaIO.write() aligns with best practices for producer lifecycle and connection reuse in Beam.

Current pattern

Conceptually, our write transform looks like this (simplified):

@Override
public PDone expand(PCollection<V> input) {
  return input
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}

Operationally, we observe that:

  • Each Dataflow worker ends up creating multiple Kafka producers

  • Each producer maintains multiple TCP connections to brokers

  • When autoscaling occurs, total connections spike rapidly (tens of thousands)

  • This correlates strongly with Kafka producer timeouts and lag alerts
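
As a rough illustration (the figures are assumptions, not measurements): with ~5 producers per worker, ~6 broker connections per producer, and an autoscaled fleet of 500 workers, a single pipeline opens on the order of 5 × 6 × 500 = 15,000 connections, which matches the order of magnitude we observe.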

Questions for the community

We’d really appreciate guidance on the following:

  1. What is the intended Kafka producer lifecycle in KafkaIO.write()?

    • Is it expected that KafkaIO creates a producer per bundle / per transform instance?

    • Are there guarantees (or non-guarantees) around producer reuse within a worker?

  2. Is “one Kafka producer per worker” a supported or recommended pattern?

    Conceptually, this seems like the most scalable model given that:

    • All producers connect to the same Kafka cluster

    • Topic count continues to grow over time

    • Kafka producers are thread-safe and designed for multiplexing

  3. Are there existing patterns or extension points to share producers across topics?

    For example:

    • Reusing a producer instance across multiple KafkaIO.write() transforms

    • Custom sink implementations that manage producer lifecycle explicitly

    • Runner-specific hooks (Dataflow) for long-lived shared resources

  4. Has the Beam community seen similar issues at scale?

    If so:

    • How was it mitigated?

    • Were there changes upstream in Beam, or purely application-level workarounds?


We’re currently experimenting with a prototype that explicitly enforces a single producer per worker, but before going further we’d like to understand whether this aligns with Beam’s execution model and long-term direction.
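
For reference, the prototype roughly follows the common Beam pattern of caching a heavyweight client per worker JVM in a static holder. This is a simplified sketch under assumptions, not our production code; the class and method names are illustrative:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: one KafkaProducer per worker JVM, shared across DoFn instances and
// topics via a static map keyed by bootstrap servers. KafkaProducer is
// thread-safe, so concurrent bundles can use the same instance. Producers are
// never closed here (they live for the worker's lifetime).
class SharedProducerWriteFn extends DoFn<KV<String, byte[]>, Void> {
  private static final ConcurrentHashMap<String, KafkaProducer<String, byte[]>>
      PRODUCERS = new ConcurrentHashMap<>();

  private final String bootstrapServers;
  private final String topic;

  SharedProducerWriteFn(String bootstrapServers, String topic) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
  }

  private static KafkaProducer<String, byte[]> producerFor(String servers) {
    return PRODUCERS.computeIfAbsent(servers, s -> {
      Map<String, Object> config = new HashMap<>();
      config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s);
      config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      return new KafkaProducer<>(config);
    });
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    producerFor(bootstrapServers)
        .send(new ProducerRecord<>(topic, c.element().getKey(), c.element().getValue()));
  }

  @FinishBundle
  public void finishBundle() {
    // Flush at bundle boundaries so records are sent before the bundle is
    // committed (note: weaker delivery semantics than KafkaIO's EOS mode).
    producerFor(bootstrapServers).flush();
  }
}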

Any insights, prior art, or pointers to discussions would be extremely helpful.

Thanks in advance, and cc’ing folks who may have relevant context.

Best regards,


--
YongGang Che
Sr. Software Engineer, Data Platform
dialpad
