Hi Rajath,

My expectation was that, since I am using RoundRobinPartitioner, the key
value would be ignored. My key value is b''.
The code below writes the data to Kafka. The EncodeAvro transform produces
elements of the form (b'', <Avro-encoded value>):

from apache_beam.io.kafka import WriteToKafka

        | f'EncodeAvro {self.transform_id}' >> beam.ParDo(
            ToKvAvroDoFn(self.schema, self.key_fn,
                         self.schema_registry_url, self.topic))
        | f'WriteToKafka {self.transform_id}' >> WriteToKafka(
            producer_config=self.kafka_config,
            topic=self.topic,
            expansion_service=self.expansion_service)
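
To illustrate why the key should not matter with a round-robin partitioner,
here is a minimal sketch in plain Python (no Kafka client involved). It
contrasts key-hash partitioning with an idealized round robin. Assumptions:
zlib.crc32 is only a stand-in hash (the Kafka Java client hashes keys with
murmur2, not CRC32), and hash_partition / make_round_robin are hypothetical
helper names, not Beam or Kafka APIs.

```python
import zlib
from collections import Counter
from itertools import count

NUM_PARTITIONS = 52  # partition count taken from the example in this thread


def hash_partition(key: bytes, num_partitions: int) -> int:
    """Key-hash partitioning. CRC32 is a stand-in, NOT Kafka's murmur2."""
    return zlib.crc32(key) % num_partitions


def make_round_robin(num_partitions: int):
    """Idealized round robin: one counter step per record, key ignored."""
    counter = count()

    def partition(key: bytes) -> int:
        return next(counter) % num_partitions

    return partition


# Every record carries the same empty key, as in the pipeline above.
records = [(b"", f"value-{i}".encode()) for i in range(520)]

hashed = Counter(hash_partition(key, NUM_PARTITIONS) for key, _ in records)
round_robin_fn = make_round_robin(NUM_PARTITIONS)
round_robin = Counter(round_robin_fn(key) for key, _ in records)

print("partitions hit with key hashing:", len(hashed))
print("partitions hit with ideal round robin:", len(round_robin))
```

With a constant key, any key-hash scheme maps all records to a single
partition, while an ideal round robin reaches every partition regardless of
the key.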

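The numbers from my original message (52 partitions → 26 used, 40 → 20) can
be reproduced with a toy model. The assumption, which nothing in this thread
confirms, is that the round-robin counter advances twice for every record
that is actually sent, so every other partition is skipped. The function
used_partitions and its parameters are hypothetical names for illustration
only; this is a sketch of the arithmetic, not of the producer's actual code.

```python
from itertools import count


def used_partitions(num_partitions: int, num_records: int,
                    calls_per_record: int) -> set:
    """Return the partitions hit when the round-robin counter advances
    `calls_per_record` times per record and only the last result is used."""
    counter = count()
    used = set()
    for _ in range(num_records):
        chosen = None
        for _ in range(calls_per_record):
            # Each call consumes one counter value, even if it is discarded.
            chosen = next(counter) % num_partitions
        used.add(chosen)
    return used


for partitions in (52, 40):
    ideal = used_partitions(partitions, 1000, calls_per_record=1)
    skipping = used_partitions(partitions, 1000, calls_per_record=2)
    print(partitions, "partitions:", len(ideal), "used with one call,",
          len(skipping), "used with two calls")
```

In this model, jobs that each start their counter at zero land on the same
subset of partitions, and a topic with an odd partition count would be
covered completely, which may be a cheap way to test the hypothesis.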

Thanks and Regards
Robert

On Tue, Sep 23, 2025 at 10:31 AM Rajath BK <rajath.u...@gmail.com> wrote:

> Hi Robert,
> It would help if you shared how the data is being written to Kafka. The
> partition key determines how the data is distributed across the
> partitions. If the partition key space is large enough, the data will be
> fairly evenly distributed.
>
> - Thanks and Regards
> Rajath
>
>
> On Mon, Sep 22, 2025 at 1:13 PM Robert Sianta <robert.sia...@gmail.com>
> wrote:
>
>> Hello Beam community,
>>
>> I’m running into a Kafka partitioning issue when using Beam’s
>> WriteToKafka transform together with a custom partitioner through the IO
>> expansion service.
>>
>> *Setup details:*
>>
>>    - Beam version: 2.52.0
>>    - Flink 1.16.3
>>    - Python 3.11
>>    - Expansion service: beam-sdks-java-io-expansion-service-2.52.0.jar,
>>      Java 11
>>    - Producer configured with non-default partitioner:
>>
>>    producer_config = {
>>        "bootstrap.servers": "...",
>>        "partitioner.class":
>>            "org.apache.kafka.clients.producer.RoundRobinPartitioner",
>>        ...
>>    }
>>
>>    - Expansion service started with:
>>
>>    JavaJarExpansionService(
>>        jar_path,
>>        append_args=[
>>            "--defaultEnvironmentType=PROCESS",
>>            f'--defaultEnvironmentConfig={{"command": "{cmd}"}}',
>>            "--experiments=use_deprecated_read"
>>        ])
>>
>>
>> *Observed problem:*
>>
>>    - Data written to Kafka is *not spread across all partitions*.
>>    - The round robin seems to work partially: the partitions that are used
>>      are evenly filled.
>>    - However, only *half of the partitions* are ever populated.
>>       - Example:
>>          - Job with parallelism 40 → topic with 52 partitions → only 26
>>            partitions get data.
>>          - After reducing topic to 40 partitions → only 20 are used.
>>    - Running *multiple independent jobs* writing to the same topic results
>>      in them *using the same subset of partitions*.
>>
>> *What we tried:*
>>
>>    - Changing number of partitions on the target topic → always only half
>>      are used.
>>    - Different topics and jobs → same behavior (only half partitions used).
>>
>> *Question:*
>>
>>    - Is this a known limitation or bug when using WriteToKafka with
>>      RoundRobinPartitioner through the expansion service?
>>    - Could it be related to how Beam maps bundles to Kafka producer
>>      instances, or some interaction between Flink parallelism and Kafka
>>      partitioning?
>>    - Any ideas on how to make Beam actually use *all* partitions with
>>      RoundRobinPartitioner?
>>
>> Thanks a lot for any pointers!
>>
>> Best regards,
>> Robert Sianta
>>
>
