jsancio commented on code in PR #14971:
URL: https://github.com/apache/kafka/pull/14971#discussion_r1600190601
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -129,16 +135,46 @@ public void updateCheckQuorumForFollowingVoter(int id,
long currentTimeMs) {
}
}
- private void updateFetchedVoters(int id) {
+ private void updateFetchedVoters(int id, long offset, long position) {
if (id == localId) {
throw new IllegalArgumentException("Received a
FETCH/FETCH_SNAPSHOT request from the leader itself.");
}
- if (isVoter(id)) {
+ if (isVoter(id) && (isFetchOffsetAdvanced(id, offset) ||
isFetchSnapshotPositionAdvanced(id, position))) {
fetchedVoters.add(id);
}
}
+ /**
+ * We treat fetch offset is advanced when:
+ * 1. fetch offset is greater than or equal to leader end offset
+ * 2. if leader end offset is not set yet, we treat the follower is
progressing
+ * 3. fetch offset is greater than previous fetch offset
+ *
+ * @param id the node id
+ * @param offset the fetch offset
+ */
+ private boolean isFetchOffsetAdvanced(int id, long offset) {
+ if (offset >= voterStates.get(localId).endOffset.orElse(new
LogOffsetMetadata(-1)).offset ||
+ offset > fetchedOffset.get(id)) {
+ fetchedOffset.put(id, offset);
+ }
+ return false;
+ }
+
+ /**
+ * We treat fetch snapshot position is advanced when fetch snapshot
position is greater than previous fetch snapshot position
+ *
+ * @param id the node id
+ * @param position the fetch snapshot position
+ */
+ private boolean isFetchSnapshotPositionAdvanced(int id, long position) {
+ if (position > fetchedPosition.get(id)) {
+ fetchedPosition.put(id, position);
+ }
+ return false;
Review Comment:
Thanks for the PR and excuse the extreme delay. I'll try to take a look this
week!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]