chia7712 commented on PR #16686: URL: https://github.com/apache/kafka/pull/16686#issuecomment-2407034568
> Another point that I would like to mention about ConsumerRebalanceListener is that it is very common to actually call the consumer in the listener to manually commit offset for instance. As you know, the Consumer is not supposed to be called from different threads at the moment. Okay, that's a solid reason to maintain the current behavior. My main point was to simplify the coordination between the foreground and background threads. The strict happen-before events don't seem well-suited to the architecture of the async consumer. @kirktrue Could you please consider adding tests or reviewing the existing tests to ensure they cover the listener guarantees? for example: ```java @ClusterTest(brokers = 3) @Timeout(60) public void testConsumerListener(ClusterInstance clusterInstance) throws InterruptedException { var threadName = Thread.currentThread().getName() + "-testConsumerListener"; var s = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)); var onPartitionsAssignedLatch = new CountDownLatch(1); var onPartitionsRevokedThread = new AtomicReference<String>(); var onPartitionsAssignedThread = new AtomicReference<String>(); var onPartitionsRevokedInterrupted = new AtomicBoolean(false); CompletableFuture.runAsync(() -> { var consumer = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "ikea", ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"), new ByteArrayDeserializer(), new ByteArrayDeserializer()); try { consumer.subscribe(List.of("chia"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { onPartitionsRevokedThread.set(Thread.currentThread().getName()); onPartitionsRevokedInterrupted.set(Thread.currentThread().isInterrupted()); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { onPartitionsAssignedThread.set(Thread.currentThread().getName()); onPartitionsAssignedLatch.countDown(); } }); IntStream.range(0, 10).forEach(__ -> consumer.poll(Duration.ofSeconds(3))); } finally { consumer.close(Duration.ofSeconds(0)); } }, s); Assertions.assertTrue(onPartitionsAssignedLatch.await(10, TimeUnit.SECONDS)); s.shutdownNow(); Assertions.assertTrue(s.awaitTermination(10, TimeUnit.SECONDS)); // start to check all guarantees // 1) onPartitionsRevoked should be executed regardless of timeout or interruption Assertions.assertEquals(threadName, onPartitionsRevokedThread.get()); // 2) listener should be executed by foreground thread Assertions.assertEquals(threadName, onPartitionsAssignedThread.get()); // 3) listener should be able to see interrupted signal Assertions.assertTrue(onPartitionsRevokedInterrupted.get()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org