I've run into some surprising behavior with `ReadFromKafka` in the Apache Beam Python SDK.
I'm working with a simple Apache Beam pipeline that reads from an unbounded Kafka topic and prints the values, running on the Flink runner. I have two flavors of this.

**Version 1** uses the default implementation that ships with Apache Beam (`from apache_beam.io.kafka import ReadFromKafka`):

```python
with beam.Pipeline(options=beam_options) as p:
    (p
     | "Read from Kafka topic" >> ReadFromKafka(
         consumer_config=consumer_config,
         topics=[producer_topic])
     | "log" >> beam.ParDo(LogData()))
```

**Version 2** uses beam-nuggets (`from beam_nuggets.io.kafkaio import KafkaConsume`):

```python
with beam.Pipeline(options=beam_options) as p:
    (p
     | "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
         consumer_config={
             "topic": producer_topic,
             "auto_offset_reset": "earliest",
             "group_id": "transaction_classification",
             "bootstrap_servers": servers,
         })
     | "log" >> beam.ParDo(LogData()))
```

The Kafka producer is configured to emit one element per second. What I observe: with `ReadFromKafka` (version 1), elements come out batched together, roughly 4-6 seconds apart. With `KafkaConsume` (version 2), elements come out as they are produced (i.e. every second), which is exactly the behavior I expected. I have tried making the `consumer_config` identical for both, but it doesn't seem to have any effect on version 1.

I would like to stick with version 1 because it gives me proper metrics in the Flink UI. Version 2 has the latency behavior I want, but reports no metrics in Flink (everything shows up as 0 bytes received / 0 records received).

Why does `ReadFromKafka` batch several records together before pushing them down the pipeline? This behavior does not show up on the Dataflow runner. Is there any setting I can try? Otherwise, how are folks dealing with reading from Kafka for unbounded streams?
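For reference, the `consumer_config` I pass to version 1 is the usual Kafka consumer properties dict. The exact values below are illustrative; note that `ReadFromKafka` (which wraps the Java KafkaIO) takes dotted Java-style property names, while beam-nuggets takes kafka-python style underscore keys, so "the same config" is the same settings spelled two ways:

```python
# Illustrative sketch; my real config points at the same cluster and
# consumer group as version 2.
consumer_config = {
    "bootstrap.servers": servers,
    "auto.offset.reset": "earliest",
    "group.id": "transaction_classification",
}
```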
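`LogData` is nothing special; a minimal sketch of the kind of DoFn I mean (the timestamp print is just to make the arrival gaps visible):

```python
import datetime
import apache_beam as beam

class LogData(beam.DoFn):
    """Print each element together with the wall-clock time it arrived."""
    def process(self, element):
        print(f"{datetime.datetime.now().isoformat()} {element}")
        yield element
```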
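And the producer side, for completeness; a minimal sketch using kafka-python (the payload is a placeholder):

```python
import time
from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(bootstrap_servers=servers)
while True:
    # Emit one element per second, matching the rate described above.
    producer.send(producer_topic, value=b"payload")
    producer.flush()
    time.sleep(1)
```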