Can you pastebin the relevant logs from the client and broker? Thanks

On Fri, Nov 3, 2017 at 1:37 PM, Manan G <manan....@gmail.com> wrote:
> Hello,
>
> I am using 0.11.0.0 version of Kafka broker and Java client library. My
> consumer code tracks offsets for each assigned partition and at some time
> interval manually commits offsets by specifying partition->offset map.
>
> What I noticed is, after the rebalance, even if consumer loses some
> partitions that were assigned to it previously, offset commit for those
> lost partitions still succeeds by that same consumer! Shouldn't offset
> commit fail in this scenario since consumer is trying to commit offsets for
> partitions that are not assigned to it?
>
> For clarity below are the logs I see with comments:
>
> // This is when consumer starts for "test" topic and it picks up 3
> partitions
> log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
>
> // Now consumer processes 3 records from partition 0 and 7 records from
> partition 2 - confirmed with log statements
> log>> ...
>
> // Rebalance happens - right now, my code does not commit any pending
> offsets here and just prints the log statement
> log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
>
> // After re-balance, consumer loses partition 0 and 1. Again, my code does
> not do anything on this callback and just prints the log statement
> onPartitionsAssigned: partitions=[test-2]
>
> // Since the code did not commit offsets during revoke call, after
> rebalance, poll() returns all records for assigned partitions since last
> offset commit.
> // ... So we re-process 7 records from partition 2. This was confirmed with
> log statements.
> log>> ...
>
> // Offset commit gets triggered after some time and due to the bug in the
> code, it tries to commit offsets for both partition 0 and 2.
> // There is no failure however! I can see on Kafka broker side that offset
> for partition 0 is updated to 3.
> // I made sure that another consumer that is actually assigned partition 0
> after re-balance has not committed offset yet.
> commitOffsets: {0=3, 2=7}
>
>
> Thanks,
> M
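
Separately from the logs, the client-side bug described above (committing an offset for a partition the consumer no longer owns) can be avoided by pruning the tracked offsets on revoke. Below is a minimal sketch in plain Java, not the poster's actual code: `AssignedOffsetTracker` and its method names are hypothetical, and partitions are plain strings so the example has no Kafka dependency. In a real consumer, `onRevoked` and `onAssigned` would presumably be called from `ConsumerRebalanceListener.onPartitionsRevoked` and `onPartitionsAssigned`, and the resulting map would be converted to `Map<TopicPartition, OffsetAndMetadata>` before passing it to `commitSync`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of the Kafka client): tracks pending offsets
 * only for partitions that are currently assigned to this consumer.
 */
class AssignedOffsetTracker {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> pending = new HashMap<>();

    /** Intended to be called from the rebalance listener's revoke callback. */
    void onRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            assigned.remove(p);
            pending.remove(p); // forget offsets for partitions we no longer own
        }
    }

    /** Intended to be called from the rebalance listener's assign callback. */
    void onAssigned(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    /** Records the next offset to read (last processed + 1) for an owned partition. */
    void recordProcessed(String partition, long lastProcessedOffset) {
        if (assigned.contains(partition)) {
            pending.put(partition, lastProcessedOffset + 1);
        }
    }

    /** Returns a copy of the pending offsets, restricted to assigned partitions. */
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> out = new HashMap<>(pending);
        out.keySet().retainAll(assigned);
        return out;
    }
}
```

With this shape, a periodic commit after the rebalance in the logs above would only ever contain `test-2`. Note that dropping pending offsets on revoke means the uncommitted records get re-processed by whoever owns the partition next; committing the pending offsets inside the revoke callback, before dropping them, is the usual way to narrow that window.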