apoorvmittal10 commented on code in PR #17583:
URL: https://github.com/apache/kafka/pull/17583#discussion_r1817079818
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -149,15 +149,27 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
- shareFetchData.groupId(), shareFetchData.memberId(),
- shareFetchData.partitionMaxBytes().keySet());
+ // There can be multiple threads which might invoke tryComplete for same share fetch request
+ // hence check if delay share fetch request is already completed. If yes, return true.
+ // However, this check alone cannot guarantee that request is really completed. It is possible that
+ // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
+ // the forceComplete response as well.
+ if (isCompleted()) {
Review Comment:
I rechecked and added a log line, and I can see that `tryComplete` is being
called even when `completed` is true.
Here is my understanding from DelayedOperation.scala:
1. `tryComplete()` is always executed safely, i.e. from `safeTryComplete` or
`safeTryCompleteOrElse`, which take a lock on the DelayedOperation itself, hence
no 2 threads can execute `tryComplete()` simultaneously. Correct?
2. You are right that `tryCompleteWatched` already has a `completed` check.
But the issue does exist.
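To make the two points above concrete, here is a minimal Java sketch of the pattern as I understand it. It is not the Kafka code: `SketchDelayedOperation`, `SketchShareFetch` and `TryCompleteDemo` are hypothetical names, and the sketch assumes that `safeTryComplete` only takes the lock and does not itself look at the `completed` flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo driver; none of these classes are the real Kafka classes.
class TryCompleteDemo {
    public static void main(String[] args) {
        SketchShareFetch op = new SketchShareFetch();
        op.safeTryComplete(); // first trigger completes the operation
        op.safeTryComplete(); // a later trigger still reaches tryComplete()
        System.out.println("completions=" + op.completions
                + " callsAfterCompletion=" + op.callsAfterCompletion);
    }
}

// Simplified stand-in for the DelayedOperation locking pattern (assumption).
abstract class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();

    public boolean isCompleted() {
        return completed.get();
    }

    // Only the caller that wins the compare-and-set runs onComplete().
    public boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // Serializes tryComplete() across threads. Assumption: no completed check
    // here, so a call made after completion still enters tryComplete().
    public boolean safeTryComplete() {
        lock.lock();
        try {
            return tryComplete();
        } finally {
            lock.unlock();
        }
    }

    public abstract boolean tryComplete();

    public abstract void onComplete();
}

// Simplified stand-in for DelayedShareFetch with the guard from the diff.
class SketchShareFetch extends SketchDelayedOperation {
    int completions = 0;
    int callsAfterCompletion = 0;

    @Override
    public boolean tryComplete() {
        if (isCompleted()) {
            // The guard under discussion: return early, do no further work.
            callsAfterCompletion++;
            return true;
        }
        return forceComplete();
    }

    @Override
    public void onComplete() {
        completions++;
    }
}
```

Under that assumption, the lock only rules out two threads being inside `tryComplete()` at the same time; it does not rule out a second, sequential call after the operation has completed, which is the case the `isCompleted()` guard handles.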
This triggers only when there are multiple share consumers for the same group
and the same topic partition. I have traced the calls and found the following:
the calls originate from `addToActionQueue`, defined in `onComplete` of
DelayedShareFetch. Though the request goes through `tryCompleteWatched`,
`tryComplete` is then called again despite the request being completed. The
conditional variable in DelayedOperation etc. seems fine to me. Not sure how it
triggers.
```
[2024-10-25 17:34:32,670] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,703] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,754] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,191] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,391] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,682] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:34,363] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
```