rajinisivaram commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2890461322
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -2193,7 +2188,6 @@ private void subscribeInternal(Collection<String> topics,
Optional<ConsumerRebal
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor
will take a reference to the first
* error, continue to process the remaining events, and then throw the
first error that occurred.
- *
Review Comment:
nit: Unnecessary formatting changes.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -163,7 +162,6 @@
* {@link ConsumerNetworkThread network thread}. Visit
* <a
href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this
document</a>
* for implementation detail.
- *
Review Comment:
nit: unnecessary formatting change
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -629,6 +643,7 @@ private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
+ ensureBootstrapped(now);
Review Comment:
The naming of `ensureBootstrapped` is odd. It looks like the method attempts
to resolve once. And if that fails and bootstrap timer hasn't expired, we
return here without bootstrapping and fall through. And that would just fail in
unexpected ways. We should probably retry resolution here up to the poll
timeout or bootstrap timeout, whichever expires first. And 1) return without an
error if bootstrap timer hasn't expired 2) throw BootstrapResolutionException
if bootstrap timer expired before poll timeout 3) Fall through only if
resolution has succeeded.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1983,7 +1979,6 @@ private boolean isCommittedOffsetsManagementEnabled() {
* This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests} for the
* pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the
pre-fetch case, the application thread
* will not wait for confirmation of the request creation before
continuing.
- *
Review Comment:
nit: unnecessary formatting changes in this file.
##########
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java:
##########
@@ -91,26 +91,31 @@
import static org.mockito.Mockito.when;
public class NetworkClientTest {
+ protected static final MockTime TIME = new MockTime();
Review Comment:
Why was this changed to static? The old instance variable `time` seems to be
sufficient?
##########
clients/src/main/java/org/apache/kafka/clients/KafkaClient.java:
##########
@@ -212,5 +212,4 @@ ClientRequest newClientRequest(String nodeId,
* was invoked for this client.
*/
boolean active();
-
Review Comment:
nit: unnecessary formatting change in a file with no other changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]