Hello Beam community,
I’m running into a Kafka partitioning issue when using Beam’s WriteToKafka
transform together with a custom partitioner through the IO expansion
service.
*Setup details:*
- Beam version: 2.52.0
- Flink 1.16.3
- Python 3.11
- Expansion service: beam-sdks-java-io-expansion-service-2.52.0.jar, Java 11
- Producer configured with non-default partitioner:

  producer_config = {
      "bootstrap.servers": "...",
      "partitioner.class":
          "org.apache.kafka.clients.producer.RoundRobinPartitioner",
      ...
  }

- Expansion service started with (the full write step is sketched after this list):

  JavaJarExpansionService(
      jar_path,
      append_args=[
          "--defaultEnvironmentType=PROCESS",
          f'--defaultEnvironmentConfig={{"command": "{cmd}"}}',
          "--experiments=use_deprecated_read",
      ])
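To make the setup concrete, below is a minimal sketch of how the write step is wired together (the topic name, the Create input and pipeline_options are illustrative placeholders; the real job reads byte key/value pairs from an upstream source):

  import apache_beam as beam
  from apache_beam.io.kafka import WriteToKafka
  from apache_beam.transforms.external import JavaJarExpansionService

  # Expansion service as configured above; jar_path and cmd are defined earlier.
  expansion_service = JavaJarExpansionService(
      jar_path,
      append_args=[
          "--defaultEnvironmentType=PROCESS",
          f'--defaultEnvironmentConfig={{"command": "{cmd}"}}',
          "--experiments=use_deprecated_read",
      ])

  with beam.Pipeline(options=pipeline_options) as p:
      (p
       | beam.Create([(b"key-%d" % i, b"value-%d" % i) for i in range(100)])
       | WriteToKafka(
             producer_config=producer_config,
             topic="my-topic",
             key_serializer="org.apache.kafka.common.serialization.ByteArraySerializer",
             value_serializer="org.apache.kafka.common.serialization.ByteArraySerializer",
             expansion_service=expansion_service))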
*Observed problem:*
- Data written to Kafka is *not spread across all partitions*.
- The round-robin distribution seems to work partially: the partitions that are used are evenly filled.
- However, only *half of the partitions* are ever populated.
- Example:
  - Job with parallelism 40 → topic with 52 partitions → only 26 partitions get data.
  - After reducing the topic to 40 partitions → only 20 are used.
- Running *multiple independent jobs* writing to the same topic results in them *using the same subset of partitions*.
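In case it helps to reproduce the observation, partition usage can be checked by comparing beginning and end offsets per partition, e.g. with a small kafka-python snippet like the one below (the bootstrap servers and topic name are placeholders):

  from kafka import KafkaConsumer, TopicPartition

  topic = "my-topic"  # placeholder
  consumer = KafkaConsumer(bootstrap_servers="localhost:9092")  # placeholder

  # One TopicPartition per partition of the topic.
  tps = [TopicPartition(topic, p)
         for p in sorted(consumer.partitions_for_topic(topic))]
  begin = consumer.beginning_offsets(tps)
  end = consumer.end_offsets(tps)

  used = 0
  for tp in tps:
      count = end[tp] - begin[tp]
      if count > 0:
          used += 1
      print(f"partition {tp.partition}: {count} messages")
  print(f"{used} of {len(tps)} partitions contain data")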
*What we tried:*
- Changing the number of partitions on the target topic → always only half are used.
- Different topics and jobs → same behavior (only half of the partitions are used).
*Questions:*
- Is this a known limitation or bug when using WriteToKafka with RoundRobinPartitioner through the expansion service?
- Could it be related to how Beam maps bundles to Kafka producer instances, or to some interaction between Flink parallelism and Kafka partitioning?
- Any ideas on how to make Beam actually use *all* partitions with RoundRobinPartitioner?
Thanks a lot for any pointers!
Best regards,
Robert Sianta