Hey Penghui,

Then what would be the best approach to get maximum throughput when consuming a large number of messages: maintaining multiple consumers on a shared subscription? But wouldn't that create unnecessary communication overhead with the broker?
I'm open to other suggestions for getting maximum throughput, too.

--
Thanks,
Hsekar Rian.

On Thu, Nov 5, 2020 at 7:20 PM PengHui Li <codelipeng...@gmail.com> wrote:

> Hi Hsekar,
>
> Sorry, I did not describe this clearly. The client takes a thread from
> the listener threads in round-robin order and assigns that thread to a
> consumer for handling its incoming messages.
> So for a given consumer, all incoming messages are handled by a single
> thread, and if the number of consumers is greater than the number of
> listener threads, the same thread will be assigned to more than one
> consumer.
>
> Thanks,
> Penghui
> On Nov 5, 2020, 9:07 PM +0800, Rakesh Nair <rakesh.n...@6dtech.co.in>,
> wrote:
>
> Hey Penghui,
> Could you take a look at this code snippet?
>
> int numconsumers = 1;
> PulsarClient client = PulsarClient.builder()
>         .serviceUrl("pulsar://localhost:6650")
>         .statsInterval(5, TimeUnit.SECONDS)
>         .listenerThreads(8)
>         .build();
> final List<Consumer<?>> consumers = new ArrayList<>();
> for (int i = 0; i < numconsumers; i++) {
>     consumers.add(createConsumerWithLister(client, "my-topic",
>             "my-subscription", "C" + i));
> }
> System.out.println("Started...");
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     for (Consumer<?> consumer : consumers) {
>         try {
>             consumer.close();
>         } catch (PulsarClientException e) {
>             e.printStackTrace();
>         }
>     }
> }));
>
> private static Consumer<byte[]> createConsumerWithLister(
>         final PulsarClient client, final String topic,
>         final String subscription, final String consumerName)
>         throws PulsarClientException {
>     return client.newConsumer()
>             .topic(topic)
>             .consumerName(consumerName)
>             .subscriptionName(subscription)
>             .subscriptionMode(SubscriptionMode.Durable)
>             .subscriptionType(SubscriptionType.Shared)
>             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>             .messageListener((consumer, msg) -> {
>                 System.out.printf(
>                         "[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
>                         consumerName, Thread.currentThread().getName(),
>                         msg.getKey(), msg.getValue(), msg.getTopicName(),
>                         msg.getMessageId().toString());
>                 consumer.acknowledgeAsync(msg);
>             })
>             .subscribe();
> }
>
> Upon execution, I noticed that all the messages were processed by the
> same listener thread. This contradicts the earlier statement that
> setting ClientBuilder.listenerThreads(8) and initializing 1 consumer
> results in all 8 listener threads being used to process the incoming
> messages.
> Could you clarify this? Is it a configuration mistake?
>
> --
> Thanks,
> Hsekar Rian.
>
> On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <codelipeng...@gmail.com> wrote:
>
>> > Thanks for the quick reply. So, just confirming, if I set up
>> > ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1
>> > producer, then all the 8 threads would be used for processing the
>> > incoming messages in a round-robin fashion. Is my understanding
>> > correct?
>>
>> Yes.
>>
>> > Secondly, I'm trying to understand the producerBuilder.createAsync()
>> > & messageBuilder.sendAsync() concept. I understand that sendAsync()
>> > drops the message into the producer queue. Does a separate thread
>> > pull the messages from the queue and then send them to the broker?
>> > If so, is there a way to configure the number of these threads? I'm
>> > trying to understand how best to maintain the producer to get
>> > maximum publish throughput.
>>
>> sendAsync() sends the message immediately and leaves a pending
>> callback in the queue. After the broker returns the send response for
>> the message to the client, the client removes the pending callback.
>> This step is done in the IO thread.
>> You can change the number of IO threads with
>> clientBuilder.ioThreads(int numIoThreads). Generally speaking, this
>> does not require many threads; the general recommendation is the
>> number of CPU cores.
>>
>> Thanks,
>> Penghui
>> On Nov 5, 2020, 4:30 PM +0800, users@pulsar.apache.org, wrote:
>>
>> >> Thanks for the quick reply. So, just confirming, if I set up
>> >> ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1
>> >> producer, then all the 8 threads would be used for processing the
>> >> incoming messages in a round-robin fashion. Is my understanding
>> >> correct?
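
The thread-assignment behaviour Penghui describes (one listener thread picked round-robin per consumer and kept for all of that consumer's messages) can be illustrated with a small stand-alone model. This is only a sketch of the described behaviour using plain java.util.concurrent, not Pulsar client code; the class name ListenerThreadModel and its methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model (NOT Pulsar code) of round-robin listener-thread pinning. */
public class ListenerThreadModel {
    // One single-thread executor stands in for each listener thread.
    private final List<ExecutorService> listenerThreads = new ArrayList<>();
    private int nextIndex = 0;

    public ListenerThreadModel(int numListenerThreads) {
        for (int i = 0; i < numListenerThreads; i++) {
            final String threadName = "model-listener-" + i;
            listenerThreads.add(
                    Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)));
        }
    }

    /** Picks the next thread round-robin; a consumer keeps it for all its messages. */
    public synchronized ExecutorService pinThreadForNewConsumer() {
        ExecutorService pinned = listenerThreads.get(nextIndex);
        nextIndex = (nextIndex + 1) % listenerThreads.size();
        return pinned;
    }

    /** Lets already-submitted callbacks finish, then stops all model threads. */
    public void shutdownAndWait() {
        for (ExecutorService thread : listenerThreads) {
            thread.shutdown();
        }
        try {
            for (ExecutorService thread : listenerThreads) {
                thread.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining callbacks", e);
        }
    }

    /** Returns the names of the distinct threads that ran any listener callback. */
    public static Set<String> threadsUsed(int numListenerThreads, int numConsumers,
                                          int messagesPerConsumer) {
        ListenerThreadModel model = new ListenerThreadModel(numListenerThreads);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int c = 0; c < numConsumers; c++) {
            ExecutorService pinned = model.pinThreadForNewConsumer();
            for (int m = 0; m < messagesPerConsumer; m++) {
                // Every callback of this consumer runs on its one pinned thread.
                pinned.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        }
        model.shutdownAndWait();
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(8, 1, 100).size());  // prints 1
        System.out.println(threadsUsed(8, 8, 100).size());  // prints 8
        System.out.println(threadsUsed(8, 12, 100).size()); // prints 8
    }
}
```

In this model, a single consumer never uses more than one of the 8 threads, which matches what the snippet in the thread observed. Under the same assumption, spreading work across the listener threads would take several consumers on the shared subscription, and once there are more consumers than threads, some consumers share a thread.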