Hi Ranganath, If messages are only received when a specific partition is assigned, but not when subscribing via a consumer group. This is because:
--> The consumer config has enable.auto.commit=false, but no manual offset commits are being made (commitSync() is missing). As a result, Kafka thinks there are no new messages to consume for the group. --> Also, if offsets were already committed earlier, --from-beginning has no effect unless the offsets are reset. *Recommended fixes:* 1. Add kafkaConsumer.commitSync() after polling records in Java code. 2. temporarily set enable.auto.commit=true to allow auto commits. 3. For CLI, reset the group offset using: kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console --topic input --reset-offsets --to-earliest --execute Regards, Sisindri M. On Thu, Jun 26, 2025 at 1:03 AM Samudrala, Ranganath [USA] <samudrala_rangan...@bah.com.invalid> wrote: > Hello > I have been struggling to receive messages when I subscribe to a topic or > when I use a consumer group. However, when I assign a partition I am able > to receive messages. > what am I doing wrong. > > ======================= > Consumer config: > ======================= > key.deserializer=org.apache.kafka.common.serialization.StringDeserializer > value.deserializer=org.apache.kafka.common.serialization.StringDeserializer > group.id=console > socket.connection.setup.timeout.max.ms=5000 > retry.backoff.max.ms=10000 > max.poll.records=20 > reconnect.backoff.max.ms=10000 > socket.connection.setup.timeout.ms=2000 > request.timeout.ms=5000 > reconnect.backoff.ms=2000 > read_uncommitted=read_committed > bootstrap.servers=localhost:9092 > retry.backoff.ms=2000 > enable.auto.commit=false > allow.auto.create.topics=true > fetch.max.wait.ms=5000 > connections.max.idle.ms=600000 > session.timeout.ms=1800000 > max.poll.interval.ms=2000 > auto.offset.reset=earliest > default.api.timeout.ms=5000 > > ==================== > Non-working Java code: > > ===================================================================== > KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties); > kafkaConsumer.subscribe(Collections.singletonList(topic)); > while(true) { > ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); > . > . > } > ====================================================================== > Working code: > > KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties); > TopicPartition topicPartition = new TopicPartition(topic, partition); > kafkaConsumer.assign(Collections.singletonList(topicPartition)); > while(true) { > ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); > > . > . > } > ====================================================================== > > The behavior is the same while using CLI binaries. > > Non-working CLI command: > ================================= > > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic > input --from-beginning --group console > Processed a total of 0 messages > > > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic > input --from-beginning > Processed a total of 0 messages > =================================== > Working CLI command: > > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic > input --partition 0 --offset earliest > test1 > value-6 > value-10 > . > . > =================================== > > regards > Ranganath Samudrala >