showuon commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1620608088
########## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ########## @@ -17,108 +17,296 @@ package org.apache.kafka.raft; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.List; +import java.util.Iterator; import java.util.Map; -import java.util.OptionalInt; +import java.util.Optional; import java.util.OptionalLong; import java.util.Random; -import java.util.Set; +import org.apache.kafka.common.Node; +/** + * The request manager keeps tracks of the connection with remote replicas. + * + * When sending a request update this type by calling {@code onRequestSent(Node, long, long)}. When + * the RPC returns a response, update this manager with {@code onResponseResult(Node, long, boolean, long)}. + * + * Connections start in the ready state ({@code isReady(Node, long)} returns true). + * + * When a request times out or completes successfully the collection will transition back to the + * ready state. + * + * When a request completes with an error it still transition to the backoff state until + * {@code retryBackoffMs}. 
+ */ public class RequestManager { - private final Map<Integer, ConnectionState> connections = new HashMap<>(); - private final List<Integer> voters = new ArrayList<>(); + private final Map<String, ConnectionState> connections = new HashMap<>(); + private final ArrayList<Node> bootstrapServers; private final int retryBackoffMs; private final int requestTimeoutMs; private final Random random; - public RequestManager(Set<Integer> voterIds, - int retryBackoffMs, - int requestTimeoutMs, - Random random) { - + public RequestManager( + Collection<Node> bootstrapServers, + int retryBackoffMs, + int requestTimeoutMs, + Random random + ) { + this.bootstrapServers = new ArrayList<>(bootstrapServers); this.retryBackoffMs = retryBackoffMs; this.requestTimeoutMs = requestTimeoutMs; - this.voters.addAll(voterIds); this.random = random; + } - for (Integer voterId: voterIds) { - ConnectionState connection = new ConnectionState(voterId); - connections.put(voterId, connection); + /** + * Returns true if there any connection with pending requests. + * + * This is useful for satisfying the invariant that there is only one pending Fetch request. + * If there are more than one pending fetch request, it is possible for the follower to write + * the same offset twice. 
+ * + * @param currentTimeMs the current time + * @return true if the request manager is tracking at least one request + */ + public boolean hasAnyInflightRequest(long currentTimeMs) { + boolean result = false; + + Iterator<ConnectionState> iterator = connections.values().iterator(); + while (iterator.hasNext()) { + ConnectionState connection = iterator.next(); + if (connection.hasRequestTimedOut(currentTimeMs)) { + // Mark the node as ready after request timeout + iterator.remove(); + } else if (connection.isBackoffComplete(currentTimeMs)) { + // Mark the node as ready after completed backoff + iterator.remove(); + } else if (connection.hasInflightRequest(currentTimeMs)) { Review Comment: It looks like we also check `hasRequestTimedOut` inside `hasInflightRequest`. We can consider doing a small optimization here in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org