junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2658498977
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -354,6 +363,39 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
         return false;
     }
+    private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateNow) {
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+        // we need to reset the producer id here.
+        if (!expiredBatches.isEmpty())
+            log.trace("Expired {} batches in accumulator", expiredBatches.size());
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+            failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateNow);
+            if (transactionManager != null && expiredBatch.inRetry()) {
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch);
+            }
+        }
+    }
+
+    private void deallocateRequestTimeoutBatches(long now) {
+        Iterator<ProducerBatch> iter = expiredInflightBatchesAwaitingDeallocation.iterator();
+        while (iter.hasNext()) {
+            ProducerBatch batch = iter.next();
+            if (batch.hasReachedTimeout(requestTimeoutMs, now)) {
Review Comment:
Each inflight batch has a pending RPC request in NetworkClient. It's
probably better to deallocate the buffer when that RPC request completes, through
either the failBatch() or completeBatch() call. I am thinking that we can add
an AtomicBoolean `deallocated` to ProducerBatch and a new param `deallocateBuffer`
to failBatch(). When calling failBatch() on the inflight batches, we set
deallocateBuffer to false. In all other calls to failBatch(), we set
deallocateBuffer to true. In completeBatch() and failBatch() (if
deallocateBuffer is true), we deallocate the buffer only if `deallocated` is false and
then mark `deallocated` as true, so each buffer is released at most once.
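
A rough sketch of the guard described above, to make the idea concrete. `SketchBatch` and `SketchSender` are hypothetical stand-ins, not the real `ProducerBatch` and `Sender`, and the counter stands in for returning the buffer to the pool. It also assumes the check and the mark can be folded into a single `compareAndSet`, which is a small variation on the two-step wording:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ProducerBatch; only the proposed flag is modeled.
class SketchBatch {
    private final AtomicBoolean deallocated = new AtomicBoolean(false);

    // Returns true only for the first caller, so the buffer is released at most once.
    boolean markDeallocated() {
        return deallocated.compareAndSet(false, true);
    }
}

// Hypothetical stand-in for the relevant Sender paths.
class SketchSender {
    // Counts releases; a real implementation would return the buffer to the pool instead.
    final AtomicInteger buffersReleased = new AtomicInteger();

    // deallocateBuffer is false when expiring a batch that still has a pending request.
    void failBatch(SketchBatch batch, boolean deallocateBuffer) {
        // Failing the batch's futures is omitted from this sketch.
        if (deallocateBuffer) {
            maybeDeallocate(batch);
        }
    }

    // Called when the response for the batch's request arrives.
    void completeBatch(SketchBatch batch) {
        maybeDeallocate(batch);
    }

    private void maybeDeallocate(SketchBatch batch) {
        if (batch.markDeallocated()) {
            buffersReleased.incrementAndGet();
        }
    }
}
```

With this shape, expiring an inflight batch via `failBatch(batch, false)` leaves the buffer alone, the later `completeBatch(batch)` or `failBatch(batch, true)` releases it, and any further call is a no-op.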