chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440585314
########## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ########## @@ -311,37 +317,47 @@ class GroupMetadataManager(brokerId: Int, responseCallback(responseError) } - appendForGroup(group, groupMetadataRecords, putCacheCallback) - + appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests) case None => responseCallback(Errors.NOT_COORDINATOR) - None + Map.empty } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete + * delayed requests for those returned partitions. + */ private def appendForGroup(group: GroupMetadata, records: Map[TopicPartition, MemoryRecords], - callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = { + callback: Map[TopicPartition, PartitionResponse] => Unit, + completeDelayedRequests: Boolean): Map[TopicPartition, LeaderHWChange] = { // call replica manager to append the group message replicaManager.appendRecords( timeout = config.offsetCommitTimeoutMs.toLong, requiredAcks = config.offsetCommitRequiredAcks, internalTopicsAllowed = true, origin = AppendOrigin.Coordinator, + completeDelayedRequests = completeDelayedRequests, entriesPerPartition = records, delayedProduceLock = Some(group.lock), responseCallback = callback) } /** * Store offsets by appending it to the replicated log and then inserting to cache + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete + * delayed requests for those returned partitions. 
*/ def storeOffsets(group: GroupMetadata, consumerId: String, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit, + completeDelayedRequests: Boolean, Review comment: Nice catch. Most methods don't need this flag. Let me revert them :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org