showuon commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1620562119


##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -17,108 +17,296 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
+/**
+ * The request manager keeps track of the connection with remote replicas.
+ *
+ * When sending a request update this type by calling {@code 
onRequestSent(Node, long, long)}. When
+ * the RPC returns a response, update this manager with {@code 
onResponseResult(Node, long, boolean, long)}.
+ *
+ * Connections start in the ready state ({@code isReady(Node, long)} returns 
true).
+ *
+ * When a request times out or completes successfully the connection will 
transition back to the
+ * ready state.
+ *
+ * When a request completes with an error it still transitions to the backoff 
state until
+ * {@code retryBackoffMs}.
+ */
 public class RequestManager {
-    private final Map<Integer, ConnectionState> connections = new HashMap<>();
-    private final List<Integer> voters = new ArrayList<>();
+    private final Map<String, ConnectionState> connections = new HashMap<>();
+    private final ArrayList<Node> bootstrapServers;
 
     private final int retryBackoffMs;
     private final int requestTimeoutMs;
     private final Random random;
 
-    public RequestManager(Set<Integer> voterIds,
-                          int retryBackoffMs,
-                          int requestTimeoutMs,
-                          Random random) {
-
+    public RequestManager(
+        Collection<Node> bootstrapServers,
+        int retryBackoffMs,
+        int requestTimeoutMs,
+        Random random
+    ) {
+        this.bootstrapServers = new ArrayList<>(bootstrapServers);
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
-        this.voters.addAll(voterIds);
         this.random = random;
+    }
 
-        for (Integer voterId: voterIds) {
-            ConnectionState connection = new ConnectionState(voterId);
-            connections.put(voterId, connection);
+    /**
+     * Returns true if there is any connection with pending requests.
+     *
+     * This is useful for satisfying the invariant that there is only one 
pending Fetch request.
+     * If there is more than one pending fetch request, it is possible for 
the follower to write
+     * the same offset twice.
+     *
+     * @param currentTimeMs the current time
+     * @return true if the request manager is tracking at least one request
+     */
+    public boolean hasAnyInflightRequest(long currentTimeMs) {
+        boolean result = false;
+
+        Iterator<ConnectionState> iterator = connections.values().iterator();
+        while (iterator.hasNext()) {
+            ConnectionState connection = iterator.next();
+            if (connection.hasRequestTimedOut(currentTimeMs)) {
+                // Mark the node as ready after request timeout
+                iterator.remove();
+            } else if (connection.isBackoffComplete(currentTimeMs)) {
+                // Mark the node as ready after completed backoff
+                iterator.remove();
+            } else if (connection.hasInflightRequest(currentTimeMs)) {
+                // If there is at least one inflight request, it is enough
+                // to stop checking the rest of the connections
+                result = true;
+                break;
+            }
         }
-    }
 
-    public ConnectionState getOrCreate(int id) {
-        return connections.computeIfAbsent(id, key -> new ConnectionState(id));
+        return result;
     }
 
-    public OptionalInt findReadyVoter(long currentTimeMs) {
-        int startIndex = random.nextInt(voters.size());
-        OptionalInt res = OptionalInt.empty();
-        for (int i = 0; i < voters.size(); i++) {
-            int index = (startIndex + i) % voters.size();
-            Integer voterId = voters.get(index);
-            ConnectionState connection = connections.get(voterId);
-            boolean isReady = connection.isReady(currentTimeMs);
+    /**
+     * Returns a random bootstrap node that is ready to receive a request.
+     *
+     * This method doesn't return a node if there is at least one request 
pending. In general this
+     * method is used to send Fetch requests. Fetch requests have the 
invariant that there can
+     * only be one pending Fetch request for the LEO.
+     *
+     * @param currentTimeMs the current time
+     * @return a random ready bootstrap node
+     */
+    public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+        // Check that there are no inflight requests across any of the known 
nodes not just
+        // the bootstrap servers
+        if (hasAnyInflightRequest(currentTimeMs)) {
+            return Optional.empty();
+        }
 
-            if (isReady) {
-                res = OptionalInt.of(voterId);
-            } else if (connection.inFlightCorrelationId.isPresent()) {
-                res = OptionalInt.empty();
+        int startIndex = random.nextInt(bootstrapServers.size());
+        Optional<Node> result = Optional.empty();
+        for (int i = 0; i < bootstrapServers.size(); i++) {
+            int index = (startIndex + i) % bootstrapServers.size();
+            Node node = bootstrapServers.get(index);
+
+            if (isReady(node, currentTimeMs)) {
+                result = Optional.of(node);
                 break;
             }
         }
-        return res;
+
+        return result;
     }
 
-    public long backoffBeforeAvailableVoter(long currentTimeMs) {
-        long minBackoffMs = Long.MAX_VALUE;
-        for (Integer voterId : voters) {
-            ConnectionState connection = connections.get(voterId);
-            if (connection.isReady(currentTimeMs)) {
-                return 0L;
+    /**
+     * Computes the amount of time needed to wait before a bootstrap server is 
ready for a Fetch
+     * request.
+     *
+     * If there is a connection with a pending request it returns the amount 
of time to wait until
+     * the request times out.
+     *
+     * Returns zero if there are no pending requests and at least one of the 
bootstrap servers is
+     * ready.
+     *
+     * If all of the bootstrap servers are backing off and there are no 
pending requests, return
+     * the minimum amount of time until a bootstrap server becomes ready.
+     *
+     * @param currentTimeMs the current time
+     * @return the amount of time to wait until bootstrap server can accept a 
Fetch request
+     */
+    public long backoffBeforeAvailableBootstrapServer(long currentTimeMs) {
+        long minBackoffMs = retryBackoffMs;
+
+        Iterator<ConnectionState> iterator = connections.values().iterator();
+        while (iterator.hasNext()) {
+            ConnectionState connection = iterator.next();
+            if (connection.hasRequestTimedOut(currentTimeMs)) {
+                // Mark the node as ready after request timeout
+                iterator.remove();
+            } else if (connection.isBackoffComplete(currentTimeMs)) {
+                // Mark the node as ready after completed backoff
+                iterator.remove();
+            } else if (connection.hasInflightRequest(currentTimeMs)) {
+                // There can be at most one inflight fetch request
+                return connection.remainingRequestTimeMs(currentTimeMs);
             } else if (connection.isBackingOff(currentTimeMs)) {
                 minBackoffMs = Math.min(minBackoffMs, 
connection.remainingBackoffMs(currentTimeMs));
-            } else {
-                minBackoffMs = Math.min(minBackoffMs, 
connection.remainingRequestTimeMs(currentTimeMs));
             }
         }
+
+        // There are no inflight fetch requests so check if there is a ready 
bootstrap server
+        for (Node node : bootstrapServers) {
+            if (isReady(node, currentTimeMs)) {
+                return 0L;
+            }
+        }
+
+        // There are no ready bootstrap servers and no inflight fetch requests, 
return the backoff
         return minBackoffMs;
     }
 
+    public boolean hasRequestTimedOut(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.hasRequestTimedOut(timeMs);
+    }
+
+    public boolean isReady(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return true;
+        }
+
+        boolean ready = state.isReady(timeMs);
+        if (ready) {
+            reset(node);
+        }
+
+        return ready;
+    }
+
+    public boolean isBackingOff(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.isBackingOff(timeMs);
+    }
+
+    public long remainingRequestTimeMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return state.remainingRequestTimeMs(timeMs);
+    }
+
+    public long remainingBackoffMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return  state.remainingBackoffMs(timeMs);

Review Comment:
   nit: additional space after return.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to