[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans resolved KAFKA-15946. ------------------------------------ Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: Lianet Magrans (was: Kirk True) Resolution: Fixed 3.7 includes fix to make sure that only sync commits are retried, with a timeout, and async commits are not (just passing failure to the callback). There is also a follow ticket https://issues.apache.org/jira/browse/KAFKA-16033 > AsyncKafkaConsumer should retry commits on the application thread instead of > auto-retry > --------------------------------------------------------------------------------------- > > Key: KAFKA-15946 > URL: https://issues.apache.org/jira/browse/KAFKA-15946 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Lianet Magrans > Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.7.0 > > > The original design was that the network thread always completes the future > whether succeeds or fails. However, in the current patch, I mis-added > auto-retry functionality because commitSync wasn't retrying. What we should > be doing is, the commit sync API should catch the RetriableExceptions and > resend another commit until timesout. > > {code:java} > if (error.exception() instanceof RetriableException) { > log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, > error.message()); > handleRetriableError(error, response); > retry(responseTime); <--- We probably shouldn't do this. > return; > } {code} > > {code:java} > @Override > public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, > Duration timeout) { > acquireAndEnsureOpen(); > long commitStart = time.nanoseconds(); > try > { CompletableFuture<Void> commitFuture = commit(offsets, true); <-- we > probably should retry here ConsumerUtils.getResult(commitFuture, > time.timer(timeout)); } > finally > { wakeupTrigger.clearTask(); > kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); > release(); } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)