hachikuji commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r427432140



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may
+   *                                produce deadlock if caller already holds a group lock. Hence, caller should pass
+   *                                false to disable completion and then complete the delayed requests after releasing
+   *                                held group lock
+   */
+  def appendRecordsToLeader(records: MemoryRecords,
+                            origin: AppendOrigin,
+                            requiredAcks: Int,
+                            completeDelayedRequests: Boolean): LogAppendResult = {

Review comment:
   Sorry, let me be clearer:
   
   1. Mainly I'm suggesting moving the delayed operation checking into `ReplicaManager` instead of `Partition`.
   2. We can change the call to `appendRecordsToLeader` in `GroupMetadataManager` to go through `ReplicaManager` as well.
   3. We could make the callback optional in `ReplicaManager.appendRecords` so that `GroupMetadataManager` does not have to pass one (needing a callback appears to be the only reason we write directly to `Partition` from `GroupMetadataManager`).
   
   Anyway, just a suggestion. I thought it might let us keep the completion logic encapsulated in `ReplicaManager`.
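
A rough sketch of the shape this suggestion might take, using simplified stand-in classes rather than the real Kafka types. Every name here (`SketchPartition`, `SketchReplicaManager`, `AppendResult`, `tryCompleteDelayedRequests`, `completedFor`) is hypothetical and the signatures are assumptions, not the actual `ReplicaManager` API. It only illustrates an optional callback and completion checking that lives in one place; it does not model the group-lock ordering discussed in the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an append result; not the real LogAppendResult.
final case class AppendResult(partition: String, lastOffset: Long)

// Hypothetical stand-in for Partition: it only appends and knows nothing
// about delayed operations.
class SketchPartition(val name: String) {
  private var nextOffset = 0L

  def appendRecordsToLeader(records: Seq[String]): AppendResult = {
    nextOffset += records.size
    AppendResult(name, nextOffset - 1)
  }
}

// Hypothetical stand-in for ReplicaManager: the only place that checks
// delayed operations after an append.
class SketchReplicaManager(partitions: Map[String, SketchPartition]) {
  // Records which partitions had their delayed operations checked.
  val completedFor: mutable.Buffer[String] = mutable.Buffer.empty[String]

  private def tryCompleteDelayedRequests(partition: String): Unit =
    completedFor += partition

  // The callback is optional, so a caller that does not need one can still
  // go through this method instead of writing to the partition directly.
  def appendRecords(
      entries: Map[String, Seq[String]],
      responseCallback: Option[Map[String, AppendResult] => Unit] = None
  ): Map[String, AppendResult] = {
    val results = entries.map { case (name, records) =>
      name -> partitions(name).appendRecordsToLeader(records)
    }
    results.keys.foreach(tryCompleteDelayedRequests)
    responseCallback.foreach(callback => callback(results))
    results
  }
}
```

With something like this, a caller in the position of `GroupMetadataManager` could call `appendRecords(entries)` with no callback, and `SketchPartition` never needs a `completeDelayedRequests` flag.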



