apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1816619673
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
Review Comment:
So the update should be safe with multiple threads, as we have acquired the lock on the share partition, which guards against two threads trying to update the offset metadata at the same time. But we should write a comment on the share partition update method stating that the caller must ensure the share partition fetch lock is acquired prior to invoking `updateLatestFetchOffsetMetadata`.
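For instance, something like this on the `SharePartition` update method (a sketch of the wording only; the method body here is assumed from the `latestFetchOffsetMetadata` field shown later in this review):

```java
/**
 * Updates the latest fetch offset metadata for the share partition.
 * <p>
 * Note: the caller must ensure the share partition fetch lock is acquired prior to invoking
 * this method; the lock is what makes concurrent updates from multiple threads safe.
 */
void updateLatestFetchOffsetMetadata(LogOffsetMetadata fetchOffsetMetadata) {
    this.latestFetchOffsetMetadata = Optional.of(fetchOffsetMetadata);
}
```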
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
Review Comment:
Can this be in a separate method, i.e. divide the method into smaller ones?
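For instance, the collection of missing partitions and the update from the read result could each be their own helper (a rough sketch; the helper names are mine, not from the PR):

```java
// Collects the topic partitions whose share partitions do not have fetch offset metadata yet.
private Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataPartitions(
        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
    Map<TopicIdPartition, FetchRequest.PartitionData> missing = new LinkedHashMap<>();
    for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
        SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), entry.getKey());
        if (sharePartition != null && sharePartition.latestFetchOffsetMetadata().isEmpty())
            missing.put(entry.getKey(), entry.getValue());
    }
    return missing;
}

// Updates the share partitions from the replica manager read result, returning whether anything changed.
private boolean updateFetchOffsetMetadata(Map<TopicIdPartition, FetchPartitionOffsetData> readResponseData) {
    boolean updated = false;
    for (Map.Entry<TopicIdPartition, FetchPartitionOffsetData> entry : readResponseData.entrySet()) {
        SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), entry.getKey());
        if (sharePartition == null)
            continue;
        sharePartition.updateLatestFetchOffsetMetadata(entry.getValue().logOffsetMetadata());
        updated = true;
    }
    return updated;
}
```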
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
+
+ if (endOffsetMetadata ==
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ return true;
Review Comment:
Why do we satisfy the min bytes criteria if the share partition is null?
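If the intent is simply not to block on a partition we can no longer evaluate, skipping it, as the first loop in this file does, would be the other option. A sketch of that reading, not a claim about the intended behaviour:

```java
SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
if (sharePartition == null) {
    // Skip this partition rather than completing the whole request; the remaining partitions
    // still contribute to accumulatedSize and the minBytes check at the end.
    continue;
}
```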
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
Review Comment:
So this iteration will always be executed for every share fetch, while `missingFetchOffsetMetadataTopicPartitions` will rarely be non-empty, only when a new SharePartition is created. Hence I was thinking why not do such an update only on SharePartition initialization. Though I understand that the current `readFromLog` API requires fetchParams, is there an API which can supply the LogOffsetMetadata when requested with a topic partition and a specific offset (the start offset of the share partition)? @junrao wdyt?
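To illustrate the kind of call I mean (purely hypothetical; I am assuming `Partition#localLogOrException` and `UnifiedLog#convertToOffsetMetadataOrThrow` are usable from this path and that the share partition exposes its start offset, all of which needs checking):

```java
// Hypothetical: resolve the LogOffsetMetadata for the share partition's start offset directly,
// without a full readFromLog. Both calls below are assumptions to be verified.
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
LogOffsetMetadata startOffsetMetadata =
    partition.localLogOrException().convertToOffsetMetadataOrThrow(sharePartition.startOffset());
sharePartition.updateLatestFetchOffsetMetadata(startOffsetMetadata);
```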
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -517,20 +525,22 @@ public long nextFetchOffset() {
* @param maxFetchRecords The maximum number of records that should be
acquired, this is a soft
* limit and the method might acquire more
records than the maxFetchRecords,
* if the records are already part of the same
fetch batch.
- * @param fetchPartitionData The fetched records for the share partition.
+ * @param fetchPartitionOffsetData The fetched records for the share
partition along with log offset metadata
Review Comment:
Please correct the alignment of comments.
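For example, keeping the description column aligned across the neighbouring `@param` entries (a formatting sketch only):

```java
 * @param maxFetchRecords          The maximum number of records that should be acquired, this is a soft
 *                                 limit and the method might acquire more records than the maxFetchRecords,
 *                                 if the records are already part of the same fetch batch.
 * @param fetchPartitionOffsetData The fetched records for the share partition along with log offset metadata.
```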
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,25 +109,11 @@ public void onComplete() {
topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UnboundedQuota$.MODULE$,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
+ Map<TopicIdPartition, FetchPartitionOffsetData> responseData;
+ if (logReadResponseFromTryComplete == null ||
logReadResponseFromTryComplete.isEmpty())
Review Comment:
Can it ever be non-null and empty, i.e. do you require the second condition?
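If it cannot, the check could collapse to the null test alone. A sketch of that simplification, assuming the invariant holds:

```java
Map<TopicIdPartition, FetchPartitionOffsetData> responseData;
if (logReadResponseFromTryComplete == null)
    responseData = readFromLog(topicPartitionData);
else
    responseData = combineLogReadResponseForOnComplete(topicPartitionData);
```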
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -261,6 +263,11 @@ public static RecordState forId(byte id) {
*/
private long endOffset;
+ /**
+ * We maintain the latest fetch offset metadata to estimate the minBytes
requirement more efficiently.
+ */
+ private Optional<LogOffsetMetadata> latestFetchOffsetMetadata;
Review Comment:
Why is it optional?
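If the field is always expected to hold a value once the partition is fetchable, a plain field initialized to the unknown sentinel would be one alternative (a sketch of that option, not the PR's code):

```java
/**
 * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
 * Starts as UNKNOWN_OFFSET_METADATA until the first read populates it.
 */
private LogOffsetMetadata latestFetchOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
```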
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
+
+ if (endOffsetMetadata ==
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ return true;
+ }
+
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.latestFetchOffsetMetadata();
+ if (optionalFetchOffsetMetadata.isEmpty() ||
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+ LogOffsetMetadata fetchOffsetMetadata =
optionalFetchOffsetMetadata.get();
+
+ if (fetchOffsetMetadata.messageOffset >
endOffsetMetadata.messageOffset) {
+ log.debug("Satisfying delayed share fetch request for
group {}, member {} since it is fetching later segments of " +
+ "topicIdPartition {}", shareFetchData.groupId(),
shareFetchData.memberId(), topicIdPartition);
+ return true;
+ } else if (fetchOffsetMetadata.messageOffset <
endOffsetMetadata.messageOffset) {
+ if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata))
{
+ // This can happen when the fetch operation is falling
behind the current segment or the partition
+ // has just rolled a new segment.
+ log.debug("Satisfying delayed share fetch request for
group {}, member {} immediately since it is fetching older " +
+ "segments of topicIdPartition {}",
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+ return true;
+ } else if
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+ // we take the partition fetch size as upper bound
when accumulating the bytes.
+ long bytesAvailable =
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata),
partitionData.maxBytes);
+ accumulatedSize += bytesAvailable;
+ }
+ }
+ }
+ return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+ } catch (Exception e) {
+ // Ideally we should complete the share fetch request's future
exceptionally in this case from tryComplete itself.
+ // A function that can be utilized is handleFetchException in an
in-flight PR https://github.com/apache/kafka/pull/16842.
+ // Perhaps, once the mentioned PR is merged, I'll change it to
better exception handling.
+ log.error("Error processing the minBytes criteria for share fetch
request", e);
+ return true;
+ }
+ }
+
+ private Map<TopicIdPartition, FetchPartitionOffsetData>
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
+ Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
+ shareFetchData.fetchParams(),
+ CollectionConverters.asScala(
+ topicPartitionData.entrySet().stream().map(entry ->
+ new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
+ ),
+ QuotaFactory.UnboundedQuota$.MODULE$,
+ true);
+
+ Map<TopicIdPartition, FetchPartitionOffsetData> responseData = new
HashMap<>();
+ responseLogResult.foreach(tpLogResult -> {
+ TopicIdPartition topicIdPartition = tpLogResult._1();
+ LogReadResult logResult = tpLogResult._2();
+ FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
+ responseData.put(topicIdPartition, new
FetchPartitionOffsetData(fetchPartitionData,
logResult.info().fetchOffsetMetadata));
Review Comment:
So if we fetch from offset 0 and get a response from the log for offsets 0-1000, does `logResult.info().fetchOffsetMetadata` contain the information for offset 0 or for offset 1000, i.e. which offset's metadata does it hold?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
+
+ if (endOffsetMetadata ==
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ return true;
+ }
+
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.latestFetchOffsetMetadata();
Review Comment:
Should the variable in SharePartition be Optional? We should always have that, correct?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +145,28 @@ public boolean tryComplete() {
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty())
- return forceComplete();
- log.info("Can't acquire records for any partition in the share fetch
request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
- shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
+ if (!topicPartitionData.isEmpty()) {
+ MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataResult =
+
maybeUpdateFetchOffsetMetadataForTopicPartitions(topicPartitionData);
Review Comment:
Merge the lines.
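i.e. the joined form of the quoted statement:

```java
MaybeUpdateFetchOffsetMetadataResult maybeUpdateFetchOffsetMetadataResult = maybeUpdateFetchOffsetMetadataForTopicPartitions(topicPartitionData);
```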
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
+
+ if (endOffsetMetadata ==
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ return true;
+ }
+
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.latestFetchOffsetMetadata();
+ if (optionalFetchOffsetMetadata.isEmpty() ||
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+ LogOffsetMetadata fetchOffsetMetadata =
optionalFetchOffsetMetadata.get();
+
+ if (fetchOffsetMetadata.messageOffset >
endOffsetMetadata.messageOffset) {
+ log.debug("Satisfying delayed share fetch request for
group {}, member {} since it is fetching later segments of " +
+ "topicIdPartition {}", shareFetchData.groupId(),
shareFetchData.memberId(), topicIdPartition);
+ return true;
+ } else if (fetchOffsetMetadata.messageOffset <
endOffsetMetadata.messageOffset) {
+ if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata))
{
+ // This can happen when the fetch operation is falling
behind the current segment or the partition
+ // has just rolled a new segment.
+ log.debug("Satisfying delayed share fetch request for
group {}, member {} immediately since it is fetching older " +
+ "segments of topicIdPartition {}",
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+ return true;
+ } else if
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+ // we take the partition fetch size as upper bound
when accumulating the bytes.
+ long bytesAvailable =
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata),
partitionData.maxBytes);
+ accumulatedSize += bytesAvailable;
+ }
+ }
+ }
+ return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+ } catch (Exception e) {
+ // Ideally we should complete the share fetch request's future
exceptionally in this case from tryComplete itself.
+ // A function that can be utilized is handleFetchException in an
in-flight PR https://github.com/apache/kafka/pull/16842.
+ // Perhaps, once the mentioned PR is merged, I'll change it to
better exception handling.
+ log.error("Error processing the minBytes criteria for share fetch
request", e);
+ return true;
+ }
+ }
+
+ private Map<TopicIdPartition, FetchPartitionOffsetData>
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
+ Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
+ shareFetchData.fetchParams(),
+ CollectionConverters.asScala(
+ topicPartitionData.entrySet().stream().map(entry ->
+ new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
+ ),
+ QuotaFactory.UnboundedQuota$.MODULE$,
+ true);
+
+ Map<TopicIdPartition, FetchPartitionOffsetData> responseData = new
HashMap<>();
+ responseLogResult.foreach(tpLogResult -> {
+ TopicIdPartition topicIdPartition = tpLogResult._1();
+ LogReadResult logResult = tpLogResult._2();
+ FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
+ responseData.put(topicIdPartition, new
FetchPartitionOffsetData(fetchPartitionData,
logResult.info().fetchOffsetMetadata));
+ return BoxedUnit.UNIT;
+ });
+
+ log.trace("Data successfully retrieved by replica manager: {}",
responseData);
+ return responseData;
+ }
+
+ // Visible for testing.
+ Map<TopicIdPartition, FetchPartitionOffsetData>
combineLogReadResponseForOnComplete(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingLogReadTopicPartitions = new LinkedHashMap<>();
+ topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+ if (!logReadResponseFromTryComplete.containsKey(topicIdPartition))
{
+ missingLogReadTopicPartitions.put(topicIdPartition,
partitionData);
+ }
+ });
Review Comment:
Again this will be rare, hence shall we delay initializing the LinkedHashMap?
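A lazy-initialization sketch of what I mean (names are taken from the quoted code, the exact shape is up to you):

```java
Map<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = null;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
    if (!logReadResponseFromTryComplete.containsKey(entry.getKey())) {
        if (missingLogReadTopicPartitions == null)
            missingLogReadTopicPartitions = new LinkedHashMap<>();  // allocate only when actually needed
        missingLogReadTopicPartitions.put(entry.getKey(), entry.getValue());
    }
}
// Callers then need to treat a null map the same as today's empty-map case.
```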
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
+
+ if (endOffsetMetadata ==
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ return true;
+ }
+
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.latestFetchOffsetMetadata();
+ if (optionalFetchOffsetMetadata.isEmpty() ||
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+ continue;
+ LogOffsetMetadata fetchOffsetMetadata =
optionalFetchOffsetMetadata.get();
+
+ if (fetchOffsetMetadata.messageOffset >
endOffsetMetadata.messageOffset) {
+ log.debug("Satisfying delayed share fetch request for
group {}, member {} since it is fetching later segments of " +
+ "topicIdPartition {}", shareFetchData.groupId(),
shareFetchData.memberId(), topicIdPartition);
+ return true;
Review Comment:
Sorry, I didn't understand: when can we have the offset in the share partition greater than the partition end offset?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +211,154 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in
the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private MaybeUpdateFetchOffsetMetadataResult
maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ boolean isFetchOffsetMetadataUpdated = false;
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ Map<TopicIdPartition, FetchPartitionOffsetData>
replicaManagerReadResponseData;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata().isEmpty())
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return new MaybeUpdateFetchOffsetMetadataResult(false, null);
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
missingFetchOffsetMetadataTopicPartitions.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitionManager.sharePartition(shareFetchData.groupId(),
topicIdPartition);
+ if (sharePartition == null) {
+ log.debug("Encountered null share partition for groupId={},
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData =
replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain
topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+
sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ isFetchOffsetMetadataUpdated = true;
+ }
+ return new
MaybeUpdateFetchOffsetMetadataResult(isFetchOffsetMetadataUpdated,
replicaManagerReadResponseData);
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ long accumulatedSize = 0;
+ try {
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry
: topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ FetchRequest.PartitionData partitionData = entry.getValue();
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
+ // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
+ // extend it other FetchIsolation types.
+ FetchIsolation isolationType =
shareFetchData.fetchParams().isolation;
+ LogOffsetMetadata endOffsetMetadata;
+ if (isolationType == FetchIsolation.LOG_END)
+ endOffsetMetadata = offsetSnapshot.logEndOffset;
+ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+ endOffsetMetadata = offsetSnapshot.highWatermark;
+ else
+ endOffsetMetadata = offsetSnapshot.lastStableOffset;
Review Comment:
Can this go in a separate method, please?
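Something along these lines (a sketch; the helper name is mine):

```java
// Picks the end offset metadata that matches the fetch isolation level.
private LogOffsetMetadata endOffsetMetadataForIsolation(FetchIsolation isolationType, LogOffsetSnapshot offsetSnapshot) {
    if (isolationType == FetchIsolation.LOG_END)
        return offsetSnapshot.logEndOffset;
    else if (isolationType == FetchIsolation.HIGH_WATERMARK)
        return offsetSnapshot.highWatermark;
    else
        return offsetSnapshot.lastStableOffset;
}
```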
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,25 +109,11 @@ public void onComplete() {
topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UnboundedQuota$.MODULE$,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
+ Map<TopicIdPartition, FetchPartitionOffsetData> responseData;
+ if (logReadResponseFromTryComplete == null ||
logReadResponseFromTryComplete.isEmpty())
+ responseData = readFromLog(topicPartitionData);
+ else
+ responseData =
combineLogReadResponseForOnComplete(topicPartitionData);
Review Comment:
Can it ever occur that you have a non-empty `logReadResponseFromTryComplete` but `topicPartitionData` came from a fresh acquisition at line 98 (`topicPartitionData = acquirablePartitions();`)? I think never, but can you write a comment for this?
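A sketch of the comment I have in mind (wording is mine; it just states the invariant described above):

```java
// logReadResponseFromTryComplete is populated only from tryComplete(). When it is null or empty,
// topicPartitionData comes from a fresh acquirablePartitions() call and we read from the log here.
// When it is non-empty, the acquired partitions are the ones tryComplete() already read for, so we
// only combine the existing result with reads for whatever is missing.
if (logReadResponseFromTryComplete == null || logReadResponseFromTryComplete.isEmpty())
    responseData = readFromLog(topicPartitionData);
else
    responseData = combineLogReadResponseForOnComplete(topicPartitionData);
```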
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]