Hello, I have an application that, for every message it receives, processes it, writes a message to a different topic, and finally acknowledges the received message.
On shutdown, I would like the subscriber threads to finish processing the messages they have already read. I am closing all the Consumers first and then I close the Pulsar client:

    for (Consumer<?> consumer : consumers) {
        consumer.close();
    }
    pulsarClient.close();

But there are consumer threads that have not yet finished. So they fail either when trying to write the outgoing message (because the PulsarClient is already closing) or when trying to ack the original message (because the Consumer has been closed).

Is there a way to wait for the consumer threads to complete (with a timeout, of course)? The threads are named like:

    pulsar-external-listener-23-1
    pulsar-external-listener-25-1

That thread pool is configured in the PulsarClient (listenerThreads). Is there a way to get a handle to this pool?

For now, I plan on just adding a Thread.sleep() in the middle, after pausing the consumers:

    for (Consumer<?> consumer : consumers) {
        consumer.pause();
    }
    Thread.sleep(timeout);
    for (Consumer<?> consumer : consumers) {
        consumer.close();
    }
    pulsarClient.close();

The stack traces for the two failures:

    at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:945)
    at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$7(ConsumerBase.java:924)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closing
    at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:975)
    at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:91)

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
    ...
    at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:945)
    at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$7(ConsumerBase.java:924)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
    at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:318)
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:306)
    ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:313)
    ... 11 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer already closed
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doAcknowledge(MultiTopicsConsumerImpl.java:437)
    at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:554)
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:494)
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:483)
    ... 12 more
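
A possible alternative to the fixed Thread.sleep() would be to count in-flight messages myself and wait, with a timeout, for the count to drop to zero. The sketch below uses only java.util.concurrent. InFlightTracker and its begin(), end(), and awaitIdle() methods are hypothetical names I made up for illustration, not part of the Pulsar client API, and it assumes every listener callback calls begin() first and end() in a finally block after the ack.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper (not a Pulsar class): tracks how many messages are
 * currently inside a listener callback so shutdown can wait for them.
 */
final class InFlightTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition idle = lock.newCondition();
    private int inFlight = 0;

    /** Call at the very start of the listener callback. */
    void begin() {
        lock.lock();
        try {
            inFlight++;
        } finally {
            lock.unlock();
        }
    }

    /** Call in a finally block once the message has been written and acked. */
    void end() {
        lock.lock();
        try {
            // Guard against an unmatched end() driving the count negative.
            if (inFlight > 0) {
                inFlight--;
            }
            if (inFlight == 0) {
                idle.signalAll(); // wake any thread blocked in awaitIdle()
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until no message is in flight or the timeout elapses.
     * Returns true if idle was reached, false on timeout or interruption
     * (the interrupt flag is restored in that case).
     */
    boolean awaitIdle(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (inFlight > 0) {
                if (nanos <= 0L) {
                    return false; // timed out with work still running
                }
                nanos = idle.awaitNanos(nanos); // returns the remaining time
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

On shutdown the order would then be: pause the consumers, call awaitIdle(timeout, unit), close the consumers, and close the client. This only covers callbacks that have already called begin(); if a message could still be handed to the listener after the wait returns, that callback would hit the same exceptions.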