Hi Beam community,

We’re running Apache Beam pipelines on *Google Cloud Dataflow* that write
to *Confluent Cloud Kafka*, and we’re running into scaling issues related
to *Kafka producer connection explosion* when autoscaling kicks in.

After investigating with Confluent support, the root cause appears to be
that as Dataflow workers scale out, the total number of Kafka connections
grows very quickly, eventually exceeding cluster-side connection guidelines
and leading to increased producer-side latency and timeouts. Short-term
mitigations like capping max workers help, but clearly do not scale as we
add more Kafka topics.

This led us to question whether our current usage of KafkaIO.write() aligns
with best practices for *producer lifecycle and connection reuse* in Beam.

*Current pattern*

Conceptually, our write transform looks like this (simplified):

@Override
public PDone expand(PCollection<V> input) {
  return input
      .apply("Convert to KV", WithKeys.of(partitionKeyFn))
      .apply(
          KafkaIO.<String, V>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topicName)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(valueSerializer)
              .withProducerConfigUpdates(
                  Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
}

Operationally, we observe that:

   - Each Dataflow worker ends up creating *multiple Kafka producers*
   - Each producer maintains *multiple TCP connections* to brokers
   - When autoscaling occurs, total connections spike rapidly (tens of
   thousands)
   - This correlates strongly with Kafka producer timeouts and lag alerts
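The observations above multiply together, which is why autoscaling causes such a sharp spike. A rough back-of-the-envelope (all numbers below are hypothetical placeholders, not our actual cluster figures):

```java
// Rough back-of-the-envelope for the connection spike; the inputs are
// hypothetical, for illustration only.
public final class ConnectionMath {

  // Each producer typically holds roughly one TCP connection per broker it
  // talks to, so totals multiply across workers, producers, and brokers.
  public static long totalConnections(
      long workers, long producersPerWorker, long brokers) {
    return workers * producersPerWorker * brokers;
  }

  public static void main(String[] args) {
    // e.g. 500 workers x 8 producers/worker x 6 brokers = 24,000 connections
    System.out.println(totalConnections(500, 8, 6));
  }
}
```

Doubling workers (or adding another topic, hence another write transform and its producers) doubles the total, which matches the "tens of thousands" we see during scale-out.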

*Questions for the community*

We’d really appreciate guidance on the following:

   1. *What is the intended Kafka producer lifecycle in KafkaIO.write()?*

      - Is it expected that KafkaIO creates a producer per bundle / per
      transform instance?
      - Are there guarantees (or non-guarantees) around producer reuse within
      a worker?

   2. *Is “one Kafka producer per worker” a supported or recommended pattern?*

      Conceptually, this seems like the most scalable model given that:

      - All producers connect to the same Kafka cluster
      - Topic count continues to grow over time
      - Kafka producers are thread-safe and designed for multiplexing

   3. *Are there existing patterns or extension points to share producers
   across topics?*

      For example:

      - Reusing a producer instance across multiple KafkaIO.write() transforms
      - Custom sink implementations that manage producer lifecycle explicitly
      - Runner-specific hooks (Dataflow) for long-lived shared resources

   4. *Has the Beam community seen similar issues at scale?*

      If so:

      - How was it mitigated?
      - Were there changes upstream in Beam, or purely application-level
      workarounds?



We’re currently experimenting with a prototype that explicitly enforces *a
single producer per worker*, but before going further we’d like to
understand whether this aligns with Beam’s execution model and long-term
direction.
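The core of the prototype is a JVM-wide lazy singleton that every DoFn instance on a worker shares. Sketched below with a generic payload so the lifecycle logic stands alone (in the real prototype the payload is a KafkaProducer, which is thread-safe, so sharing it is legal; the class and names here are hypothetical, not Beam API):

```java
import java.util.function.Supplier;

// Hypothetical sketch of "one producer per worker": a lazily-created,
// JVM-wide singleton shared by every DoFn instance on that worker. The
// payload is generic here; in the real prototype it wraps a KafkaProducer.
public final class PerWorkerSingleton<T> {
  private final Supplier<T> factory;
  private T instance;

  public PerWorkerSingleton(Supplier<T> factory) {
    this.factory = factory;
  }

  // Create on first use only; all later callers get the same object.
  // synchronized guarantees a single creation even under concurrent
  // first calls from multiple processing threads.
  public synchronized T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }
}
```

In the prototype, the holder lives in a static field of the DoFn class so it survives across bundles, and the shared producer is closed via a JVM shutdown hook rather than @Teardown (since @Teardown fires per DoFn instance, not per worker).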

Any insights, prior art, or pointers to discussions would be extremely
helpful.

Thanks in advance, and cc’ing folks who may have relevant context.

Best regards,

-- 
YongGang Che
Sr. Software Engineer, Data Platform
