The consumer that owns the partition must do a `seek()` to the corresponding offsets. It's not possible from "outside" to manipulate the offset (if would also be dangerous if this would be allowed as a consumer could get into a bad state).
Also note, even if you could commit a offset for the partition, if would not directly affect the running consumer, because consumer maintain their offsets in main memory and only commit (ie, write) them in regular intervals. Offsets are only read when a new consumer starts and does not have any offset yet. I think you will need to update your application and add code that seek() to the offset you want to and then do rolling bounce of your existing consumer to get the new code live. If you can accept downtime, you can stop all running consumers, than use a new consumer, subscribe(), seek(), commmit() -- because all other consumers are down, it will get all partitions assigned and thus, you will be able to manipulate the offset. This offset will be picked up on startup of the original consumers afterwards. -Matthias On 3/12/18 7:45 PM, York Zhang wrote: > I have some running consumers. I want to set the offset of a specific > topic-partition-group with a new consumer instance in the same group. I > have tried to use subscribe(), but kafka randomly assign partition to the > new consumer, I can't operate the partition I want. Then I tried to use > assign(), but when I tried to commit offset, I got exceptions : > "Error UNKNOWN_MEMBER_ID occurred while committing offsets for group > xxxxxxx" "Commit cannot be completed due to group rebalance". > > The codes are: > consumer.poll(0); > consumer.seekToEnd(partition); > long endOffset = consumer.position( partition); > cosumer.commitSync(Collections.singletonMap(partition, new > OffsetAndMetadata(endOffset))); > > I'm using kafka 0.9.0. Is there any way I can set offset correctly ? >
signature.asc
Description: OpenPGP digital signature