Hi Sumeet,

It seems like your Kafka consumer uses the LATEST offset (which is the
default setting) as the start offset to read from, which is 29 here. Do you
have more than 29 records in the topic at that point? If the pipeline is
only for testing purposes, I would recommend reading from the earliest
offset to see whether you get records. You can do so by constructing your
ReadFromKafka like:
ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:29092',
                     'auto.offset.reset': 'earliest'},
    topics=['test'])
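
Also, on your decoding suspicion: ReadFromKafka emits each record as a
(key, value) pair of raw bytes, so beam.Map(print) will show byte strings
rather than the text you published. A minimal sketch of a decode step you
could insert before the print (the function name here is my own, not a
Beam API):

```python
def decode_record(record):
    """Decode a (key, value) Kafka record from bytes to str.

    ReadFromKafka emits key/value as bytes; the key (or a tombstone
    value) may be None, so guard before decoding.
    """
    key, value = record
    return (
        key.decode('utf-8') if key is not None else None,
        value.decode('utf-8') if value is not None else None,
    )

# In your pipeline this would slot in as:
#   ... | 'Read from Kafka' >> ReadFromKafka(...)
#       | 'Decode' >> beam.Map(decode_record)
#       | 'Print' >> beam.Map(print)

print(decode_record((b'1', b'foo')))  # → ('1', 'foo')
```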

On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <sumeet.malho...@gmail.com>
wrote:

> Hi All,
>
> I'm trying out a simple example of reading data off a Kafka topic into
> Apache Beam. Here's the relevant snippet:
>
>   with beam.Pipeline(options=pipeline_options) as pipeline:
>     _ = (
>         pipeline
>         | 'Read from Kafka' >> ReadFromKafka(
>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>             topics=['test'])
>         | 'Print' >> beam.Map(print))
>
> Using the above Beam pipeline snippet, I don't see any messages coming in.
> Kafka is running locally in a docker container, and I'm able to use
> `kafkacat` from the host (outside the container) to publish and subscribe
> to messages. So, I guess there are no issues on that front.
>
> It appears that Beam is able to connect to Kafka and get notified of new
> messages, as I see the offset changes in the Beam logs as I publish data
> from `kafkacat`:
>
> INFO:root:severity: INFO
> timestamp {
>   seconds: 1612886861
>   nanos: 534000000
> }
> message: "[Consumer
> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset
> of partition test-0"
> log_location:
> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
> thread: "22"
>
> INFO:root:severity: INFO
> timestamp {
>   seconds: 1612886861
>   nanos: 537000000
> }
> message: "[Consumer
> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
> partition test-0 to offset 29."
> log_location:
> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
> thread: "22"
>
> This is how I'm publishing data using `kafkacat`:
>
> $ kafkacat -P -b localhost:29092 -t test -K:
> 1:foo
> 1:bar
>
> and I can confirm that it's being received, again using `kafkacat`:
>
> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
> Key: 1 Value: foo
> Key: 1 Value: bar
>
> But despite this, I don't see the actual message being printed by Beam as
> I expected. Any pointers to what's missing here are appreciated. I suspect
> this could be a decoding issue on the Beam pipeline side, but I could be
> wrong.
>
> Thanks in advance for any pointers!
>
> Cheers,
> Sumeet
>