Hey Penghui,
Could you take a look at this code snippet?

    int numconsumers = 1;
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .statsInterval(5, TimeUnit.SECONDS)
            .listenerThreads(8)
            .build();
    final List<Consumer<?>> consumers = new ArrayList<>();
    for (int i = 0; i < numconsumers; i++) {
        consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
    }
    System.out.println("Started...");

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        for (Consumer<?> consumer : consumers) {
            try {
                consumer.close();
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    }));

    private static Consumer<byte[]> createConsumerWithLister(final PulsarClient client,
            final String topic, final String subscription, final String consumerName)
            throws PulsarClientException {
        return client.newConsumer()
                .topic(topic)
                .consumerName(consumerName)
                .subscriptionName(subscription)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .messageListener((consumer, msg) -> {
                    System.out.printf("[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
                            consumerName, Thread.currentThread().getName(),
                            msg.getKey(), msg.getValue(), msg.getTopicName(),
                            msg.getMessageId().toString());
                    consumer.acknowledgeAsync(msg);
                })
                .subscribe();
    }

When I ran this, all the messages were processed by the same listener
thread. This contradicts my understanding that setting
ClientBuilder.listenerThreads(8) and initializing 1 consumer results in
all 8 listener threads being used to process the incoming messages.
Could you clarify this? Is it a configuration mistake on my side?
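
One possible explanation (an assumption, not confirmed against the client
source) is that each consumer is pinned to a single listener thread so that
its messages are delivered in order, and the 8 threads are only spread
across different consumers. The sketch below models that idea with plain
java.util.concurrent. It is not the Pulsar client's code, and the names
PinnedListenerSketch and threadsUsed are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stdlib-only model of listener dispatch; NOT the Pulsar client's code.
class PinnedListenerSketch {

    // Runs `consumers` hypothetical consumers against a pool of
    // `listenerThreads` single-thread executors and returns the names of
    // the threads that actually ran a listener callback.
    static Set<String> threadsUsed(int listenerThreads, int consumers, int messagesPerConsumer) {
        List<ExecutorService> pool = new ArrayList<>();
        for (int i = 0; i < listenerThreads; i++) {
            final String name = "listener-" + i;
            pool.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
        }
        Set<String> used = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(consumers * messagesPerConsumer);
        for (int c = 0; c < consumers; c++) {
            // Assumption: the executor is chosen once per consumer
            // (round-robin across consumers), not once per message.
            ExecutorService pinned = pool.get(c % listenerThreads);
            for (int m = 0; m < messagesPerConsumer; m++) {
                pinned.execute(() -> {
                    used.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            for (ExecutorService e : pool) {
                e.shutdown();
            }
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("1 consumer:  " + threadsUsed(8, 1, 40).size() + " thread(s) used");
        System.out.println("8 consumers: " + threadsUsed(8, 8, 40).size() + " thread(s) used");
    }
}
```

Under this model a single consumer only ever touches one listener thread,
which matches what I observed, while 8 consumers touch all 8. If the real
client works this way, raising numconsumers in my snippet should spread the
work across more threads.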

-- 
Thanks,
Hsekar Rian.

On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <codelipeng...@gmail.com> wrote:

> > Thanks for the quick reply. So, just confirming, if I set up
> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
> then all the 8 threads would be used for processing the incoming messages
> in a round-robin fashion. Is my understanding correct?
>
> Yes.
>
> > Secondly, I'm trying to understand the producerBuilder.createAsync() &
> messageBuilder.sendAsync() concept. I understand that sendAsync() drops
> the message into the producer queue. Does a separate thread pull the
> messages from the queue and then send them to the broker? If so, is there
> a way to configure the number of these threads? I'm trying to understand
> how best to configure the producer for maximum publish throughput.
>
> sendAsync() sends the message immediately and leaves a pending callback
> in the queue. After the broker returns the send response to the client,
> the client removes the pending callback. This step is done in the IO
> thread.
> You can change the default number of IO threads with
> clientBuilder.ioThreads(int numIoThreads). Generally speaking, this does
> not require many threads; the general recommendation is the number of CPU
> cores.
>
> Thanks,
> Penghui
> On Nov 5, 2020, 4:30 PM +0800, users@pulsar.apache.org, wrote:
>
>
> Thanks for the quick reply. So, just confirming, if I set up
> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1 producer,
> then all the 8 threads would be used for processing the incoming messages
> in a round-robin fashion. Is my understanding correct?
>
>
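
The sendAsync() flow described in the quoted reply can be modeled roughly
as follows. This is a stdlib-only sketch under stated assumptions, not the
Pulsar client's implementation: the class PendingSendSketch, its methods,
and the simulated broker acknowledgment are all hypothetical. In the real
client the number of IO threads is set with clientBuilder.ioThreads(int).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stdlib-only model of the send path; NOT the Pulsar client's code.
class PendingSendSketch {
    // Hypothetical stand-in for the client's IO thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    // Callbacks for sends the (simulated) broker has not acknowledged yet.
    private final Queue<CompletableFuture<Long>> pending = new ArrayDeque<>();
    private long nextSequenceId = 0;

    // "Sends" right away and leaves only a pending callback in the queue.
    synchronized CompletableFuture<Long> sendAsync(String payload) {
        long sequenceId = nextSequenceId++;
        CompletableFuture<Long> callback = new CompletableFuture<>();
        pending.add(callback);
        // Simulated broker: the acknowledgment is handled on the IO thread.
        ioThread.execute(() -> onBrokerAck(sequenceId));
        return callback;
    }

    // Runs on the IO thread: remove the oldest pending callback, then complete it.
    private void onBrokerAck(long sequenceId) {
        CompletableFuture<Long> callback;
        synchronized (this) {
            callback = pending.poll();
        }
        if (callback != null) {
            callback.complete(sequenceId);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) {
        PendingSendSketch producer = new PendingSendSketch();
        List<CompletableFuture<Long>> sends = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            sends.add(producer.sendAsync("message-" + i));
        }
        for (CompletableFuture<Long> send : sends) {
            System.out.println("acknowledged sequence id " + send.join());
        }
        System.out.println("pending after acks: " + producer.pendingCount());
        producer.close();
    }
}
```

The point of the model is that the caller's thread never waits: it only
enqueues a callback, and the completion work happens on the IO thread.
That is consistent with the advice that a small number of IO threads is
usually enough.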
