chia7712 commented on PR #16686:
URL: https://github.com/apache/kafka/pull/16686#issuecomment-2407034568

   > Another point that I would like to mention about ConsumerRebalanceListener 
is that it is very common to call the consumer from within the listener, for 
instance to commit offsets manually. As you know, the Consumer is not supposed 
to be called from different threads at the moment.
   
   Okay, that's a solid reason to maintain the current behavior. My main point 
was to simplify the coordination between the foreground and background threads. 
The strict happens-before ordering of events doesn't seem well suited to the 
architecture of the async consumer.
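
To make the threading constraint concrete, here is a minimal stdlib-only sketch of why the listener has to run on the thread that calls `poll()`. `ThreadConfinementSketch`, `Guard` and `Client` are hypothetical names for illustration, not Kafka code; the guard is only similar in spirit to the single-threaded access check that makes `KafkaConsumer` throw `ConcurrentModificationException`.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; none of these classes are Kafka code.
final class ThreadConfinementSketch {

    /** Re-entrant guard that rejects access from any thread but the current owner. */
    static final class Guard {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private int depth = 0; // only touched by the owning thread

        void acquire() {
            long me = Thread.currentThread().getId();
            if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            }
            depth++;
        }

        void release() {
            if (--depth == 0) {
                owner.set(NO_OWNER);
            }
        }
    }

    /** Stand-in for a client whose callbacks run inside poll() on the caller's thread. */
    static final class Client {
        private final Guard guard = new Guard();
        int commits = 0;

        void commit() {
            guard.acquire();
            try {
                commits++;
            } finally {
                guard.release();
            }
        }

        void poll(Runnable listener) {
            guard.acquire();
            try {
                listener.run(); // same thread, so a nested commit() re-enters the guard
            } finally {
                guard.release();
            }
        }
    }

    /** Returns true if commit() from a second thread is rejected while poll() is active. */
    static boolean commitFromOtherThreadFails(Client client) {
        AtomicBoolean rejected = new AtomicBoolean(false);
        client.poll(() -> {
            Thread other = new Thread(() -> {
                try {
                    client.commit();
                } catch (ConcurrentModificationException e) {
                    rejected.set(true);
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return rejected.get();
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.poll(client::commit);
        System.out.println("commits from listener: " + client.commits);
        System.out.println("other thread rejected: " + commitFromOtherThreadFails(client));
    }
}
```

If the callback were dispatched on a background thread instead, the nested `commit()` would hit the second case and fail, which is the situation the quoted concern describes.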
   
   @kirktrue Could you please consider adding tests or reviewing the existing 
tests to ensure they cover the listener guarantees? For example:
   ```java
       @ClusterTest(brokers = 3)
       @Timeout(60)
       public void testConsumerListener(ClusterInstance clusterInstance) throws InterruptedException {
           var threadName = Thread.currentThread().getName() + "-testConsumerListener";
           var s = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
           var onPartitionsAssignedLatch = new CountDownLatch(1);
           var onPartitionsRevokedThread = new AtomicReference<String>();
           var onPartitionsAssignedThread = new AtomicReference<String>();
           var onPartitionsRevokedInterrupted = new AtomicBoolean(false);
           CompletableFuture.runAsync(() -> {
               var consumer = new KafkaConsumer<>(Map.of(
                       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
                       ConsumerConfig.GROUP_ID_CONFIG, "ikea",
                       ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"),
                       new ByteArrayDeserializer(), new ByteArrayDeserializer());
               try {
                   consumer.subscribe(List.of("chia"), new ConsumerRebalanceListener() {
                       @Override
                       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                           onPartitionsRevokedThread.set(Thread.currentThread().getName());
                           onPartitionsRevokedInterrupted.set(Thread.currentThread().isInterrupted());
                       }
   
                       @Override
                       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                           onPartitionsAssignedThread.set(Thread.currentThread().getName());
                           onPartitionsAssignedLatch.countDown();
                       }
                   });
                   IntStream.range(0, 10).forEach(__ -> consumer.poll(Duration.ofSeconds(3)));
               } finally {
                   consumer.close(Duration.ofSeconds(0));
               }
           }, s);
           Assertions.assertTrue(onPartitionsAssignedLatch.await(10, TimeUnit.SECONDS));
           s.shutdownNow();
           Assertions.assertTrue(s.awaitTermination(10, TimeUnit.SECONDS));
           // start to check all guarantees
           // 1) onPartitionsRevoked should be executed regardless of timeout or interruption
           Assertions.assertEquals(threadName, onPartitionsRevokedThread.get());
           // 2) listener should be executed by foreground thread
           Assertions.assertEquals(threadName, onPartitionsAssignedThread.get());
           // 3) listener should be able to see interrupted signal
           Assertions.assertTrue(onPartitionsRevokedInterrupted.get());
       }
   ```
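
The third check depends on two JDK behaviors: `shutdownNow()` interrupts the worker thread, and a blocking call that throws `InterruptedException` clears the interrupt flag unless the code restores it. The sketch below shows just that mechanism without any Kafka classes. `InterruptVisibilitySketch` and `Observed` are hypothetical names, and the `finally` block is only a stand-in for a callback that runs during `close()`; whether the consumer actually preserves the flag is what the test above is meant to verify.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; stdlib stand-ins for the consumer and its listener.
final class InterruptVisibilitySketch {

    /** What the stand-in cleanup callback observed. */
    static final class Observed {
        final AtomicReference<String> thread = new AtomicReference<>();
        final AtomicBoolean interrupted = new AtomicBoolean(false);
    }

    static Observed run(String threadName) {
        var executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        var started = new CountDownLatch(1);
        var observed = new Observed();
        executor.execute(() -> {
            try {
                started.countDown();
                // Stand-in for the poll loop: block until interrupted.
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws; restore it for later code.
                Thread.currentThread().interrupt();
            } finally {
                // Stand-in for a listener callback invoked while closing.
                observed.thread.set(Thread.currentThread().getName());
                observed.interrupted.set(Thread.currentThread().isInterrupted());
            }
        });
        try {
            if (!started.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not start");
            }
            executor.shutdownNow(); // interrupts the worker thread
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        Observed o = run("foreground");
        System.out.println(o.thread.get() + " interrupted=" + o.interrupted.get());
    }
}
```

Removing the `Thread.currentThread().interrupt()` line in the `catch` block makes the callback observe `false`, which is the kind of regression the third assertion would catch.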


