philipnee commented on code in PR #15020:
URL: https://github.com/apache/kafka/pull/15020#discussion_r1439651419


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -70,8 +70,10 @@ protected void maybeThrowAuthFailure(Node node) {
      */
     @Override
     public PollResult poll(long currentTimeMs) {
+        boolean checkNodeAvailability = false;
+
         return pollInternal(
-                prepareFetchRequests(),
+                prepareFetchRequests(checkNodeAvailability),

Review Comment:
   just pass false.  the var above is useless.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -403,7 +405,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessi
      * Create fetch requests for all nodes for which we have assigned 
partitions
      * that have no existing requests in flight.
      */
-    protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests() {
+    protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests(boolean checkNodeAvailability) {

Review Comment:
   `final boolean`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -372,7 +372,7 @@ Node selectReadReplica(final TopicPartition partition, 
final Node leaderReplica,
         }
     }
 
-    protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessionRequests() {
+    protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessionRequests(boolean checkNodeAvailability) {

Review Comment:
   `final boolean checkNodeAvailability`
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -428,37 +430,53 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests()
             // Use the preferred read replica if set, otherwise the 
partition's leader
             Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
 
-            if (isUnavailable(node)) {
-                maybeThrowAuthFailure(node);
-
-                // If we try to send during the reconnect backoff window, then 
the request is just
-                // going to be failed anyway before being sent, so skip 
sending the request for now
-                log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
-            } else if (nodesWithPendingFetchRequests.contains(node.id())) {
-                log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
+            if (checkNodeAvailability) {

Review Comment:
   is it possible to avoid nested if? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -102,7 +102,9 @@ public void 
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition>
      * @return number of fetches sent
      */
     public synchronized int sendFetches() {
-        final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = 
prepareFetchRequests();
+        boolean checkNodeAvailability = true;

Review Comment:
   same as above



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -385,7 +385,9 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessi
                 // skip sending the close request.
                 final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
 
-                if (fetchTarget == null || isUnavailable(fetchTarget)) {
+                boolean fetchTargetAvailability = checkNodeAvailability ? 
(fetchTarget == null || isUnavailable(fetchTarget)) : fetchTarget == null;

Review Comment:
   `final boolean`
   
   - i would also call this `isFetchTargetAvailable`
   - let's not use inline if.  this makes the code harder to read.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -82,9 +84,14 @@ public PollResult poll(long currentTimeMs) {
      */
     @Override
     public PollResult pollOnClose() {
+
+        boolean checkNodeAvailability = false;
+
+
         // TODO: move the logic to poll to handle signal close
+
         return pollInternal(
-                prepareCloseFetchSessionRequests(),
+                prepareCloseFetchSessionRequests(checkNodeAvailability),

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -781,10 +781,10 @@ public void testFetchSkipsBlackedOutNodes() {
         Node node = initialUpdateResponse.brokers().iterator().next();
 
         client.backoff(node, 500);
-        assertEquals(0, sendFetches());
+        assertEquals(1, sendFetches());

Review Comment:
   why are you changing the tests? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to