hachikuji commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r537709868
########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - def responseHandler(response: ClientResponse): Unit = { - try { - val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) - } finally { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + + def clearInflightRequests(): Unit = { + // Be sure to clear the in-flight flag to allow future AlterIsr requests + if (!inflightRequest.compareAndSet(true, false)) { + throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } + } + + class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(response: ClientResponse): Unit = { + try { + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) Review comment: nit: drop parenthesis after `brokerEpoch` ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -125,15 +126,24 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { - requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + retryDeadlineMs: Long): Unit = { + requestQueue.put(BrokerToControllerQueueItem(request, callback, retryDeadlineMs)) requestThread.wakeup() } +} + +abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler { Review comment: How about `ControllerRequestCompletionHandler`? ########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - def responseHandler(response: ClientResponse): Unit = { - try { - val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) - } finally { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + + def clearInflightRequests(): Unit = { + // Be sure to clear the in-flight flag to allow future AlterIsr requests + if (!inflightRequest.compareAndSet(true, false)) { + throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } + } + + class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(response: ClientResponse): Unit = { + try { + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) + } finally { + clearInflightRequests() } } + + override def onTimeout(): Unit = { + throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller") + } } debug(s"Sending AlterIsr to controller $message") - controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), responseHandler) + // We will not timeout AlterISR request, instead letting it retry indefinitely. Review comment: Perhaps add some more detail: "... letting it retry indefinitely until a response is received or the request is cancelled after receiving new `LeaderAndIsr` state". ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -125,15 +126,24 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { - requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + retryDeadlineMs: Long): Unit = { + requestQueue.put(BrokerToControllerQueueItem(request, callback, retryDeadlineMs)) requestThread.wakeup() } +} + +abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler { + /** + * Fire when the request transmission hits timeout. Review comment: Can we document the difference between this and the request timeout? ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -178,6 +190,12 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + // The timeout will only be checked after receiving a response. This means that in the worst case, Review comment: Can we move this comment to the doc for `sendRequest`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org