hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378179666


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary 
request handler thread after an asynchronous action completes.

Review Comment:
   nit: **is** expected?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary 
request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that 
is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   nit: could probably be case class?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, 
VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
   nit: probably simpler to return `(request: RequestLocal, unverifiedEntries: 
Map[TopicPartition, Errors])`. Then we don't need the awkward `(_)(_)` in the 
call above



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary 
request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that 
is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an abitrary request thread.

Review Comment:
   nit: typo arbitrary



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   Hmm, without the lock, an offset commit may be reordered after a group state 
change. To make it safe, we would need to reacquire the group lock after 
validation completes so that we could check again whether the request should be 
accepted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to