[ https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Wang resolved KAFKA-6974.
-------------------------------
    Resolution: Won't Fix

> Changes the interaction between request handler threads and fetcher threads into an ASYNC model
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6974
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6974
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Priority: Minor
>
> Problem statement:
> At LinkedIn, our clients occasionally complain about receiving constant NotLeaderForPartition exceptions.
>
> Investigation:
> In one investigated case, the cluster was going through a rolling bounce, and we saw an ~8 minute delay between an old partition leader resigning and the new leader becoming active, based on the "Broker xxx handling LeaderAndIsr request" entries in the state change log. Our monitoring shows that the LeaderAndISR request local time during the incident went up to ~4 minutes.
>
> Explanation:
> One possible explanation for the ~8 minutes of delay is the following. During the controlled shutdown of a broker, the partitions whose leaders lie on the shutting-down broker need to go through leadership transitions, and the controller processes these partitions in batches, with each batch containing config.controlledShutdownPartitionBatchSize partitions, e.g. 100. If the 1st LeaderAndISR request sent to a new leader broker takes too long, e.g. 4 minutes, the subsequent LeaderAndISR requests can accumulate delays of 4 minutes, 8 minutes, or even 12 minutes. The reason is that the subsequent LeaderAndISR requests are blocked in a muted channel, given that only one LeaderAndISR request can be processed at a time with a maxInFlightRequestsPerConnection setting of 1. When that happens, no existing metric shows the total delay of 8 or 12 minutes for the muted requests.
>
> The question then is why it took ~4 minutes for the 1st LeaderAndISR request to finish.
>
> Explanation for the ~4 minutes of local time for the LeaderAndISR request:
> While processing a LeaderAndISR request, a request handler thread needs to add partitions to, or remove partitions from, the partitionStates field of the ReplicaFetcherThread, and also to shut down idle fetcher threads by checking the size of the partitionStates field. On the other hand, the background fetcher threads need to iterate through all the partitions in partitionStates in order to build fetch requests and to process fetch responses. The synchronization between the request handler threads and the fetcher threads is done through a partitionMapLock.
>
> Specifically, a fetcher thread may acquire the partitionMapLock and then call the following functions to process a fetch response:
> (1) processPartitionData, which in turn calls
> (2) Replica.maybeIncrementLogStartOffset, which calls
> (3) Log.maybeIncrementLogStartOffset, which calls
> (4) LeaderEpochCache.clearAndFlushEarliest.
>
> Two factors contribute to the long hold time of the partitionMapLock:
> 1. Function (4) above entails calling sync() to make sure the data is persisted to disk, which can have a long latency.
> 2. All four functions above can potentially be called for each partition in the fetch response, multiplying the sync() latency by a factor of n.
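[Editor's note] To make the contention concrete, here is a minimal, self-contained Scala sketch of the pattern described above. It is not the actual Kafka code: the map, the sleep standing in for the per-partition sync(), and the partitionCount helper are illustrative stand-ins. A fetcher thread holds a single partitionMapLock across per-partition flushes, while a request handler thread blocks merely to read the partition count.

{code:scala}
import java.util.concurrent.locks.ReentrantLock

// Simplified sketch (not the actual Kafka code) of a fetcher thread and a
// request handler thread sharing one lock around the partition map.
object LockContentionSketch {
  private val partitionMapLock = new ReentrantLock()
  private val partitionStates = scala.collection.mutable.Map[String, Long]()

  // Fetcher thread: processes a fetch response for every partition while holding the lock.
  def processFetchResponse(partitions: Seq[String]): Unit = {
    partitionMapLock.lock()
    try {
      partitions.foreach { tp =>
        partitionStates(tp) = partitionStates.getOrElse(tp, 0L) + 1
        // Stand-in for LeaderEpochCache.clearAndFlushEarliest -> fsync of the leader-epoch file.
        Thread.sleep(50) // each sync() can take tens of ms; n partitions multiply the delay
      }
    } finally partitionMapLock.unlock()
  }

  // Request handler thread: shutdownIdleFetcherThreads only needs the partition count,
  // but it still has to win the same lock, so it can block for seconds.
  def partitionCount(): Int = {
    partitionMapLock.lock()
    try partitionStates.size
    finally partitionMapLock.unlock()
  }

  def main(args: Array[String]): Unit = {
    val fetcher = new Thread(() => processFetchResponse((1 to 100).map(i => s"topic-$i")))
    fetcher.start()
    Thread.sleep(10) // let the fetcher grab the lock first
    val start = System.nanoTime()
    val count = partitionCount() // blocks until the fetcher releases partitionMapLock
    println(s"partitionCount=$count after ${(System.nanoTime() - start) / 1e6} ms")
    fetcher.join()
  }
}
{code}

With 100 partitions and a 50 ms stand-in flush, the partitionCount call in this sketch is held up for roughly five seconds, mirroring the handler stall described below.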
> The end result is that the request handler thread gets blocked for a long time trying to acquire the partitionMapLock of some fetcher inside AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each fetcher's partitionCount requires acquiring that fetcher's partitionMapLock. In our testing environment, we reproduced the problem and confirmed the explanation above: a request handler thread was blocked for 10 seconds trying to acquire the partitionMapLock of one particular fetcher thread, while many log entries showed "Incrementing log start offset of partition...".
>
> Proposed change:
> We propose to change the interaction between the request handler threads and the fetcher threads to an ASYNC model by using an event queue. All requests to add or remove partitions, or to shut down idle fetcher threads, are modeled as items in the event queue, and only the fetcher threads can take items out of the event queue and actually process them.
>
> In the new ASYNC model, in order to be able to process an infinite sequence of FetchRequests, a fetcher thread initially has one FetchRequest, and after it is done processing one FetchRequest, it enqueues one more into its own event queue.
>
> Also, since the current AbstractFetcherThread logic is inherited by both the replica fetcher threads and the consumer fetcher threads of the old consumer, and the latter has been deprecated, we plan to implement the ASYNC model with a clean-slate approach and only support the replica fetcher threads, in order to keep the code cleaner.
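[Editor's note] As a rough illustration of the proposed model, here is a self-contained Scala sketch. It is hypothetical: the FetcherThread class and the event names are illustrative, not the actual patch. The fetcher thread owns its event queue, request handler threads enqueue events and return immediately, and the fetcher keeps itself fetching by re-enqueueing a FetchRequest event after finishing the previous one.

{code:scala}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch of the proposed ASYNC model: every interaction with the
// fetcher is an event on the fetcher's own queue, so request handler threads
// enqueue and return immediately instead of waiting on a shared lock.
object AsyncFetcherSketch {
  sealed trait FetcherEvent
  case class AddPartitions(partitions: Set[String]) extends FetcherEvent
  case class RemovePartitions(partitions: Set[String]) extends FetcherEvent
  case object ShutdownIdleFetcherThread extends FetcherEvent
  case object FetchRequest extends FetcherEvent // self-enqueued to keep fetching forever

  class FetcherThread extends Thread("async-fetcher-sketch") {
    private val eventQueue = new LinkedBlockingQueue[FetcherEvent]()
    private val partitionStates = scala.collection.mutable.Map[String, Long]()
    @volatile private var running = true

    // Called by request handler threads; never blocks on fetcher-internal state.
    def enqueue(event: FetcherEvent): Unit = eventQueue.put(event)

    override def run(): Unit = {
      eventQueue.put(FetchRequest) // seed the first fetch
      while (running) {
        eventQueue.take() match {
          case AddPartitions(ps)    => ps.foreach(p => partitionStates.getOrElseUpdate(p, 0L))
          case RemovePartitions(ps) => ps.foreach(partitionStates.remove)
          case ShutdownIdleFetcherThread =>
            if (partitionStates.isEmpty) running = false // only idle fetchers stop
          case FetchRequest =>
            // Build and process one fetch for the current partitions (possibly with slow
            // flushes), then enqueue the next FetchRequest so control events get a turn.
            partitionStates.keys.toSeq.foreach(p => partitionStates(p) += 1)
            Thread.sleep(10) // stand-in for the remote fetch round trip
            eventQueue.put(FetchRequest)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val fetcher = new FetcherThread
    fetcher.start()
    fetcher.enqueue(AddPartitions(Set("topic-0", "topic-1"))) // returns immediately
    Thread.sleep(100)
    fetcher.enqueue(RemovePartitions(Set("topic-0", "topic-1")))
    fetcher.enqueue(ShutdownIdleFetcherThread)
    fetcher.join()
  }
}
{code}

In this model a request handler thread never waits on fetcher-internal state, so a slow sync() during fetch-response processing can no longer stall LeaderAndISR handling; it only delays the fetcher's next fetch.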