rajinisivaram commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2890461322


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -2193,7 +2188,6 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
-     *

Review Comment:
   nit: Unnecessary formatting changes.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -163,7 +162,6 @@
  * {@link ConsumerNetworkThread network thread}. Visit
  * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this document</a>
  * for implementation detail.
- *

Review Comment:
   nit: unnecessary formatting change



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -629,6 +643,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         ensureActive();
+        ensureBootstrapped(now);

Review Comment:
   The name `ensureBootstrapped` is misleading. It looks like the method 
attempts resolution only once. If that attempt fails and the bootstrap timer 
hasn't expired, it returns without having bootstrapped and `poll` falls 
through, which would then fail in unexpected ways. We should probably retry 
resolution here until the poll timeout or the bootstrap timeout expires, 
whichever comes first, and then: 1) return without an error if the bootstrap 
timer hasn't expired, 2) throw BootstrapResolutionException if the bootstrap 
timer expired before the poll timeout, 3) fall through only if resolution has 
succeeded.
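
   A minimal sketch of the retry behaviour suggested above, not the PR's 
implementation. Every name here is hypothetical: `awaitBootstrap`, its 
parameters, and the nested `BootstrapResolutionException` (a local stand-in, 
since the real exception's constructor isn't shown in this review). Time is 
passed in as a supplier so the loop can be driven by a mock clock.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical helper illustrating the three outcomes listed in the review comment.
final class BootstrapRetrySketch {

    // Local stand-in for the PR's exception type; the real class may differ.
    static class BootstrapResolutionException extends RuntimeException {
        BootstrapResolutionException(String message) {
            super(message);
        }
    }

    /**
     * Retries resolution until it succeeds or a deadline passes.
     *
     * @return true if resolution succeeded (caller may fall through);
     *         false if the poll deadline passed while the bootstrap timer
     *         is still running (caller should return without an error)
     * @throws BootstrapResolutionException if the bootstrap deadline passed
     */
    static boolean awaitBootstrap(BooleanSupplier tryResolve,
                                  LongSupplier clockMs,
                                  long pollDeadlineMs,
                                  long bootstrapDeadlineMs,
                                  Runnable backoff) {
        while (true) {
            if (tryResolve.getAsBoolean()) {
                return true; // outcome 3: resolved, safe to continue
            }
            long now = clockMs.getAsLong();
            if (now >= bootstrapDeadlineMs) {
                // outcome 2: bootstrap timer expired first
                throw new BootstrapResolutionException(
                    "Bootstrap resolution did not succeed before the bootstrap timeout");
            }
            if (now >= pollDeadlineMs) {
                return false; // outcome 1: poll timeout reached, no error yet
            }
            backoff.run(); // wait before the next attempt
        }
    }
}
```

   With something along these lines, `poll` would return an empty result when 
the helper returns false and proceed only when it returns true.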



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1983,7 +1979,6 @@ private boolean isCommittedOffsetsManagementEnabled() {
      * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
      * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread
      * will not wait for confirmation of the request creation before continuing.
-     *

Review Comment:
   nit: unnecessary formatting changes in this file.



##########
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java:
##########
@@ -91,26 +91,31 @@
 import static org.mockito.Mockito.when;
 
 public class NetworkClientTest {
+    protected static final MockTime TIME = new MockTime();

Review Comment:
   Why was this changed to static? The old instance variable `time` seems to be 
sufficient?



##########
clients/src/main/java/org/apache/kafka/clients/KafkaClient.java:
##########
@@ -212,5 +212,4 @@ ClientRequest newClientRequest(String nodeId,
      * was invoked for this client.
      */
     boolean active();
-

Review Comment:
   nit: unnecessary formatting change in a file with no other changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
