apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1816657075
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
Review Comment:
So this iteration will always be executed for every share fetch when the
`missingFetchOffsetMetadataTopicPartitions` will rarely be true, only when a
new SharePartition is created. Hence, I was thinking why not to have such
update only on SharePartition initialization. Though I understand that current
`readFromLog` API requires fethchParams but is there an API which can supply
the LogOffsetMetadata when requested with topic partition and specific
offset(start offset of share partition)? @junrao wdyt?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]