I started a [java] consumer that subscribed to a topic "foo", which did not
previously exist (Kafka is configured to create 2 partitions per topic by
default). When subscribing, I provided a ConsumerRebalanceListener that
logs the information passed to the onPartitionsAssigned and
onPartitionsRevoked methods.

Shortly after calling subscribe, the onPartitions* methods get invoked with
empty lists. I figure maybe this is expected because no data has been
produced on the topic yet.

I start up the kafka-console-producer and submit 3 messages. I watch the
consumer, which calls "poll" every 5 seconds. No messages are received.
Again, this makes sense since it hasn't been assigned any partitions yet.

A few minutes go by, and I see in the logs that the consumer is assigned
foo-0 and foo-1. It continues polling, but still receives no messages.

I then submit 2 more messages using the kafka-console-producer, and the
consumer gets those.

The properties I pass to the KafkaConsumer are:

bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
group.id = foo
enable.auto.commit = false
session.timeout.ms = 30000
key.deserializer = ....StringDeserializer
value.deserializer = ....StringDeserializer
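
For reference, here is a minimal sketch of how the settings above might be
assembled into a java.util.Properties object before being handed to the
KafkaConsumer constructor. The class and method names are hypothetical, and
the deserializer class name is taken as a parameter because it is
abbreviated above; this is not the actual application code.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative only.
class ConsumerConfigSketch {

    // Builds the consumer settings listed above.
    // deserializerClass is supplied by the caller because the fully
    // qualified name is abbreviated in the listing.
    static Properties buildConsumerProps(String deserializerClass) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.setProperty("group.id", "foo");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer", deserializerClass);
        props.setProperty("value.deserializer", deserializerClass);
        return props;
    }

    public static void main(String[] args) {
        // "StringDeserializer" is a placeholder, not a usable class name;
        // pass the fully qualified name as the first argument instead.
        String deserializer = args.length > 0 ? args[0] : "StringDeserializer";
        Properties props = buildConsumerProps(deserializer);
        // Print each setting; iteration order is not guaranteed.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer
constructor, with the full deserializer class name supplied by the caller.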


Maybe I'm just missing something. Thoughts?
