lianetm commented on code in PR #19980:
URL: https://github.com/apache/kafka/pull/19980#discussion_r2316339959


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java:
##########
@@ -1588,6 +1590,75 @@ private void sendCompressedMessages(int numRecords, TopicPartition tp) {
         }
     }
 
+    @ClusterTest
+    public void testClassicConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * This test is to prove that the intermittent stalling experienced when using the asynchronous
+     * consumer, as filed under KAFKA-19259, has been fixed.
+     *
+     * <p/>
+     *
+     * The basic idea is to have one thread produce a record every 500 ms while the main thread consumes
+     * records without pausing between polls for much more than the produce delay. In the test case filed in
+     * KAFKA-19259, the consumer sometimes pauses for up to 5-10 seconds despite records being produced every second.
+     */
+    private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+        var testTopic = "stall-test-topic";
+        var numPartitions = 6;
+        cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+        // The producer must produce slowly to tickle the scenario.
+        var produceDelay = 500;
+
+        var executor = Executors.newScheduledThreadPool(1);
+
+        try (var producer = cluster.producer()) {
+            // Start a thread running that produces records at a relative trickle.
+            executor.scheduleWithFixedDelay(
+                () -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
+                0,
+                produceDelay,
+                TimeUnit.MILLISECONDS
+            );
+
+            Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+            // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+            // should be *at least* one record to read every second.
+            var pollDelayTolerance = 2000;
+
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+                consumer.subscribe(List.of(testTopic));
+
+                // This is here to allow the consumer time to settle the group membership/assignment.
+                awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+                // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
+                // exceed the delay threshold defined above.
+                var beforePoll = System.currentTimeMillis();
+                consumer.poll(Duration.ofSeconds(5));
+                consumer.poll(Duration.ofSeconds(5));
+                var afterPoll = System.currentTimeMillis();
+                var pollDelay = afterPoll - beforePoll;
+
+                if (pollDelay > pollDelayTolerance)
+                    fail("Detected a stall of " + pollDelay + " ms between 
Consumer.poll() invocations despite a Producer producing records every " + 
produceDelay + " ms");

Review Comment:
   Adding a `consumer.unsubscribe()` before completing the test should help solve the flakiness @AndrewJSchofield reported.
   
   I tried locally and reproduced it too (around 30 iterations), and noticed that it fails on the first poll, when the consumer joins the group and consumes. Adding the `unsubscribe()` seemed to solve the flakiness on my side, up to 100 iterations, but please double-check in case there is something else.
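   
   For illustration, a minimal sketch of where the suggested call could go (the surrounding structure comes from the diff above; placing the call at the end of the try-with-resources block is an assumption):
   
       try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
           consumer.subscribe(List.of(testTopic));
   
           // ... existing poll/stall checks from the diff above ...
   
           // Leave the group explicitly before close() so that a pending
           // rebalance does not leak into subsequent test iterations.
           consumer.unsubscribe();
       }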



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -98,7 +98,8 @@ void add(CompletedFetch completedFetch) {
         try {
             lock.lock();
             completedFetches.add(completedFetch);
-            notEmptyCondition.signalAll();
+            wokenup.set(true);
+            blockingCondition.signalAll();

Review Comment:
   Could we reuse the existing `addAll` in this implementation of `add`? That would avoid duplicating logic like this.
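   
   For illustration, a sketch of that delegation (assuming `addAll` accepts a collection of `CompletedFetch` and performs the same lock/wakeup/signal sequence shown in the diff above):
   
       void add(CompletedFetch completedFetch) {
           // Delegate to addAll so the locking, wakeup flag, and condition
           // signalling live in a single place.
           addAll(List.of(completedFetch));
       }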



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
