Hey Penghui,

Could you take a look at this code snippet?

    int numconsumers = 1;
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .statsInterval(5, TimeUnit.SECONDS)
            .listenerThreads(8)
            .build();

    final List<Consumer<?>> consumers = new ArrayList<>();
    for (int i = 0; i < numconsumers; i++) {
        consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
    }
    System.out.println("Started...");
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        for (Consumer<?> consumer : consumers) {
            try {
                consumer.close();
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    }));

    private static Consumer<byte[]> createConsumerWithLister(final PulsarClient client,
            final String topic, final String subscription, final String consumerName)
            throws PulsarClientException {
        return client.newConsumer()
                .topic(topic)
                .consumerName(consumerName)
                .subscriptionName(subscription)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .messageListener((consumer, msg) -> {
                    System.out.printf("[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
                            consumerName, Thread.currentThread().getName(), msg.getKey(),
                            msg.getValue(), msg.getTopicName(), msg.getMessageId().toString());
                    consumer.acknowledgeAsync(msg);
                })
                .subscribe();
    }

Upon execution, I noticed that all the messages were processed by the same
listener thread. This contradicts my understanding that setting
ClientBuilder.listenerThreads(8) and initializing 1 consumer results in all 8
listener threads being used to process the incoming messages.

Could you clarify this? Is it a configuration mistake?

--
Thanks,
Hsekar Rian.

On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <codelipeng...@gmail.com> wrote:

>
> Thanks for the quick reply. So, just confirming, if I set up
> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
> then all the 8 threads would be used for processing the incoming messages
> in a round-robin fashion. Is my understanding correct?
>
> Yes.
>
>
> Secondly, I'm trying to understand the producerBuilder.createAsync() &
> messageBuilder.sendAsync() concept. I understand that sendAsync() drops
> the message into the producer queue. Does a separate thread pull the
> messages from the queue and then send them to the broker? If so, is there a
> way to configure the number of these threads? I'm trying to understand how
> best to configure the producer to get maximum publish throughput.
>
> sendAsync() sends the message immediately and leaves a pending
> callback in the queue. After the broker returns the send response to the
> client, the client removes the pending callback. This step is done in
> the IO thread.
> You can change the default number of IO threads by using
> clientBuilder.ioThreads(int numIoThreads). Generally speaking, this does
> not require a lot of threads; the general recommendation is the number of
> CPU cores.
>
> Thanks,
> Penghui
> On Nov 5, 2020, 4:30 PM +0800, users@pulsar.apache.org, wrote:
>
>
> Thanks for the quick reply. So, just confirming, if I set up
> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
> then all the 8 threads would be used for processing the incoming messages
> in a round-robin fashion. Is my understanding correct?
>
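
A note on the single-listener-thread observation: the Javadoc for ClientBuilder.listenerThreads states that the listener pool is shared across consumers and that, for a given consumer, the listener is always invoked from the same thread to ensure ordering. If that applies here, one consumer would use one listener thread however large the pool is. The sketch below is a simplified JDK-only model of that pinning, not Pulsar's actual implementation; ListenerPinningSketch, ListenerPool, pinNext and threadsUsed are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ListenerPinningSketch {

    /** Hypothetical stand-in for a listener pool: N single-threaded executors. */
    static final class ListenerPool {
        private final List<ExecutorService> executors = new ArrayList<>();
        private final AtomicInteger next = new AtomicInteger();

        ListenerPool(int listenerThreads) {
            for (int i = 0; i < listenerThreads; i++) {
                final String name = "listener-" + i;
                executors.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
            }
        }

        /** Called once per consumer; the consumer keeps the returned executor. */
        ExecutorService pinNext() {
            return executors.get(Math.floorMod(next.getAndIncrement(), executors.size()));
        }

        void shutdown() {
            executors.forEach(ExecutorService::shutdown);
        }
    }

    /** Dispatches messages and returns how many distinct threads ran a listener. */
    static int threadsUsed(int listenerThreads, int numConsumers, int messagesPerConsumer) {
        ListenerPool pool = new ListenerPool(listenerThreads);
        Set<String> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(numConsumers * messagesPerConsumer);
        try {
            for (int c = 0; c < numConsumers; c++) {
                // Assumption of this model: one executor per consumer, chosen once.
                ExecutorService pinned = pool.pinNext();
                for (int m = 0; m < messagesPerConsumer; m++) {
                    pinned.execute(() -> {
                        seen.add(Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listeners did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listeners", e);
        } finally {
            pool.shutdown();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("1 consumer, 8 listener threads: " + threadsUsed(8, 1, 100));
        System.out.println("8 consumers, 8 listener threads: " + threadsUsed(8, 8, 100));
    }
}
```

In this model, threadsUsed(8, 1, 100) reports a single thread while threadsUsed(8, 8, 100) reports eight, which would suggest raising numconsumers (or handing work off to your own executor inside the listener) if parallel processing is the goal.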