ahuang98 commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1619301291


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -103,189 +105,247 @@ public void testNonFollowerAcknowledgement() {
         assertThrows(IllegalArgumentException.class, () -> 
state.addAcknowledgementFrom(nonVoterId));
     }
 
-    @Test
-    public void testUpdateHighWatermarkQuorumSizeOne() {
-        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
-        assertEquals(emptySet(), state.nonAcknowledgingVoters());
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(20)));
-        assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
-    }
+//    @Test
+//    public void testUpdateHighWatermarkQuorumSizeOne() {
+//        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
+//        assertEquals(emptySet(), state.nonAcknowledgingVoters());
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(20)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
+//    }
+//
+//    @Test
+//    public void testNonMonotonicLocalEndOffsetUpdate() {
+//        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+//        assertThrows(IllegalStateException.class,
+//            () -> state.updateLocalState(new LogOffsetMetadata(15L)));
+//    }
+//
+//    @Test
+//    public void testLastCaughtUpTimeVoters() {
+//        int node1 = 1;
+//        int node2 = 2;
+//        int currentTime = 1000;
+//        int fetchTime = 0;
+//        int caughtUpTime = -1;
+//        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 
10L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(10L)));
+//        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+//        assertEquals(Optional.empty(), state.highWatermark());
+//
+//        // Node 1 falls behind
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(11L)));
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L)));
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 catches up to leader
+//        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L)));
+//        caughtUpTime = fetchTime;
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 falls behind
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(100L)));
+//        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L)));
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 catches up to the last fetch offset
+//        int prevFetchTime = fetchTime;
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(200L)));
+//        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(100L)));
+//        caughtUpTime = prevFetchTime;
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node2 has never caught up to leader
+//        assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(300L)));
+//        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(200L)));
+//        assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
+//        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(250L)));
+//        assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
+//    }
+//
+//    @Test
+//    public void testLastCaughtUpTimeObserver() {
+//        int node1 = 1;
+//        int currentTime = 1000;
+//        int fetchTime = 0;
+//        int caughtUpTime = -1;
+//        LeaderState<?> state = newLeaderState(singleton(localId), 5L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertEquals(emptySet(), state.nonAcknowledgingVoters());
+//
+//        // Node 1 falls behind
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(11L)));
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L)));
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 catches up to leader
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L)));
+//        caughtUpTime = fetchTime;
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 falls behind
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(100L)));
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L)));
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 catches up to the last fetch offset
+//        int prevFetchTime = fetchTime;
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(200L)));
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(102L)));
+//        caughtUpTime = prevFetchTime;
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//        // Node 1 catches up to leader
+//        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(200L)));
+//        caughtUpTime = fetchTime;
+//        assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//        assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//    }
+//
+//    @Test
+//    public void testIdempotentEndOffsetUpdate() {
+//        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+//    }
+//
+//    @Test
+//    public void testUpdateHighWatermarkMetadata() {
+//        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
+//        assertEquals(Optional.empty(), state.highWatermark());
+//
+//        LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, 
Optional.of(new MockOffsetMetadata("bar")));
+//        assertTrue(state.updateLocalState(initialHw));
+//        assertEquals(Optional.of(initialHw), state.highWatermark());
+//
+//        LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, 
Optional.of(new MockOffsetMetadata("baz")));
+//        assertTrue(state.updateLocalState(updateHw));
+//        assertEquals(Optional.of(updateHw), state.highWatermark());
+//    }
+//
+//    @Test
+//    public void testUpdateHighWatermarkQuorumSizeTwo() {
+//        int otherNodeId = 1;
+//        LeaderState<?> state = newLeaderState(mkSet(localId, otherNodeId), 
10L);
+//        assertFalse(state.updateLocalState(new LogOffsetMetadata(13L)));
+//        assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters());
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertFalse(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(10L)));
+//        assertEquals(emptySet(), state.nonAcknowledgingVoters());
+//        assertEquals(Optional.empty(), state.highWatermark());
+//        assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(11L)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(11L)), 
state.highWatermark());
+//        assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(13L)));
+//        assertEquals(Optional.of(new LogOffsetMetadata(13L)), 
state.highWatermark());
+//    }
 
     @Test
-    public void testNonMonotonicLocalEndOffsetUpdate() {
-        LeaderState<?> state = newLeaderState(singleton(localId), 15L);
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
-        assertThrows(IllegalStateException.class,
-            () -> state.updateLocalState(new LogOffsetMetadata(15L)));
-    }
-
-    @Test
-    public void testLastCaughtUpTimeVoters() {
+    public void testUpdateHighWatermarkQuorumSizeThree() {

Review Comment:
   Git is making the diffs really confusing. The main changes to this test file 
are commenting out tests (temporarily) and _adding_ the new tests 
`testUpdateHighWatermarkRemovingFollowerFromVoterStates` and 
`testUpdateHighWatermarkRemovingLeaderFromVoterStates`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to