johnny94 commented on code in PR #19793:
URL: https://github.com/apache/kafka/pull/19793#discussion_r2135206260


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -909,12 +911,33 @@ class ReplicaManager(val config: KafkaConfig,
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
     initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
       // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback)
+      val produceMetadata = new ProduceMetadata(requiredAcks, 
initialProduceStatus.asJava)
+
+      // Updates the status of a produce partition based on the current state.
+      // Please refer to the documentation in `DelayedProduce#tryComplete` for
+      // a comprehensive description of Case A, Case B and Case C.
+      def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit 
= {

Review Comment:
   Hi, I fixed based on your advice.
   
   After reviewing the code, I also think keeping state change logic in 
`DelayedProduce` is a better choice. The original comment is a little bit 
misleading after delegating so I updated the comment accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to