hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378179666
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
}
/**
- * Wrap callback to schedule it on a request thread.
- * NOTE: this function must be called on a request thread.
- * @param fun Callback function to execute
- * @return Wrapped callback that would execute `fun` on a request thread
+ * A callback method that expected to be executed once in an arbitrary
request handler thread after an asynchronous action completes.
Review Comment:
nit: **is** expected?
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
}
/**
- * Wrap callback to schedule it on a request thread.
- * NOTE: this function must be called on a request thread.
- * @param fun Callback function to execute
- * @return Wrapped callback that would execute `fun` on a request thread
+ * A callback method that expected to be executed once in an arbitrary
request handler thread after an asynchronous action completes.
+ * The RequestLocal passed in must belong to the request handler thread that
is executing the callback.
*/
- def wrap[T](fun: T => Unit): T => Unit = {
+ class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
Review Comment:
nit: could probably be case class?
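
For illustration, a minimal sketch of what the case class variant could look like. `RequestLocal` here is a hypothetical stand-in so the snippet compiles on its own, and `PlainCallback` and `CaseClassSketch` are made-up names; none of this is the code from the PR.

```scala
// Hypothetical stand-in for kafka.server.RequestLocal, so the sketch is self-contained.
final class RequestLocal(val threadName: String)

// Shape from the diff: a plain class needs an explicit `val` to expose `fun`, and `new` to build it.
class PlainCallback[T](val fun: (RequestLocal, T) => Unit)

// Case class variant: `fun` becomes a public val automatically, and apply/unapply are generated.
case class AsynchronousCompletionCallback[T](fun: (RequestLocal, T) => Unit)

object CaseClassSketch {
  def run(): String = {
    val sink = new StringBuilder
    // No `new` needed: the generated companion `apply` builds the instance.
    val callback = AsynchronousCompletionCallback[String]((local, result) => {
      sink.append(s"${local.threadName}:$result")
      ()
    })
    // The generated `unapply` lets callers destructure the wrapper.
    val AsynchronousCompletionCallback(fun) = callback
    fun(new RequestLocal("handler-1"), "done")
    sink.toString
  }
}
```

One caveat with this shape: the generated `equals` and `hashCode` compare the wrapped function by reference, so two wrappers are equal only when they hold the same function instance.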
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /*
+ * Note: This method can be used as a callback in a different request
thread. Ensure that correct RequestLocal
+ * is passed when executing this method. Accessing non-thread-safe data
structures should be avoided if possible.
+ */
+ private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+ internalTopicsAllowed: Boolean,
+ origin: AppendOrigin,
+ requiredAcks: Short,
+ verificationGuards: Map[TopicPartition,
VerificationGuard],
+ errorsPerPartition: Map[TopicPartition, Errors],
+ sTime: Long,
+ recordConversionStatsCallback: Map[TopicPartition,
RecordConversionStats] => Unit,
+ timeout: Long,
+ responseCallback: Map[TopicPartition,
PartitionResponse] => Unit,
+ delayedProduceLock: Option[Lock])
+ (requestLocal: RequestLocal)
Review Comment:
nit: probably simpler to return `(request: RequestLocal, unverifiedEntries:
Map[TopicPartition, Errors])`. Then we don't need the awkward `(_)(_)` in the
call above
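
A rough sketch of the difference, under simplifying assumptions: `appendCurried` and `appendGrouped` are hypothetical, cut-down versions of `appendEntries`, `RequestLocal` is a stand-in class, and `String` replaces both `TopicPartition` and `Errors`.

```scala
// Hypothetical stand-in for kafka.server.RequestLocal.
final class RequestLocal(val threadName: String)

object CurryingSketch {
  // String stands in for both TopicPartition (keys) and Errors (values) in this sketch.
  type Callback = (RequestLocal, Map[String, String]) => Unit

  // Shape in the diff: each late-bound argument sits in its own parameter list.
  def appendCurried(timeout: Long, sink: StringBuilder)
                   (requestLocal: RequestLocal)
                   (unverifiedEntries: Map[String, String]): Unit = {
    sink.append(s"${requestLocal.threadName}:${unverifiedEntries.size}:$timeout;")
    ()
  }

  // Suggested shape: both late-bound arguments share one trailing parameter list.
  def appendGrouped(timeout: Long, sink: StringBuilder)
                   (requestLocal: RequestLocal, unverifiedEntries: Map[String, String]): Unit = {
    sink.append(s"${requestLocal.threadName}:${unverifiedEntries.size}:$timeout;")
    ()
  }

  def run(): String = {
    val sink = new StringBuilder
    // Two separate lists need two placeholders to form a two-argument function.
    val viaCurried: Callback = appendCurried(30L, sink)(_)(_)
    // A single trailing list eta-expands directly to the two-argument function type.
    val viaGrouped: Callback = appendGrouped(30L, sink)
    val local = new RequestLocal("handler-1")
    viaCurried(local, Map("topic-0" -> "NONE"))
    viaGrouped(local, Map("topic-0" -> "NONE"))
    sink.toString
  }
}
```

Both forms produce the same function value; the grouped list only removes the placeholder noise at the call site.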
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
}
/**
- * Wrap callback to schedule it on a request thread.
- * NOTE: this function must be called on a request thread.
- * @param fun Callback function to execute
- * @return Wrapped callback that would execute `fun` on a request thread
+ * A callback method that expected to be executed once in an arbitrary
request handler thread after an asynchronous action completes.
+ * The RequestLocal passed in must belong to the request handler thread that
is executing the callback.
*/
- def wrap[T](fun: T => Unit): T => Unit = {
+ class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+ /**
+ * Wrap callback to schedule it on an abitrary request thread.
Review Comment:
nit: typo arbitrary
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /*
+ * Note: This method can be used as a callback in a different request
thread. Ensure that correct RequestLocal
+ * is passed when executing this method. Accessing non-thread-safe data
structures should be avoided if possible.
+ */
+ private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+ internalTopicsAllowed: Boolean,
+ origin: AppendOrigin,
+ requiredAcks: Short,
+ verificationGuards: Map[TopicPartition,
VerificationGuard],
Review Comment:
Hmm, without the lock, an offset commit may be reordered after a group state
change. To make it safe, we would need to reacquire the group lock after
validation completes so that we could check again whether the request should be
accepted.
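
To make the concern concrete, here is a minimal sketch of the check, validate, re-check pattern. `GroupSketch`, its generation counter, and `commitOffset` are hypothetical simplifications for illustration only, not the group coordinator code or the fix adopted in the PR.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, heavily simplified group: a generation counter and a commit log guarded by one lock.
final class GroupSketch {
  private val lock = new ReentrantLock()
  private var generation = 0
  private var committed = Vector.empty[(Int, Long)]

  private def inLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Models a group state change, such as a rebalance bumping the generation.
  def rebalance(): Unit = inLock { generation += 1 }

  def commits: Vector[(Int, Long)] = inLock(committed)

  // 1. Under the lock: check that the request matches the current generation.
  // 2. With no lock held: run the validation step; the group may change meanwhile.
  // 3. Lock reacquired: check again, and append only if the request is still acceptable.
  def commitOffset(requestGeneration: Int, offset: Long)(validate: () => Unit): Boolean = {
    val acceptedInitially = inLock(requestGeneration == generation)
    if (!acceptedInitially) false
    else {
      validate()
      inLock {
        if (requestGeneration == generation) {
          committed = committed :+ ((requestGeneration, offset))
          true
        } else false
      }
    }
  }
}

object RecheckSketch {
  def run(): (Boolean, Boolean, Int) = {
    val group = new GroupSketch
    // Nothing changes during validation, so the commit is accepted.
    val accepted = group.commitOffset(0, 42L)(() => ())
    // The group moves to a new generation during validation, so the second check rejects it.
    val stale = group.commitOffset(0, 43L)(() => group.rebalance())
    (accepted, stale, group.commits.size)
  }
}
```

Without the second check in step 3, the stale commit in `run()` would be appended after the generation bump, which is the reordering described above.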