Hi Arnaud,

Yes, you are correct: the 'concurrentConsumers' URI attribute is not working
as expected, and this looks like a bug.

However, I found that when you add a delay to the route, it works as expected.

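from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&waitTimeSeconds=20")
    .delay(1L) // 1 ms delay; with this step in place the messages are consumed in parallel
    .process(exchange -> {
        System.out.println("Message received...");
    })
    .process(exchange -> {
        try {
            Thread.sleep(5000); // simulate 5 seconds of work per message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    });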

Try it and let me know.
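
To illustrate the difference outside Camel, here is a minimal plain-Java sketch. It does not use the Camel or SQS APIs at all; the class name HandOffSketch and the method peakInFlight are made up for illustration. It compares one thread that does all the slow work itself with a pool of five workers that the work is handed to. My assumption, not something I have confirmed in the component source, is that the route without the delay behaves like the first case and that the delay step moves the rest of the route onto other threads, like the second case.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: plain JDK only, no Camel or AWS classes involved.
public class HandOffSketch {

    // Processes `messages` simulated messages on `workers` threads and
    // returns the highest number of messages that were in flight at once.
    public static int peakInFlight(int workers, int messages, long workMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
                try {
                    Thread.sleep(workMillis); // stands in for the slow processor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // One worker: messages are handled strictly one after another.
        System.out.println("1 worker:  peak in flight = " + peakInFlight(1, 3, 200));
        // Five workers: the three messages can overlap.
        System.out.println("5 workers: peak in flight = " + peakInFlight(5, 3, 200));
    }
}
```

With one worker the peak can never exceed 1, which matches the 5-second gaps between messages; with five workers the three messages normally overlap, which is what concurrentConsumers=5 was expected to give.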

On Mon, Jan 31, 2022 at 11:48 PM Arnaud Level <arn...@toro-intl.com> wrote:

> Hi Larry,
>
> Thank you for your message! Your reply makes sense to me, and I've tried
> what you suggested, testing it with a queue like:
>
> from("aws2-sqs://aramark-notifications?maxMessagesPerPoll=1&concurrentConsumers=5&amazonSQSClient=#sqsClient")
>
> but nothing changes: I still see the messages with a delay of 5 seconds
> between each of them.
>
> If I turn on tracing
> (logging.level.org.apache.camel.component.aws2.sqs=TRACE), I see that the
> next poll is only triggered after the Delete for a consumed message is
> sent. In the logs I see "1 message received", but no polling occurs until
> the Delete is sent, which happens only after the 5-second sleep. Only 1
> message is read per poll, and I should see at least 5 of them, since
> concurrentConsumers=5.
> It looks to me as if there is still only 1 concurrent consumer, and I don't
> understand why.
>
> Arnaud
>
> On Mon, Jan 31, 2022 at 3:27 PM Larry Shields <larry.shie...@gmail.com>
> wrote:
>
> > Hi Arnaud,
> >
> > I think what may be happening is that your first consumer is grabbing all
> > of the messages that are available on the queue.  The default message poll
> > size is unlimited, so your other 4 consumers are polling for nothing on
> > the queue.  What you might want to try is setting maxMessagesPerPoll to a
> > value.  This will reduce the number of messages a consumer retrieves for
> > each poll.
> >
> > The other thing I have done is set greedy=true when using concurrent
> > consumers, if you're looking to increase throughput.  This will cause the
> > consumer to make another poll immediately, without waiting for the next
> > delay, if the previous poll returned at least 1 message.
> >
> > Example:
> >
> > aws-sqs://my-queue?greedy=true&maxMessagesPerPoll=10&concurrentConsumers=5
> >
> > So in this example, each consumer will pull no more than 10 messages per
> > poll to the SQS queue.  If there are fewer than 10 messages on your
> > queue, the other consumers aren't really going to get any messages.
> >
> > On Mon, Jan 31, 2022 at 7:15 AM Arnaud Level <arn...@toro-intl.com> wrote:
> >
> > > Hi Narsi,
> > > It's a FIFO queue, but I also tried the simple code I posted on a
> > > standard one and observed the same behavior.
> > >
> > > On Mon, Jan 31, 2022 at 1:11 PM Narsi Reddy Nallamilli <
> > > narsi.nallami...@gmail.com> wrote:
> > >
> > > > Hi Arnaud,
> > > >
> > > > Is your AWS queue type fifo or standard?
> > > >
> > > > On Mon, 31 Jan, 2022, 17:31 Arnaud Level, <arn...@toro-intl.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > (Camel version: camel-aws2-sqs-starter: 3.12.0)
> > > > >
> > > > > I am trying to use and understand concurrentConsumers with an SQS
> > > > > queue:
> > > > >
> > > > > from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&waitTimeSeconds=20")
> > > > >                 .process(exchange -> {
> > > > >                     System.out.println("Message received...");
> > > > >                     })
> > > > >                 .process(exchange -> {
> > > > >                     try {
> > > > >                         Thread.sleep(5000);
> > > > >                     } catch (InterruptedException e) {
> > > > >                         e.printStackTrace();
> > > > >                     }});
> > > > >
> > > > > With the above route, if I send 3 messages at the same time, I have
> > > > > to wait 5 seconds to see the second message ("Message received...")
> > > > > and 5 more seconds to see the third one. My understanding of
> > > > > concurrentConsumers is that with a value of 5 I would see the 3
> > > > > messages at the same time, since 3 threads would consume them in
> > > > > parallel. If I put the Thread.sleep in a seda route, I do get this
> > > > > behavior (the 3 messages are read at the same time).
> > > > >
> > > > > Turning on the Camel logs, it seems that the next poll is done only
> > > > > after the Delete for the previous message is sent (which happens
> > > > > after the 5-second delay).
> > > > >
> > > > > I would understand the above behavior with concurrentConsumers=1,
> > > > > but I don't with concurrentConsumers=5. Could someone tell me what
> > > > > I've misunderstood?
> > > > >
> > > > > Thank you in advance!
> > > > > Arnaud
> > > > >
> > > >
> > >
> >
>
>
