hachikuji commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r427432140
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+    * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may
+    *                                produce deadlock if caller already holds a group lock. Hence, caller should pass
+    *                                false to disable completion and then complete the delayed requests after releasing
+    *                                held group lock
+    */
+  def appendRecordsToLeader(records: MemoryRecords,
+                            origin: AppendOrigin,
+                            requiredAcks: Int,
+                            completeDelayedRequests: Boolean): LogAppendResult = {

Review comment:
Sorry, let me be clearer:

1. Mainly I'm suggesting moving the delayed operation checking into `ReplicaManager` instead of `Partition`.
2. We can change the call to `appendRecordsToLeader` in `GroupMetadataManager` to go through `ReplicaManager` as well.
3. We could make the callback optional in `ReplicaManager.appendRecords` so that we do not have to add a callback (which appears to be the only reason we write directly to `Partition` from `GroupMetadataManager`).

Anyway, just a suggestion. I thought it might let us keep the completion logic encapsulated in `ReplicaManager`.
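
To make the three points concrete, here is a rough, self-contained sketch of the shape I have in mind. Everything in it is a hypothetical stand-in: `SketchPartition`, `SketchReplicaManager`, `AppendResult`, `tryCompleteDelayedRequests`, and the simplified signatures are assumptions for illustration, not the real Kafka classes or the API in this PR. The idea is that the partition only appends, the replica manager owns the delayed-operation check, and the response callback is optional.

```scala
// Hypothetical, simplified stand-ins; not the real Kafka classes or signatures.
final case class AppendResult(partition: String, lastOffset: Long)

// Stand-in for Partition: it only appends and never touches delayed operations.
final class SketchPartition(val name: String) {
  private var nextOffset = 0L

  def appendRecordsToLeader(records: Seq[String]): AppendResult = synchronized {
    nextOffset += records.size
    AppendResult(name, nextOffset - 1)
  }
}

// Stand-in for ReplicaManager: the only place that checks delayed operations.
final class SketchReplicaManager(partitions: Map[String, SketchPartition]) {
  // Records which partitions were checked, purely so the sketch is observable.
  val completedChecks = scala.collection.mutable.ArrayBuffer.empty[String]

  // Assumed hook: in the real code this would try to complete delayed requests.
  def tryCompleteDelayedRequests(partition: String): Unit =
    completedChecks += partition

  // The callback is optional, so a caller that does not need one can still
  // go through this method instead of writing to the partition directly.
  def appendRecords(
      partition: String,
      records: Seq[String],
      completeDelayedRequests: Boolean,
      responseCallback: Option[AppendResult => Unit] = None
  ): AppendResult = {
    val result = partitions(partition).appendRecordsToLeader(records)
    if (completeDelayedRequests) tryCompleteDelayedRequests(partition)
    responseCallback.foreach(cb => cb(result))
    result
  }
}

object Sketch {
  def main(args: Array[String]): Unit = {
    val name = "__consumer_offsets-0"
    val rm = new SketchReplicaManager(Map(name -> new SketchPartition(name)))
    val groupLock = new Object

    // Append while holding the (stand-in) group lock, with completion disabled.
    groupLock.synchronized {
      rm.appendRecords(name, Seq("offset-commit"), completeDelayedRequests = false)
    }
    // Complete delayed requests only after the lock has been released.
    rm.tryCompleteDelayedRequests(name)
  }
}
```

With this shape, a caller like `GroupMetadataManager` would pass `completeDelayedRequests = false` while holding its group lock and trigger completion afterwards, and `Partition` would not need to know about delayed operations at all.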