I want to manually fetch messages from all partitions of a topic. I'm doing this by:
1. Create a list of `TopicPartition`, one for each partition of my topic
2. Create a `KafkaConsumer` and call `.assign(myTopicPartitionsList)`
3. For each `TopicPartition`, seek to the offset I want to read from

But when I call `consumer.poll(timeout)`, I only get messages from one of my partitions. I checked the consumer configs and don't see anything that would limit the fetching to some of the partitions. Does anyone have any insights on how I can fetch messages for all the `TopicPartition`s assigned to the consumer? I posted the relevant code snippet and consumer config below. Thanks!

Relevant code:

```java
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < NumPartitionsForTopic; partitionNumber++) {
    topicPartitions.add(new TopicPartition("my_topic", partitionNumber));
}
...
try {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
    consumer.assign(topicPartitions);

    // seek to a specific offset for each partition of the topic
    for (TopicPartition topicPartition : topicPartitions) {
        consumer.seek(topicPartition, offsetForPartition[topicPartition.partition()]);
    }

    ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
    // ISSUE: records only contains messages from 1 of the partitions
} catch (...) {
    // handle exceptions
} finally {
    // close consumer
}
```

My `KafkaConsumer` config:

```
INFO: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [brokerhostname1.com:9092, brokerhostname2.com:9092, brokerhostname3.com:9092]
    check.crcs = true
    client.id = my_client_id
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my_group_id_123
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 3145728
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
```
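For completeness, here is roughly how I build `consumerProps` (reconstructed from the config dump above; this shows only the values I set explicitly, everything else is left at the client defaults, and the broker hostnames/ids are the same placeholders as in the dump):

```java
import java.util.Properties;

public class ConsumerPropsExample {

    // Reconstructed from the ConsumerConfig dump: only the non-default
    // values I set explicitly. String keys are used instead of the
    // ConsumerConfig constants so this compiles without the kafka-clients jar.
    static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "brokerhostname1.com:9092,brokerhostname2.com:9092,brokerhostname3.com:9092");
        props.put("group.id", "my_group_id_123");
        props.put("client.id", "my_client_id");
        props.put("enable.auto.commit", "false");  // offsets are committed manually
        props.put("auto.offset.reset", "none");    // fail fast if no offset, since I seek explicitly
        props.put("max.partition.fetch.bytes", "3145728");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumerProps().getProperty("group.id"));
    }
}
```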