kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1640115221
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer
timer) {
return true;
log.debug("Refreshing committed offsets for partitions {}",
initializingPartitions);
+
+ // The shorter the timeout provided to poll(), the more likely the
offsets fetch will time out. To handle
+ // this case, on the first attempt to fetch the committed offsets, a
FetchCommittedOffsetsEvent is created
+ // (with potentially a longer timeout) and stored. The event is used
for the first attempt, but in the
+ // case it times out, subsequent attempts will also use the event in
order to wait for the results.
+ if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+ // Give the event a reasonable amount of time to complete.
+ final long timeoutMs = Math.max(defaultApiTimeoutMs,
timer.remainingMs());
+ final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
+ pendingOffsetFetchEvent = new
FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+ applicationEventHandler.add(pendingOffsetFetchEvent);
+ }
+
+ final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future
= pendingOffsetFetchEvent.future();
+ boolean shouldClearPendingEvent = false;
+
try {
- final FetchCommittedOffsetsEvent event =
- new FetchCommittedOffsetsEvent(
- initializingPartitions,
- calculateDeadlineMs(timer));
- wakeupTrigger.setActiveTask(event.future());
- final Map<TopicPartition, OffsetAndMetadata> offsets =
applicationEventHandler.addAndGet(event);
+ wakeupTrigger.setActiveTask(future);
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
ConsumerUtils.getResult(future, timer);
+
+ // Clear the pending event once its result is successfully
retrieved.
+ shouldClearPendingEvent = true;
+
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;
} catch (TimeoutException e) {
log.error("Couldn't refresh committed offsets before timeout
expired");
Review Comment:
Changed to debug and updated the text of the log to be slightly more helpful.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer
timer) {
return true;
log.debug("Refreshing committed offsets for partitions {}",
initializingPartitions);
+
+ // The shorter the timeout provided to poll(), the more likely the
offsets fetch will time out. To handle
+ // this case, on the first attempt to fetch the committed offsets, a
FetchCommittedOffsetsEvent is created
+ // (with potentially a longer timeout) and stored. The event is used
for the first attempt, but in the
+ // case it times out, subsequent attempts will also use the event in
order to wait for the results.
+ if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+ // Give the event a reasonable amount of time to complete.
+ final long timeoutMs = Math.max(defaultApiTimeoutMs,
timer.remainingMs());
+ final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
+ pendingOffsetFetchEvent = new
FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+ applicationEventHandler.add(pendingOffsetFetchEvent);
+ }
+
+ final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future
= pendingOffsetFetchEvent.future();
+ boolean shouldClearPendingEvent = false;
+
try {
- final FetchCommittedOffsetsEvent event =
- new FetchCommittedOffsetsEvent(
- initializingPartitions,
- calculateDeadlineMs(timer));
- wakeupTrigger.setActiveTask(event.future());
- final Map<TopicPartition, OffsetAndMetadata> offsets =
applicationEventHandler.addAndGet(event);
+ wakeupTrigger.setActiveTask(future);
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
ConsumerUtils.getResult(future, timer);
+
+ // Clear the pending event once its result is successfully
retrieved.
+ shouldClearPendingEvent = true;
+
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;
} catch (TimeoutException e) {
log.error("Couldn't refresh committed offsets before timeout
expired");
return false;
+ } catch (InterruptException e) {
+ throw e;
+ } catch (Throwable t) {
+ // Clear the pending event on errors that are not timeout- or
interrupt-related.
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]