The consumer that owns the partition must call `seek()` to move to the
desired offset. It's not possible to manipulate the offset from
"outside" (it would also be dangerous if this were allowed, as a
consumer could get into a bad state).

Also note that even if you could commit an offset for the partition, it
would not directly affect the running consumer, because consumers
maintain their offsets in main memory and only commit (i.e., write) them
at regular intervals. Committed offsets are only read when a new
consumer starts and does not have any offset yet.
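
To make this concrete, here is a toy model in plain Java. It is not the
Kafka client: `OffsetModel`, `ToyConsumer` and the partition name
"topic-0" are made up for illustration, and the map only stands in for
the committed-offset store. It shows why an offset written from outside
would be ignored by a running consumer and then overwritten by its next
commit.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetModel {
    /** Stand-in for the store of committed offsets, keyed by partition name. */
    public static final Map<String, Long> committed = new HashMap<>();

    /** Stand-in for a running consumer that owns a single partition. */
    public static class ToyConsumer {
        private final String partition;
        private long position; // kept in memory only

        public ToyConsumer(String partition) {
            this.partition = partition;
            // The committed offset is read once, at startup.
            Long start = committed.get(partition);
            this.position = (start != null) ? start : 0L;
        }

        /** Pretend to process some records; only the in-memory position moves. */
        public void consume(int records) {
            position += records;
        }

        /** Periodic commit: write the in-memory position to the store. */
        public void commit() {
            committed.put(partition, position);
        }

        public long position() {
            return position;
        }
    }

    public static void main(String[] args) {
        committed.put("topic-0", 100L);
        ToyConsumer running = new ToyConsumer("topic-0"); // starts at 100
        running.consume(5);                               // position is now 105

        committed.put("topic-0", 500L); // hypothetical write from "outside"
        running.consume(5);             // position is 110, the outside write is not seen
        System.out.println(running.position());       // prints 110

        running.commit();               // the next commit overwrites the outside value
        System.out.println(committed.get("topic-0")); // prints 110
    }
}
```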

I think you will need to update your application and add code that
calls seek() with the offset you want, and then do a rolling bounce of
your existing consumers to get the new code live.
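
One possible shape for that code is a rebalance listener that seeks as
soon as the partition is assigned. This is only a sketch: the class name
`SeekOnAssign`, the topic name and the target offset are placeholders,
and it needs the kafka-clients library on the classpath.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical helper: seeks one partition to a fixed offset when it is assigned. */
public class SeekOnAssign implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final TopicPartition target;
    private final long targetOffset;

    public SeekOnAssign(KafkaConsumer<?, ?> consumer, TopicPartition target, long targetOffset) {
        this.consumer = consumer;
        this.target = target;
        this.targetOffset = targetOffset;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do for this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Only the consumer that now owns the partition moves its position.
        if (partitions.contains(target)) {
            consumer.seek(target, targetOffset);
        }
    }
}
```

It would be registered in the existing application with something like
`consumer.subscribe(Collections.singletonList("my-topic"), new SeekOnAssign(consumer, new TopicPartition("my-topic", 0), 42L))`.
Note that, as written, it seeks on every assignment of that partition,
so it would have to be removed or guarded once the offset has been
reset.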

If you can accept downtime, you can stop all running consumers, then use
a new consumer in the same group to subscribe(), seek(), commit() --
because all other consumers are down, it will get all partitions
assigned and thus you will be able to manipulate the offset. This offset
will be picked up on startup of the original consumers afterwards.
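
A minimal sketch of that one-off tool, assuming all other consumers in
the group are stopped. The broker address, group id, topic, partition
number and target offset are placeholders, and `poll(long)` is used
because that is the signature in the 0.9 client (newer clients prefer
`poll(Duration)`).

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // same group as the stopped consumers
        props.put("enable.auto.commit", "false");         // commit only what we set explicitly
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition target = new TopicPartition("my-topic", 0); // placeholder
        long newOffset = 42L;                                      // placeholder

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // poll() drives the group join; repeat until the partition is assigned.
            // This loops forever if the partition does not exist or another
            // member of the group still owns it.
            while (!consumer.assignment().contains(target)) {
                consumer.poll(100);
            }
            consumer.seek(target, newOffset);
            // Commit the new offset for this partition only.
            consumer.commitSync(
                    Collections.singletonMap(target, new OffsetAndMetadata(newOffset)));
        } finally {
            consumer.close();
        }
    }
}
```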


-Matthias


On 3/12/18 7:45 PM, York Zhang wrote:
> I have some running consumers.  I want to set the offset of a specific
> topic-partition-group with a new consumer instance in the same group. I
> have tried to use subscribe(), but kafka randomly assign partition to the
> new consumer, I can't operate the partition I want. Then I tried to use
> assign(), but when I tried to commit offset, I got exceptions :
> "Error UNKNOWN_MEMBER_ID occurred while committing offsets for group
> xxxxxxx" "Commit cannot be completed due to group rebalance".
> 
> The codes are:
>     consumer.poll(0);
>     consumer.seekToEnd(partition);
>     long endOffset =  consumer.position( partition);
>     consumer.commitSync(Collections.singletonMap(partition,
>         new OffsetAndMetadata(endOffset)));
> 
> I'm using kafka 0.9.0. Is there any way I can set offset correctly ?
> 
