vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r739166999



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
 
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
+                    if (bufferSize > maxBufferSizeBytes && bufferSize - 
processedData.totalBytesConsumed <= maxBufferSizeBytes) {

Review comment:
       Haha :D 
   The reason I have this check => `bufferSize > maxBufferSizeBytes` is that 
the resume should happen only if after the current round of consumption, the 
buffersize which had breached the threshold, now went below. Without that 
check, it will always enter the if block- even when it's already lesser(and we 
subtract something more and reduce it further). Did that make sense? :D 
   The idea of placing it here is that, right after removing some records from 
the buffer, did the buffer size come down. It similar to how StreamThread 
resume used to  work(the one I removed). This logic can very well go before 
poll but I thought adding it here was more non invasive as there's already some 
metrics related stuff and other things happening in this block. 
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to