adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828820632
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty()) {
+ try {
+ if (!topicPartitionData.isEmpty()) {
+ // In case, fetch offset metadata doesn't exist for one or
more topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata
and update the fetch offset metadata for
+ // those topic partitions.
+ Map<TopicIdPartition, LogReadResult>
replicaManagerReadResponse =
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+ if
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
+ partitionsToComplete = topicPartitionData;
+ partitionsAlreadyFetched = replicaManagerReadResponse;
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then
that means the request is already completed
+ // hence release the acquired locks.
+ if (!completedByMe) {
+ releasePartitionLocks(partitionsToComplete.keySet());
+ partitionsAlreadyFetched.clear();
+ partitionsToComplete.clear();
+ }
+ return completedByMe;
+ } else {
+ log.debug("minBytes is not satisfied for the share fetch
request for group {}, member {}, " +
+ "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
+ releasePartitionLocks(topicPartitionData.keySet());
+ }
+ } else {
+ log.trace("Can't acquire records for any partition in the
share fetch request for group {}, member {}, " +
+ "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Error processing delayed share fetch request", e);
boolean completedByMe = forceComplete();
// If invocation of forceComplete is not successful, then that
means the request is already completed
// hence release the acquired locks.
if (!completedByMe) {
- releasePartitionLocks(shareFetchData.groupId(),
topicPartitionDataFromTryComplete.keySet());
+ releasePartitionLocks(partitionsToComplete.keySet());
Review Comment:
Hi @junrao, you're right — I've changed the line to
`releasePartitionLocks(topicPartitionData.keySet())`.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty()) {
+ try {
+ if (!topicPartitionData.isEmpty()) {
+ // In case, fetch offset metadata doesn't exist for one or
more topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata
and update the fetch offset metadata for
+ // those topic partitions.
+ Map<TopicIdPartition, LogReadResult>
replicaManagerReadResponse =
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+ if
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
+ partitionsToComplete = topicPartitionData;
+ partitionsAlreadyFetched = replicaManagerReadResponse;
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then
that means the request is already completed
+ // hence release the acquired locks.
+ if (!completedByMe) {
+ releasePartitionLocks(partitionsToComplete.keySet());
+ partitionsAlreadyFetched.clear();
+ partitionsToComplete.clear();
+ }
+ return completedByMe;
+ } else {
+ log.debug("minBytes is not satisfied for the share fetch
request for group {}, member {}, " +
+ "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
+ releasePartitionLocks(topicPartitionData.keySet());
+ }
+ } else {
+ log.trace("Can't acquire records for any partition in the
share fetch request for group {}, member {}, " +
+ "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Error processing delayed share fetch request", e);
boolean completedByMe = forceComplete();
// If invocation of forceComplete is not successful, then that
means the request is already completed
// hence release the acquired locks.
if (!completedByMe) {
- releasePartitionLocks(shareFetchData.groupId(),
topicPartitionDataFromTryComplete.keySet());
+ releasePartitionLocks(partitionsToComplete.keySet());
Review Comment:
Hi @junrao, you're right — I've changed the line to
`releasePartitionLocks(topicPartitionData.keySet())`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]