shashankhs11 commented on code in PR #20292:
URL: https://github.com/apache/kafka/pull/20292#discussion_r2629453021


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -4237,6 +4258,281 @@ t2p1, new PartitionInfo(t2p1.topic(), t2p1.partition(), null, new Node[0], new N
         );
     }
 
+    @Test
+    public void shouldPauseNonEmptyPartitionsWhenTotalBufferSizeExceedsMaxBufferSize() {
+        // Set up consumer mock
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+
+        // Create records for polling
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> polledRecords = new HashMap<>();
+        final List<ConsumerRecord<byte[], byte[]>> t1p1Records = new ArrayList<>();
+
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            0,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            1,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+
+        final List<ConsumerRecord<byte[], byte[]>> t2p1Records = Collections.singletonList(
+            new ConsumerRecord<>(
+                t2p1.topic(),
+                t2p1.partition(),
+                0,
+                mockTime.milliseconds(),
+                TimestampType.CREATE_TIME,
+                2,
+                6,
+                new byte[2],
+                new byte[6],
+                new RecordHeaders(),
+                Optional.empty()));
+
+        polledRecords.put(t1p1, t1p1Records);
+        polledRecords.put(t2p1, t2p1Records);
+
+        // Set up consumer behavior
+        final Set<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(t1p1, t2p1));

Review Comment:
   addressed in 52ad5fc
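
   For context, this test exercises the input-buffer backpressure path: once the bytes buffered across tasks exceed the configured maximum, the thread pauses the partitions that still hold records and resumes them as the buffer drains. A minimal sketch of that pattern, assuming only the TaskManager and Consumer methods mocked above (an illustration of the behavior under test, not the actual StreamThread implementation):

       // Hypothetical sketch of the behavior under test; the real StreamThread code may differ.
       private void maybeApplyBackpressure(final Consumer<byte[], byte[]> mainConsumer,
                                           final TaskManager taskManager,
                                           final long maxBufferSizeBytes) {
           if (taskManager.getInputBufferSizeInBytes() > maxBufferSizeBytes) {
               // Over the limit: stop fetching from partitions that still hold buffered records.
               mainConsumer.pause(taskManager.nonEmptyPartitions());
           } else if (!mainConsumer.paused().isEmpty()) {
               // Back under the limit: resume fetching from everything previously paused.
               mainConsumer.resume(mainConsumer.paused());
           }
       }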



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -4237,6 +4258,281 @@ t2p1, new PartitionInfo(t2p1.topic(), t2p1.partition(), null, new Node[0], new N
         );
     }
 
+    @Test
+    public void shouldPauseNonEmptyPartitionsWhenTotalBufferSizeExceedsMaxBufferSize() {
+        // Set up consumer mock
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+
+        // Create records for polling
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> polledRecords = new HashMap<>();
+        final List<ConsumerRecord<byte[], byte[]>> t1p1Records = new ArrayList<>();
+
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            0,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            1,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+
+        final List<ConsumerRecord<byte[], byte[]>> t2p1Records = Collections.singletonList(
+            new ConsumerRecord<>(
+                t2p1.topic(),
+                t2p1.partition(),
+                0,
+                mockTime.milliseconds(),
+                TimestampType.CREATE_TIME,
+                2,
+                6,
+                new byte[2],
+                new byte[6],
+                new RecordHeaders(),
+                Optional.empty()));
+
+        polledRecords.put(t1p1, t1p1Records);
+        polledRecords.put(t2p1, t2p1Records);
+
+        // Set up consumer behavior
+        final Set<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(t1p1, t2p1));
+
+        // First poll returns records
+        when(consumer.poll(any())).thenReturn(new ConsumerRecords<>(polledRecords, Map.of()))
+            // Second and third polls return empty
+            .thenReturn(new ConsumerRecords<>(Map.of(), Map.of()))
+            .thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
+
+        // Mock paused partitions behavior
+        when(consumer.paused()).thenReturn(partitionSet) // After pause
+            .thenReturn(partitionSet) // Before resume
+            .thenReturn(Collections.emptySet()); // After resume
+
+        // Set up task mock
+        final Task task1 = mock(Task.class);
+        when(task1.inputPartitions()).thenReturn(Set.of(t1p1));
+        when(task1.committedOffsets()).thenReturn(new HashMap<>());
+        when(task1.highWaterMark()).thenReturn(new HashMap<>());
+        when(task1.timeCurrentIdlingStarted()).thenReturn(Optional.empty());
+
+        // Set up TaskManager mock
+        final TaskManager taskManager = mock(TaskManager.class);
+        when(taskManager.activeTaskMap()).thenReturn(mkMap(mkEntry(new TaskId(0, 0), task1)));
+        when(taskManager.standbyTaskMap()).thenReturn(new HashMap<>());
+        when(taskManager.producerClientIds()).thenReturn("producerClientId");
+
+        // Mock buffer size behavior
+        when(taskManager.getInputBufferSizeInBytes())
+            .thenReturn(18L)  // After first poll
+            .thenReturn(12L)  // After first process
+            .thenReturn(6L)   // After second process
+            .thenReturn(0L);  // After third process
+
+        when(taskManager.nonEmptyPartitions()).thenReturn(partitionSet);
+        when(taskManager.process(anyInt(), any()))
+            .thenReturn(1)
+            .thenReturn(1)
+            .thenReturn(1)
+            .thenReturn(0);
+
+        // Create configuration and thread
+        final Properties props = configProps(false, false, false);
+        final StreamsConfig config = new StreamsConfig(props);
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+        thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            null,
+            streamsMetrics,
+            topologyMetadata,
+            PROCESS_ID,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
+            null,
+            HANDLER,
+            null,
+            Optional.empty(),
+            null,
+            10L // maxBufferSize set to 10
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        thread.setState(State.STARTING);
+        thread.setState(State.PARTITIONS_ASSIGNED);
+        thread.setState(State.RUNNING);
+
+        // Run the test
+        thread.runOnceWithoutProcessingThreads();
+        thread.runOnceWithoutProcessingThreads();
+        thread.runOnceWithoutProcessingThreads();
+
+        // Verify behavior
+        verify(consumer).pause(partitionSet);
+        verify(consumer).resume(partitionSet);
+        verify(taskManager, times(1)).addRecordsToTasks(any());
+    }
+
+    @Test
+    public void shouldNotPausePartitionsWhenMaxBufferSizeIsSetToNegative() {
+        // Set up consumer mock
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+
+        // Create records for polling
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> polledRecords = new HashMap<>();
+        final List<ConsumerRecord<byte[], byte[]>> t1p1Records = new ArrayList<>();
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            0,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+        t1p1Records.add(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            1,
+            mockTime.milliseconds(),
+            TimestampType.CREATE_TIME,
+            2,
+            6,
+            new byte[2],
+            new byte[6],
+            new RecordHeaders(),
+            Optional.empty()));
+        final List<ConsumerRecord<byte[], byte[]>> t2p1Records = Collections.singletonList(

Review Comment:
   addressed in 52ad5fc
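
   Side note on the fixture itself: both tests repeat the same eleven-argument ConsumerRecord constructor with a 2-byte key and a 6-byte value. A hypothetical helper (not part of the PR; name and the fixed 2/6-byte sizes are illustrative) would keep that construction in one place:

       // Hypothetical test helper; not part of this change.
       private static ConsumerRecord<byte[], byte[]> recordFor(final TopicPartition tp,
                                                               final long offset,
                                                               final long timestamp) {
           return new ConsumerRecord<>(
               tp.topic(),
               tp.partition(),
               offset,
               timestamp,
               TimestampType.CREATE_TIME,
               2,                 // serialized key size in bytes
               6,                 // serialized value size in bytes
               new byte[2],
               new byte[6],
               new RecordHeaders(),
               Optional.empty());
       }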


