guozhangwang commented on a change in pull request #9671: URL: https://github.com/apache/kafka/pull/9671#discussion_r534377676
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             @Override
             public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-                clearFindCoordinatorFuture();
+                log.debug("FindCoordinator request failed", e);

Review comment:
nit: I think it's better to print just `e.getMessage()` on a single line.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -235,11 +235,6 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
             return true;

         do {
-            if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {

Review comment:
The main reason for https://github.com/apache/kafka/pull/7312/files#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R230-R234 is the following:

* Inside `ensureCoordinatorReady`, called by the main thread, we may break out of the loop at line 248 below without knowing the final state of the future.
* That future could later be completed by the other thread (the heartbeat thread, hb) and replaced by a new `future` object.

In that case, when the main thread calls `ensureCoordinatorReady` again, it will "miss" the fatal error contained in the previous future.

So, thinking about it again, I think we would still want to keep the exception, but only if it is a fatal one, and record it inside the handler (i.e. we probably do not need to register another listener just to bookkeep that exception; we can piggy-back this logic on the handler listener directly). Then, inside the while loop, we check whether a previous future already hit a fatal exception and, if so, throw it to fail the whole client.
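To make the suggestion above concrete, here is a minimal sketch of the idea. It is not the actual `AbstractCoordinator` code. Assumptions: `CompletableFuture` stands in for Kafka's `RequestFuture`, the nested `RetriableError` class stands in for `RetriableException`, the field `fatalFindCoordinatorException` and the helper `recordIfFatal` are hypothetical names, and the timer-driven polling loop is omitted. Clearing the recorded exception once it has been thrown is also an assumption of this sketch.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch only; names and types are stand-ins, not Kafka's. */
class CoordinatorLookupSketch {

    /** Stand-in for Kafka's RetriableException (assumption for this sketch). */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    // Current lookup; a finished one is replaced by the next caller.
    private CompletableFuture<Void> findCoordinatorFuture;

    // Hypothetical field: last fatal failure, which outlives the future that produced it.
    private RuntimeException fatalFindCoordinatorException;

    /** Either thread may start a lookup; at most one unfinished lookup exists. */
    synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null || findCoordinatorFuture.isDone()) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            // Piggy-back the bookkeeping on the completion handler itself
            // instead of registering a second listener.
            future.whenComplete((ignored, error) -> recordIfFatal(error));
            findCoordinatorFuture = future;
        }
        return findCoordinatorFuture;
    }

    /** Remembers only non-retriable failures. */
    private synchronized void recordIfFatal(Throwable error) {
        if (error instanceof RuntimeException && !(error instanceof RetriableError)) {
            fatalFindCoordinatorException = (RuntimeException) error;
        }
    }

    /**
     * Simplified single pass of the main thread's loop: first surface a fatal
     * error left behind by any earlier future, then start (or join) a lookup.
     * The real loop would poll with a timer here and may give up early.
     */
    synchronized void ensureCoordinatorReady() {
        if (fatalFindCoordinatorException != null) {
            RuntimeException fatal = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null; // assumption: clear once thrown
            throw fatal;
        }
        lookupCoordinator();
    }

    public static void main(String[] args) {
        CoordinatorLookupSketch sketch = new CoordinatorLookupSketch();

        // Main thread: starts a lookup and "times out" without seeing the result.
        sketch.ensureCoordinatorReady();
        CompletableFuture<Void> first = sketch.lookupCoordinator();

        // Heartbeat thread (simulated inline): the lookup fails fatally,
        // and a later call replaces the finished future with a new one.
        first.completeExceptionally(new IllegalStateException("fatal"));
        sketch.lookupCoordinator();

        // Main thread again: the fatal error from the old future is not missed.
        try {
            sketch.ensureCoordinatorReady();
            System.out.println("fatal error was missed");
        } catch (IllegalStateException e) {
            System.out.println("surfaced: " + e.getMessage());
        }
    }
}
```

In this sketch the fatal error is visible to the next `ensureCoordinatorReady()` call even though the future that carried it has already been replaced, which is the case the removed check was guarding against.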
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }

+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal exception", fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();

Review comment:
I think this is a better approach, but we need to be careful about the call site inside the hb thread:

```
if (findCoordinatorFuture != null || lookupCoordinator().failed())
```

i.e. an hb thread sending a discover-coordinator request would also cause a future to be assigned, but that future would only be cleared by the main-thread caller. Having thought about that for a moment, I think this is okay, but it may be worth having a second pair of eyes on it.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }

+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal exception", fatalException);

Review comment:
nit: extra space.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org