Hey Penghui,
Then what would be the best approach for maximum throughput when
consuming a large number of messages: running multiple consumers on a
shared subscription? But wouldn't that create unnecessary communication
overhead with the broker?

I'm open to other suggestions for maximizing throughput as well.

-- 
Thanks,
Hsekar Rian.

On Thu, Nov 5, 2020 at 7:20 PM PengHui Li <codelipeng...@gmail.com> wrote:

> Hi Hsekar,
>
> Sorry, I did not describe this clearly. The client picks a thread from the
> listener threads in round-robin order and assigns that thread to a
> consumer to handle its incoming messages.
> So for a given consumer, all incoming messages are handled by a single
> thread, and if the number of consumers is greater than the number of
> listener threads, the same thread will be assigned to more than one
> consumer.
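To make the assignment described above concrete, here is a minimal sketch in plain Java. It is a toy model and not Pulsar's implementation: the class ListenerThreadAssignment and its assign method are hypothetical names, and the modulo step is just one simple way to express round-robin.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics the assignment described in the thread, not Pulsar's internals.
class ListenerThreadAssignment {

    // Returns, for each consumer index, the index of the listener thread it is pinned to.
    static List<Integer> assign(int numConsumers, int numListenerThreads) {
        List<Integer> threadForConsumer = new ArrayList<>();
        for (int consumer = 0; consumer < numConsumers; consumer++) {
            // Round-robin: wrap around once every thread has been handed out.
            threadForConsumer.add(consumer % numListenerThreads);
        }
        return threadForConsumer;
    }

    public static void main(String[] args) {
        System.out.println(assign(1, 8));  // prints [0]
        System.out.println(assign(10, 8)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 0, 1]
    }
}
```

In this model, 1 consumer with 8 listener threads only ever uses thread 0, which is consistent with the single-thread behaviour reported for the snippet in this thread. With 10 consumers, consumers 8 and 9 share threads 0 and 1.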
>
> Thanks,
> Penghui
> On Nov 5, 2020, 9:07 PM +0800, Rakesh Nair <rakesh.n...@6dtech.co.in>,
> wrote:
>
> Hey Penghui,
> Could you take a look at this code snippet?
>
> int numconsumers = 1;
> PulsarClient client = PulsarClient.builder()
>         .serviceUrl("pulsar://localhost:6650")
>         .statsInterval(5, TimeUnit.SECONDS)
>         .listenerThreads(8)
>         .build();
> final List<Consumer<?>> consumers = new ArrayList<>();
> for (int i = 0; i < numconsumers; i++) {
>     consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
> }
> System.out.println("Started...");
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     for (Consumer<?> consumer : consumers) {
>         try {
>             consumer.close();
>         } catch (PulsarClientException e) {
>             e.printStackTrace();
>         }
>     }
> }));
>
> private static Consumer<byte[]> createConsumerWithLister(final PulsarClient client,
>         final String topic, final String subscription, final String consumerName)
>         throws PulsarClientException {
>     return client.newConsumer()
>             .topic(topic)
>             .consumerName(consumerName)
>             .subscriptionName(subscription)
>             .subscriptionMode(SubscriptionMode.Durable)
>             .subscriptionType(SubscriptionType.Shared)
>             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>             .messageListener((consumer, msg) -> {
>                 System.out.printf("[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
>                         consumerName, Thread.currentThread().getName(), msg.getKey(),
>                         msg.getValue(), msg.getTopicName(), msg.getMessageId().toString());
>                 consumer.acknowledgeAsync(msg);
>             })
>             .subscribe();
> }
>
> When I ran it, I noticed that all the messages were processed by the same
> listener thread. This contradicts my understanding that setting
> ClientBuilder.listenerThreads(8) and initializing 1 consumer results in
> all 8 listener threads being used to process the incoming messages.
> Could you clarify this? Is it a configuration mistake?
>
> --
> Thanks,
> Hsekar Rian.
>
> On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <codelipeng...@gmail.com> wrote:
>
>> > Thanks for the quick reply. So, just confirming, if I set up
>> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
>> then all the 8 threads would be used for processing the incoming messages
>> in a round-robin fashion. Is my understanding correct?
>>
>> Yes.
>>
>> > Secondly, I'm trying to understand how producerBuilder.createAsync() and
>> messageBuilder.sendAsync() work. I understand that sendAsync() drops
>> the message into the producer queue. Does a separate thread pull the
>> messages from the queue and then send them to the broker? If so, is there a
>> way to configure the number of these threads? I'm trying to understand how
>> best to set up the producer to get maximum publish throughput.
>>
>> sendAsync() sends the message immediately and leaves a pending
>> callback in the queue. After the broker responds to the send, the
>> client removes the pending callback. This step is done in the IO thread.
>> You can change the default number of IO threads with
>> clientBuilder.ioThreads(int numIoThreads). Generally speaking, this does
>> not require many threads; the general recommendation is the number of
>> CPU cores.
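The pending-callback flow described above could be modelled roughly as follows. This is only a sketch using the Java standard library, under the assumption that a single-thread executor can stand in for the IO thread and that the broker's response arrives as a task on that thread; PendingSendModel and all of its members are hypothetical names, not part of the Pulsar client.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: the class and its names are hypothetical, not Pulsar's API.
class PendingSendModel implements AutoCloseable {
    // Stand-in for the client's IO thread (assumption: one thread is enough here).
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io-thread");
        t.setDaemon(true);
        return t;
    });

    // Callbacks for messages that were sent but not yet answered by the "broker".
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Returns at once; the future completes when the simulated response arrives.
    CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> callback = new CompletableFuture<>();
        pending.add(callback);
        // Simulated broker response, handled on the IO thread.
        ioThread.execute(() -> {
            pending.remove(callback);            // drop the pending callback first
            callback.complete("ack:" + payload); // then notify the caller
        });
        return callback;
    }

    int pendingCount() {
        return pending.size();
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) {
        try (PendingSendModel producer = new PendingSendModel()) {
            CompletableFuture<String> a = producer.sendAsync("m1");
            CompletableFuture<String> b = producer.sendAsync("m2");
            CompletableFuture.allOf(a, b).join();
            // prints "ack:m1 ack:m2 pending=0"
            System.out.println(a.join() + " " + b.join() + " pending=" + producer.pendingCount());
        }
    }
}
```

The caller never blocks in sendAsync() in this model; it only waits when it chooses to join the returned futures, which is why a small number of IO threads can be enough.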
>>
>> Thanks,
>> Penghui
>> On Nov 5, 2020, 4:30 PM +0800, users@pulsar.apache.org, wrote:
>>
>>
>> Thanks for the quick reply. So, just confirming, if I set up
>> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
>> then all the 8 threads would be used for processing the incoming messages
>> in a round-robin fashion. Is my understanding correct?
>>
>>
>
