junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1813657687
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -89,25 +91,45 @@ public void onComplete() {
            return;
        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
-        // tryComplete did not invoke forceComplete, so we need to get replica manager response data.
-        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+        if (topicPartitionDataFromTryComplete.isEmpty())
            topicPartitionData = acquirablePartitions();
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareFetchData.future().complete(Collections.emptyMap());
-                return;
-            }
-            fetchResponseData = replicaManagerFetchData(topicPartitionData, true);
-        } else {
-            // tryComplete invoked forceComplete, so we can use the topic partitions data and replica manager response data from tryComplete.
+        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+        else
            topicPartitionData = topicPartitionDataFromTryComplete;
-            fetchResponseData = replicaManagerFetchDataFromTryComplete;
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete the request with an empty response.
+            shareFetchData.future().complete(Collections.emptyMap());
+            return;
        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+            topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+                shareFetchData.fetchParams(),
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                true);
+
+            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+            Map<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadata = new HashMap<>();
+            responseLogResult.foreach(tpLogResult -> {
+                TopicIdPartition topicIdPartition = tpLogResult._1();
+                LogReadResult logResult = tpLogResult._2();
+                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
+                responseData.put(topicIdPartition, fetchPartitionData);
+                fetchOffsetMetadata.put(topicIdPartition, logResult.info().fetchOffsetMetadata);
+                return BoxedUnit.UNIT;
+            });
+
+            log.trace("Data successfully retrieved by replica manager: {}", responseData);
            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
-                ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, responseData, fetchOffsetMetadata, sharePartitionManager, replicaManager);
Review Comment:
It's kind of late to set the offset metadata here, since by this point we have already acquired the fetched records and advanced the next fetch offset. This means the cached offset metadata no longer matches the next fetch offset.
I was thinking of doing the following instead. In `tryComplete`, if the offset metadata doesn't exist, we call `replicaManager.readFromLog` to populate it. We can then proceed with the minBytes estimation. If `SharePartition.endOffset` moves, we invalidate the cached offset metadata.
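Roughly, a sketch of that flow inside `tryComplete` (the helpers `anySharePartitionMissingOffsetMetadata` and `updateLatestFetchOffsetMetadata` are placeholders rather than existing methods; the `readFromLog` call mirrors the one in `onComplete` above):
```java
// Placeholder sketch for tryComplete(): populate the cached offset metadata via
// readFromLog only when it is missing, then run the minBytes estimation.
if (anySharePartitionMissingOffsetMetadata(topicPartitionData)) {   // placeholder helper
    Seq<Tuple2<TopicIdPartition, LogReadResult>> readResult = replicaManager.readFromLog(
        shareFetchData.fetchParams(),
        CollectionConverters.asScala(
            topicPartitionData.entrySet().stream()
                .map(entry -> new Tuple2<>(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList())),
        QuotaFactory.UnboundedQuota$.MODULE$,
        true);
    readResult.foreach(tpLogResult -> {
        // Cache the fetch offset metadata on the share partition (placeholder setter).
        sharePartitionManager.sharePartition(shareFetchData.groupId(), tpLogResult._1())
            .updateLatestFetchOffsetMetadata(tpLogResult._2().info().fetchOffsetMetadata);
        return BoxedUnit.UNIT;
    });
}
// Estimate minBytes against the cached offset metadata.
if (isMinBytesCriteriaSatisfied(topicPartitionData))
    return forceComplete();
// Otherwise release the partition locks so the next tryComplete/onComplete can proceed.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
return false;
```
That way the cached metadata always reflects the fetch offset the estimation ran against, and it is only invalidated when `SharePartition.endOffset` moves.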
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -197,58 +219,25 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
        return topicPartitionData;
    }
-    /**
-     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
-     */
-    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-                                                                      boolean hasRequestTimedOut) {
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
-            shareFetchData.groupId(), shareFetchData.fetchParams());
-        boolean minBytesSatisfied = false;
-        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-        try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            AtomicLong accumulatedBytes = new AtomicLong(0);
-
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
-                return BoxedUnit.UNIT;
-            });
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
-
-            if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
-                minBytesSatisfied = true;
-        } catch (Exception e) {
-            log.error("Error processing delayed share fetch request", e);
-        } finally {
-            // The case when we cannot satisfy the share fetch requests because the response has lesser data than minBytes
-            // and the call is coming from tryComplete, hence we want to release partitions lock so that the next
-            // tryComplete/onComplete call complete successfully.
-            if (!minBytesSatisfied && !hasRequestTimedOut) {
-                // Releasing the lock to move ahead with the next request in queue.
-                releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+    boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        AtomicLong accumulatedSize = new AtomicLong(0);
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+            LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
+            LogOffsetMetadata endOffsetMetadata = offsetSnapshot.highWatermark;
Review Comment:
We need to check `shareFetchData.fetchParams().isolation` to decide whether to use the HWM or the lastStableOffset as the end offset.
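Something along these lines (a minimal sketch reusing the `offsetSnapshot` from the hunk above; the exact shape is up to the PR):
```java
// Pick the end offset based on the fetch isolation level: READ_COMMITTED share
// consumers should not count data beyond the last stable offset.
LogOffsetMetadata endOffsetMetadata =
    shareFetchData.fetchParams().isolation == FetchIsolation.TXN_COMMITTED
        ? offsetSnapshot.lastStableOffset
        : offsetSnapshot.highWatermark;
```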
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -197,58 +219,25 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
        return topicPartitionData;
    }
-    /**
-     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
-     */
-    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-                                                                      boolean hasRequestTimedOut) {
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
-            shareFetchData.groupId(), shareFetchData.fetchParams());
-        boolean minBytesSatisfied = false;
-        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-        try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            AtomicLong accumulatedBytes = new AtomicLong(0);
-
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
-                return BoxedUnit.UNIT;
-            });
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
-
-            if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
-                minBytesSatisfied = true;
-        } catch (Exception e) {
-            log.error("Error processing delayed share fetch request", e);
-        } finally {
-            // The case when we cannot satisfy the share fetch requests because the response has lesser data than minBytes
-            // and the call is coming from tryComplete, hence we want to release partitions lock so that the next
-            // tryComplete/onComplete call complete successfully.
-            if (!minBytesSatisfied && !hasRequestTimedOut) {
-                // Releasing the lock to move ahead with the next request in queue.
-                releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+    boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        AtomicLong accumulatedSize = new AtomicLong(0);
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+            LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
+            LogOffsetMetadata endOffsetMetadata = offsetSnapshot.highWatermark;
+            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                return;
            }
-        }
-        // We can return the replica manager fetch response for the following 2 cases -
-        // 1. This call is coming from onComplete, hence we return the response data irrespective of whether minBytes is
-        // satisfied or not.
-        // 2. Return the response if minBytes criteria is satisfied (request is coming from tryComplete).
-        if (hasRequestTimedOut || minBytesSatisfied)
-            return responseData;
-        // Return an empty map if replica manager fetch does not satisfy minBytes (request is coming from tryComplete).
-        return Collections.emptyMap();
+
+            // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
+            long bytesAvailable = sharePartition.latestFetchOffsetMetadata() == null ?
+                Math.min(endOffsetMetadata.relativePositionInSegment, partitionData.maxBytes) :
+                Math.min(endOffsetMetadata.positionDiff(sharePartition.latestFetchOffsetMetadata()), partitionData.maxBytes);
Review Comment:
We need to check that the two offset metadata are on the same segment before using `positionDiff`. We also need to handle the case where the HWM doesn't have offset metadata yet. We can just follow the logic in `DelayedFetch`.
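For reference, a rough sketch of that `DelayedFetch`-style estimation for one partition inside the loop (assuming `fetchOffsetMetadata` is the cached `SharePartition.latestFetchOffsetMetadata()` and `endOffsetMetadata` comes from the snapshot above):
```java
// Sketch of the DelayedFetch-style byte estimation for one partition.
LogOffsetMetadata fetchOffsetMetadata = sharePartition.latestFetchOffsetMetadata();
if (fetchOffsetMetadata == null || fetchOffsetMetadata.messageOffsetOnly()
        || endOffsetMetadata.messageOffsetOnly()) {
    // The HWM or the cached fetch position carries no segment/position information,
    // so we cannot estimate the available bytes for this partition; skip it.
    return;
}
if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
    // The fetch position is ahead of the end offset (e.g. after truncation); nothing to count.
    return;
}
if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
    // The fetch position is on an older segment, so at least a full fetch worth of
    // data is available; count the partition max bytes.
    accumulatedSize.addAndGet(partitionData.maxBytes);
} else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) {
    // Both offsets are on the same segment, so positionDiff is safe to use here.
    accumulatedSize.addAndGet(
        Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes));
}
```
Skipping partitions whose metadata is message-offset-only keeps the estimate conservative; they simply don't contribute to `accumulatedSize` until the metadata is populated.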
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]