Hi,

Is it possible to start 0.9 or 0.10 consumers from a specified offset while still using consumer groups with dynamic re-balancing?

Here is what we have found so far:

Case 1: If we use the consumer.assign(…) method to manually assign partitions to consumers, we can do all of the following:

    consumer.seek(<specificPartition>, <myCustomOffset>);

or:

    consumer.seekToBeginning(<specificPartition>);
    consumer.seekToEnd(<specificPartition>);

Basically, we have full control over which position to start the consumer from, BUT at the expense of not having the partition re-assignment done dynamically by Kafka.

Case 2: If we use the consumer.subscribe(…) method, Kafka will manage the re-balancing; however, we cannot do any of the three options above … :(

So, we tried the following to “hack” around it, at consumer start-up time, *before* entering the poll() loop:

    // get the coordinator from the private field of the consumer:
    ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);

    // make sure all partitions are already assigned:
    coordinator.ensurePartitionAssignment();

    // get the list of partitions assigned to this specific consumer:
    Set<TopicPartition> assignedTopicPartitions = consumer.assignment();

    // now we can go ahead and do the same three actions (seek(), seekToEnd() or seekToBeginning())
    // on those partitions only, for this consumer, as above:
    for (TopicPartition assignedPartition : assignedTopicPartitions) {
        consumer.seek(assignedPartition, <myCustomOffset>); // or whatever ...
    }

    // now start the poll() loop:
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
        for (ConsumerRecord<String, String> record : records) {
            // processMessage(record.value(), record.offset());
        }
    }

This feels too hack-y for my taste, and I am also not sure whether this logic will hold during an actual re-balancing, when, say, new consumers are added to the group.

Could somebody validate this approach or suggest a better way to accomplish what we need?

thanks!
Marina