jsancio commented on a change in pull request #11109: URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535
########## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ########## @@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() { } void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) { - listener.handleCommit(reader); + listener.handleCommit(this, reader); offset = reader.lastOffset().getAsLong(); } void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { - listener.handleSnapshot(reader); + listener.handleSnapshot(this, reader); offset = reader.lastContainedLogOffset(); } void handleLeaderChange(long offset, LeaderAndEpoch leader) { - listener.handleLeaderChange(leader); + listener.handleLeaderChange(this, leader); notifiedLeader = leader; this.offset = offset; } void beginShutdown() { - listener.beginShutdown(); + listener.beginShutdown(this); } + + @Override + public void close() {} Review comment: Should fix this by appending an event to eventQueue that removes this listener. ########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ########## @@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws Exception { assertEquals(9L, secondListener.claimedEpochStartOffset(epoch)); } + @Test + public void testReregistrationChangesListenerContext() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + List<String> batch1 = Arrays.asList("1", "2", "3"); + List<String> batch2 = Arrays.asList("4", "5", "6"); + List<String> batch3 = Arrays.asList("7", "8", "9"); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .appendToLog(1, batch1) + .appendToLog(1, batch2) + .appendToLog(2, batch3) + .withUnknownLeader(epoch - 1) + .build(); + + context.becomeLeader(); + context.client.poll(); + assertEquals(10L, context.log.endOffset().offset); + + // Let the initial listener catch up + context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0)); Review comment: Use the helper method "advance high-watermark" in a few of these places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org