Hi Hsekar,

Sorry, I did not describe clearly. The client will get a thread from the 
listener threads following round-robin pattern and then assign this thread to a 
consumer for handling the incoming messages.
So for a consumer all incoming messages are handling by single thread and if 
the consumers count are greater than the listener threads count, there will be 
the same thread assigned to different consumers.

Thanks,
Penghui
On Nov 5, 2020, 9:07 PM +0800, Rakesh Nair <rakesh.n...@6dtech.co.in>, wrote:
> Hey Penghui,
> Could you take a look at this code snippet.
>
> int numconsumers = 1;
>    PulsarClient client = 
> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").statsInterval(5, 
> TimeUnit.SECONDS).listenerThreads(8).build();
>    final List<Consumer<?>> consumers = new ArrayList<>();
>    for (int i = 0; i < numconsumers; i++) {
>        consumers.add(createConsumerWithLister(client, "my-topic", 
> "my-subscription", "C" + i));
>    }
>    System.out.println("Started...");
>
>    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>        for (Consumer<?> consumer : consumers) {
>            try {
>                consumer.close();
>            } catch (PulsarClientException e) {
>                e.printStackTrace();
>            }
>        }
>    }));
> private static Consumer<byte[]> createConsumerWithLister(final PulsarClient 
> client, final String topic, final String subscription, final String 
> consumerName) throws PulsarClientException {
>        return 
> client.newConsumer().topic(topic).consumerName(consumerName).subscriptionName(subscription)
>                
> .subscriptionMode(SubscriptionMode.Durable).subscriptionType(SubscriptionType.Shared)
>                
> .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>                .messageListener((consumer, msg) -> {
>                    System.out.printf("[%s/%s]Message received: key=%s, 
> value=%s, topic=%s, id=%s%n", consumerName,
>                            Thread.currentThread().getName(), msg.getKey(), 
> msg.getValue(), msg.getTopicName(),
>                            msg.getMessageId().toString());
>                    consumer.acknowledgeAsync(msg);
>                }).subscribe();
>    }
> Upon execution, I noticed that all the messages were getting processed by the 
> same listener thread only. This opposes the fact that setting  
> ClientBuilder.listenerThreads(8) and initializing 1 consumer results in all 
> the 8 listener threads being used in processing the incoming message.
> Could you clarify on this? Is it some configuration mistake?
>
> --
> Thanks,
> Hsekar Rian.
>
> > On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <codelipeng...@gmail.com> wrote:
> > > > Thanks for the quick reply. So, just confirming, if I set up 
> > > > ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 
> > > > producer, then all the 8 threads would be used for processing the 
> > > > incoming messages in a round-robin fashion. Is my understanding correct?
> > >
> > > Yes.
> > >
> > > > Secondly, I'm trying to understand the producerBuilder.createAsync() & 
> > > > messageBuilder.sendAsync() concept. I understand that  sendAsync() 
> > > > drops the message into the producer queue. Does a seperate thread pull 
> > > > the messages from the queue and then send it to the broker? If so, is 
> > > > there a way to configure the number of these threads? I'm trying to 
> > > > understand how best to maintain the producer to get maximum publish 
> > > > throughput?
> > >
> > > The sendAsync() will send the message immediately, it will left a pending 
> > > callback in the queue. After the broker return the send message to the 
> > > client, the client will remove the pending callback. This step is done in 
> > > the io thread.
> > > You can change the default IO threads by using 
> > > clientBuilder.ioThreads(int numIoThreads). Generally speaking, this does 
> > > not require a lot of threads, the general recommendation is your cpu 
> > > cores.
> > >
> > > Thanks,
> > > Penghui
> > > On Nov 5, 2020, 4:30 PM +0800, users@pulsar.apache.org, wrote:
> > > >
> > > > Thanks for the quick reply. So, just confirming, if I set up 
> > > > ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 
> > > > producer, then all the 8 threads would be used for processing the 
> > > > incoming messages in a round-robin fashion. Is my understanding correct?
>

Reply via email to