vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r739166999
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
totalProcessed += processed;
totalRecordsProcessedSinceLastSummary += processed;
+ if (bufferSize > maxBufferSizeBytes && bufferSize -
processedData.totalBytesConsumed <= maxBufferSizeBytes) {
Review comment:
Haha :D
The reason I have this check => `bufferSize > maxBufferSizeBytes` is that
the resume should happen only if after the current round of consumption, the
buffersize which had breached the threshold, now went below. Without that
check, it will always enter the if block- even when it's already lesser(and we
subtract something more and reduce it further). Did that make sense? :D
The idea of placing it here is that, right after removing some records from
the buffer, did the buffer size come down. It similar to how StreamThread
resume used to work(the one I removed). This logic can very well go before
poll but I thought adding it here was more non invasive as there's already some
metrics related stuff and other things happening in this block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]