junrao commented on code in PR #14712:
URL: https://github.com/apache/kafka/pull/14712#discussion_r1387049371


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -54,23 +54,29 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * Creates a wrapped callback to be executed synchronously on the calling 
request thread or asynchronously
+   * on an arbitrary request thread.
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback A callback method that we intend to call 
from the current thread or in another
+   *                                thread after an asynchronous action 
completes. The RequestLocal passed in must
+   *                                belong to the request handler thread that 
is executing the callback.
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to execute the callback
+   *                     function synchronously from the calling thread.
+   * @return Wrapped callback will either immediately execute 
`asyncCompletionCallback` or schedule it on an arbitrary request thread
+   *         depending on where it is called
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrapAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, 
requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-      T => fun(T)
+      T => asyncCompletionCallback(requestLocal, T)
     } else {
       T => {
         // The requestChannel and request are captured in this lambda, so when 
it's executed on the callback thread
         // we can re-schedule the original callback on a request thread and 
update the metrics accordingly.
-        requestChannel.sendCallbackRequest(RequestChannel.CallbackRequest(() 
=> fun(T), currentRequest))
+        
requestChannel.sendCallbackRequest(RequestChannel.CallbackRequest(newRequestLocal
 => asyncCompletionCallback(newRequestLocal, T), currentRequest))

Review Comment:
   Thanks for the explanation, Justine. Then could we change the description 
accordingly? "Creates a wrapped callback to be executed synchronously on the 
calling request thread" seems no longer accurate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to