apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1806669871
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,6 +187,58 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+                shareFetchData.fetchParams(),
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                true);
+
+            AtomicInteger accumulatedBytes = new AtomicInteger(0);
+
+            responseLogResult.foreach(tpLogResult -> {
+                TopicIdPartition topicIdPartition = tpLogResult._1();
+                LogReadResult logResult = tpLogResult._2();
+                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
+                responseData.put(topicIdPartition, fetchPartitionData);
+                accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
+                return BoxedUnit.UNIT;
+            });
+            log.trace("Data successfully retrieved by replica manager: {}", responseData);
+
+            if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
+                replicaManagerFetchSatisfyingMinBytes = responseData;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
+        } finally {
+            // The case when we cannot satisfy the share fetch requests because the response has lesser data than minBytes
+            // and the call is coming from tryComplete, hence we want to release partitions lock so that the next
+            // tryComplete/onComplete call complete successfully.
+            if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
+                // Releasing the lock to move ahead with the next request in queue.
+                releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+            }
+        }
+        // This call is coming from onComplete, hence we return the response data irrespective of whether minBytes is
+        // satisfied or not.
+        if (hasRequestTimedOut)
+            return responseData;
Review Comment:
Hmmm, is this the same for regular fetch operations as well?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,6 +187,58 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+                shareFetchData.fetchParams(),
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                true);
+
+            AtomicInteger accumulatedBytes = new AtomicInteger(0);
+
+            responseLogResult.foreach(tpLogResult -> {
+                TopicIdPartition topicIdPartition = tpLogResult._1();
+                LogReadResult logResult = tpLogResult._2();
+                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
+                responseData.put(topicIdPartition, fetchPartitionData);
+                accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
+                return BoxedUnit.UNIT;
+            });
+            log.trace("Data successfully retrieved by replica manager: {}", responseData);
+
+            if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
+                replicaManagerFetchSatisfyingMinBytes = responseData;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
+        } finally {
+            // The case when we cannot satisfy the share fetch requests because the response has lesser data than minBytes
+            // and the call is coming from tryComplete, hence we want to release partitions lock so that the next
+            // tryComplete/onComplete call complete successfully.
+            if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
+                // Releasing the lock to move ahead with the next request in queue.
+                releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+            }
Review Comment:
And why don't we want to release the partition locks from onComplete?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+        // tryComplete did not invoke forceComplete, so we need to get replica manager response data.
+        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+            Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            fetchResponseData = replicaManagerFetchData(topicPartitionData, true);
+        } else {
+            // tryComplete invoked forceComplete, so we can use the replica manager response data from tryComplete.
+            fetchResponseData = replicaManagerFetchDataFromTryComplete;
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-            topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
-                ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
Review Comment:
fetchResponseData can still be empty here. processFetchResponse handles the empty case, but is that intended? No harm either way, just checking with you.
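If you want to make that explicit at the call site, a minimal sketch (purely illustrative, reusing only the names from this diff) could be a guard before the try block:

    if (fetchResponseData.isEmpty()) {
        // Nothing was read that satisfies the request, so complete with an empty response up front.
        shareFetchData.future().complete(Collections.emptyMap());
        return;
    }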
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,6 +187,58 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
Review Comment:
In most scenarios the request's minBytes will be satisfied, hence do you always want to initialize a hash map? Mostly it will be overridden with the `responseData` map. So can't it be null? Moreover, can't it simply be a boolean variable, i.e.

boolean minBytesSatisfied = false;

if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
    replicaManagerFetchSatisfyingMinBytes = responseData;
=>
if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
    minBytesSatisfied = true;

if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
=>
if (!minBytesSatisfied && !hasRequestTimedOut) {

return replicaManagerFetchSatisfyingMinBytes;
=>
return Collections.emptyMap()
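Roughly the shape I have in mind, as a sketch against the code in this diff (not tested; the final return is my interpretation of how the method would then end, adjust as you see fit):

    boolean minBytesSatisfied = false;
    Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
    try {
        // readFromLog and the foreach over responseLogResult stay as they are,
        // filling responseData and accumulatedBytes.
        if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
            minBytesSatisfied = true;
    } catch (Exception e) {
        log.error("Error processing delayed share fetch request", e);
    } finally {
        if (!minBytesSatisfied && !hasRequestTimedOut) {
            // Releasing the lock to move ahead with the next request in queue.
            releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
        }
    }
    // Return the data when the request has timed out or minBytes is satisfied, otherwise nothing.
    if (hasRequestTimedOut || minBytesSatisfied)
        return responseData;
    return Collections.emptyMap();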
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,10 +133,13 @@ public boolean tryComplete() {
             shareFetchData.groupId(), shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
-        if (!topicPartitionDataFromTryComplete.isEmpty())
-            return forceComplete();
+        if (!topicPartitionData.isEmpty()) {
+            replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, false);
+            if (!replicaManagerFetchDataFromTryComplete.isEmpty())
+                return forceComplete();
+        }
         log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
Review Comment:
Now we can reach this code path after getting no result from `replicaManagerFetchData`. Is the log line still correct in that case?
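For example (hypothetical wording, just to illustrate separating the two cases):

    if (topicPartitionData.isEmpty())
        log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}",
            shareFetchData.groupId(), shareFetchData.memberId());
    else
        log.trace("Acquired partitions did not satisfy minBytes for the share fetch request for group {}, member {}",
            shareFetchData.groupId(), shareFetchData.memberId());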
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
Review Comment:
Do you need this extra variable, or can you just assign the field later if needed? Then there's no need for the else block below:
replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, true);
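i.e. roughly (sketch only, using the names already in this diff):

    if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
        if (topicPartitionData.isEmpty()) {
            shareFetchData.future().complete(Collections.emptyMap());
            return;
        }
        replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, true);
    }
    // Use replicaManagerFetchDataFromTryComplete directly below; no else branch needed.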
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+        // tryComplete did not invoke forceComplete, so we need to get replica manager response data.
+        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+            Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            fetchResponseData = replicaManagerFetchData(topicPartitionData, true);
+        } else {
+            // tryComplete invoked forceComplete, so we can use the replica manager response data from tryComplete.
+            fetchResponseData = replicaManagerFetchDataFromTryComplete;
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-            topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
-                ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
             shareFetchData.future().complete(result);
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
             shareFetchData.future().completeExceptionally(e);
         } finally {
             // Releasing the lock to move ahead with the next request in queue.
-            releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+            releasePartitionLocks(shareFetchData.groupId(), fetchResponseData.keySet());
Review Comment:
What if partitions were locked but no data for them arrived in the response? Will the locks still be released correctly?
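e.g. something along these lines, assuming the set of partitions that were actually acquired is still available at that point (`acquiredPartitions` below is a hypothetical name for that set):

    } finally {
        // Hypothetical sketch: release locks for every partition that was acquired,
        // not only the partitions that ended up with data in the replica manager response.
        releasePartitionLocks(shareFetchData.groupId(), acquiredPartitions);
    }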