apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1809360360
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +191,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ /**
+ * Prepare partitions fetch data structure for acquirable partitions in
the share fetch request satisfying minBytes criteria.
+ */
+ Map<TopicIdPartition, FetchPartitionData>
replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData,
+ boolean
hasRequestTimedOut) {
+ log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}", topicPartitionData,
+ shareFetchData.groupId(), shareFetchData.fetchParams());
+ boolean minBytesSatisfied = false;
+ Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
+ try {
+ Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
+ shareFetchData.fetchParams(),
+ CollectionConverters.asScala(
+ topicPartitionData.entrySet().stream().map(entry ->
+ new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
+ ),
+ QuotaFactory.UnboundedQuota$.MODULE$,
+ true);
+
+ AtomicInteger accumulatedBytes = new AtomicInteger(0);
Review Comment:
Should it be AtomicLong?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +133,20 @@ public boolean tryComplete() {
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty())
- return forceComplete();
- log.info("Can't acquire records for any partition in the share fetch
request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
- shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
+ if (!topicPartitionData.isEmpty()) {
+ replicaManagerFetchDataFromTryComplete =
replicaManagerFetchData(topicPartitionData, false);
+ if (!replicaManagerFetchDataFromTryComplete.isEmpty())
+ return forceComplete();
+ log.info("minBytes is not satisfied for the share fetch request
for group {}, member {}, " +
+ "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
+ } else {
+ log.info("Can't acquire records for any partition in the share
fetch request for group {}, member {}, " +
Review Comment:
Wouldn't this be too common? Should we move it trace?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +133,20 @@ public boolean tryComplete() {
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty())
- return forceComplete();
- log.info("Can't acquire records for any partition in the share fetch
request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
- shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
+ if (!topicPartitionData.isEmpty()) {
+ replicaManagerFetchDataFromTryComplete =
replicaManagerFetchData(topicPartitionData, false);
+ if (!replicaManagerFetchDataFromTryComplete.isEmpty())
+ return forceComplete();
+ log.info("minBytes is not satisfied for the share fetch request
for group {}, member {}, " +
Review Comment:
info seems to be too much here, how helpful this log would be in info mode?
Can we move it to debug please.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
if (shareFetchData.future().isDone())
return;
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
- // tryComplete did not invoke forceComplete, so we need to check if we
have any partitions to fetch.
- if (topicPartitionDataFromTryComplete.isEmpty())
- topicPartitionData = acquirablePartitions();
- // tryComplete invoked forceComplete, so we can use the data from
tryComplete.
- else
- topicPartitionData = topicPartitionDataFromTryComplete;
-
- if (topicPartitionData.isEmpty()) {
- // No locks for share partitions could be acquired, so we complete
the request with an empty response.
- shareFetchData.future().complete(Collections.emptyMap());
- return;
+ Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+ // tryComplete did not invoke forceComplete, so we need to get replica
manager response data.
+ if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = acquirablePartitions();
+ if (topicPartitionData.isEmpty()) {
+ // No locks for share partitions could be acquired, so we
complete the request with an empty response.
+ shareFetchData.future().complete(Collections.emptyMap());
+ return;
+ }
+ fetchResponseData = replicaManagerFetchData(topicPartitionData,
true);
+ } else {
+ // tryComplete invoked forceComplete, so we can use the replica
manager response data from tryComplete.
+ fetchResponseData = replicaManagerFetchDataFromTryComplete;
}
- log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}",
- topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UnboundedQuota$.MODULE$,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result
=
- ShareFetchUtils.processFetchResponse(shareFetchData,
responseData, sharePartitionManager, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetchData,
fetchResponseData, sharePartitionManager, replicaManager);
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
- releasePartitionLocks(shareFetchData.groupId(),
topicPartitionData.keySet());
+ releasePartitionLocks(shareFetchData.groupId(),
fetchResponseData.keySet());
Review Comment:
Will it not depend on `readFromLog` API response i.e. if you sent 3
partitions then is it guaranteed that replica manager will return all 3
partitions in response?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]