ahuang98 commented on code in PR #16079: URL: https://github.com/apache/kafka/pull/16079#discussion_r1619301291
########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -103,189 +105,247 @@ public void testNonFollowerAcknowledgement() { assertThrows(IllegalArgumentException.class, () -> state.addAcknowledgementFrom(nonVoterId)); } - @Test - public void testUpdateHighWatermarkQuorumSizeOne() { - LeaderState<?> state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); - assertEquals(emptySet(), state.nonAcknowledgingVoters()); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); - assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); - } +// @Test +// public void testUpdateHighWatermarkQuorumSizeOne() { +// LeaderState<?> state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); +// assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); +// } +// +// @Test +// public void testNonMonotonicLocalEndOffsetUpdate() { +// LeaderState<?> state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// assertThrows(IllegalStateException.class, +// () -> state.updateLocalState(new LogOffsetMetadata(15L))); +// } +// +// @Test +// public void testLastCaughtUpTimeVoters() { +// int node1 = 1; +// int node2 = 2; +// int currentTime = 1000; +// int fetchTime = 0; +// int caughtUpTime = -1; +// LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(10L))); +// assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// +// // Node 1 falls behind +// assertFalse(state.updateLocalState(new LogOffsetMetadata(11L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 falls behind +// assertFalse(state.updateLocalState(new LogOffsetMetadata(100L))); +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to the last fetch offset +// int prevFetchTime = fetchTime; +// assertFalse(state.updateLocalState(new LogOffsetMetadata(200L))); +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); +// caughtUpTime = prevFetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node2 has never caught up to leader +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(300L))); +// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// } +// +// @Test +// public void testLastCaughtUpTimeObserver() { +// int node1 = 1; +// int currentTime = 1000; +// int fetchTime = 0; +// int caughtUpTime = -1; +// LeaderState<?> state = newLeaderState(singleton(localId), 5L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// +// // Node 1 falls behind +// assertTrue(state.updateLocalState(new LogOffsetMetadata(11L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 falls behind +// assertTrue(state.updateLocalState(new LogOffsetMetadata(100L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to the last fetch offset +// int prevFetchTime = fetchTime; +// assertTrue(state.updateLocalState(new LogOffsetMetadata(200L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); +// caughtUpTime = prevFetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// } +// +// @Test +// public void testIdempotentEndOffsetUpdate() { +// LeaderState<?> state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// } +// +// @Test +// public void testUpdateHighWatermarkMetadata() { +// LeaderState<?> state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// +// LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); +// assertTrue(state.updateLocalState(initialHw)); +// assertEquals(Optional.of(initialHw), state.highWatermark()); +// +// LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); +// assertTrue(state.updateLocalState(updateHw)); +// assertEquals(Optional.of(updateHw), state.highWatermark()); +// } +// +// @Test +// public void testUpdateHighWatermarkQuorumSizeTwo() { +// int otherNodeId = 1; +// LeaderState<?> state = newLeaderState(mkSet(localId, otherNodeId), 10L); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(13L))); +// assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); +// assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); +// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); +// assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); +// } @Test - public void testNonMonotonicLocalEndOffsetUpdate() { - LeaderState<?> state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertThrows(IllegalStateException.class, - () -> state.updateLocalState(new LogOffsetMetadata(15L))); - } - - @Test - public void testLastCaughtUpTimeVoters() { + public void testUpdateHighWatermarkQuorumSizeThree() { Review Comment: git is botching the diffs. the only changes to this test file are commenting out tests (temporary) and _adding_ new tests `testUpdateHighWatermarkRemovingFollowerFromVoterStates` and `testUpdateHighWatermarkRemovingLeaderFromVoterStates` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org