muralibasani commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3351382559


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1476,6 +1512,17 @@ private long pollPhase() {
             taskManager.updateNextOffsets(records.nextOffsets());
         }
 
+        // soft cap — read isn't atomic across tasks, so small overshoots are fine.
+        final long bufferSize = taskManager.getInputBufferSizeInBytes();
+        if (maxBufferSizeInBytes.get() != UNDEFINED_INPUT_BUFFER_MAX_BYTES && bufferSize > maxBufferSizeInBytes.get()) {
+            // pause only non-empty partitions; pausing empty ones risks ordering deadlock (KAFKA-13152).
+            final Set<TopicPartition> nonEmptyPartitions = taskManager.nonEmptyPartitions();
+            log.info("Buffered records size {} bytes exceeds {}. Pausing partitions {} from the consumer",
+                bufferSize, maxBufferSizeInBytes.get(), nonEmptyPartitions);
+            mainConsumer.pause(nonEmptyPartitions);
+            partitionsPausedForBufferOverflow.addAll(nonEmptyPartitions);

Review Comment:
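   Since partitionsPausedForBufferOverflow is a Set and pollPhase() is invoked
   on every iteration of the thread loop, calling addAll() here repeatedly is
   harmless: partitions that are already in the set are not added again.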
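
To illustrate the point about set semantics, here is a minimal standalone sketch. It is not the StreamThread code: the class `PausedPartitionsSketch`, the method `recordPaused`, and the use of plain strings in place of `TopicPartition` are all assumptions made for the example.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PausedPartitionsSketch {
    // Hypothetical stand-in for partitionsPausedForBufferOverflow;
    // plain strings are used here instead of TopicPartition.
    private final Set<String> pausedForOverflow = new HashSet<>();

    // Mimics one pollPhase() iteration while the buffer is over the cap:
    // records the non-empty partitions and returns the resulting set size.
    int recordPaused(final List<String> nonEmptyPartitions) {
        pausedForOverflow.addAll(nonEmptyPartitions);
        return pausedForOverflow.size();
    }

    public static void main(final String[] args) {
        final PausedPartitionsSketch sketch = new PausedPartitionsSketch();
        final List<String> nonEmpty = List.of("orders-0", "orders-1");

        System.out.println(sketch.recordPaused(nonEmpty)); // first iteration
        System.out.println(sketch.recordPaused(nonEmpty)); // same partitions again
        System.out.println(sketch.recordPaused(List.of("orders-1", "orders-2")));
    }
}
```

Repeating the call with the same partitions leaves the set size unchanged, and only a genuinely new partition grows it. Note that this only covers the set bookkeeping; whether the repeated log.info and mainConsumer.pause calls on every iteration are acceptable is a separate question.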



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.