jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() {
         }
 
         void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) {
-            listener.handleCommit(reader);
+            listener.handleCommit(this, reader);
             offset = reader.lastOffset().getAsLong();
         }
 
         void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            listener.handleSnapshot(reader);
+            listener.handleSnapshot(this, reader);
             offset = reader.lastContainedLogOffset();
         }
 
         void handleLeaderChange(long offset, LeaderAndEpoch leader) {
-            listener.handleLeaderChange(leader);
+            listener.handleLeaderChange(this, leader);
             notifiedLeader = leader;
             this.offset = offset;
         }
 
         void beginShutdown() {
-            listener.beginShutdown();
+            listener.beginShutdown(this);
         }
+
+        @Override
+        public void close() {}

Review comment:
       Should fix this by appending an event to eventQueue that removes this 
listener.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws 
Exception {
         assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
     }
 
+    @Test
+    public void testReregistrationChangesListenerContext() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        List<String> batch1 = Arrays.asList("1", "2", "3");
+        List<String> batch2 = Arrays.asList("4", "5", "6");
+        List<String> batch3 = Arrays.asList("7", "8", "9");
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(1, batch1)
+            .appendToLog(1, batch2)
+            .appendToLog(2, batch3)
+            .withUnknownLeader(epoch - 1)
+            .build();
+
+        context.becomeLeader();
+        context.client.poll();
+        assertEquals(10L, context.log.endOffset().offset);
+
+        // Let the initial listener catch up
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, 
epoch, 0));

Review comment:
       Use the helper method "advance high-watermark" in a few of these places.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to