Hi again Narsi,

I read more about the *delay* EIP and you are right! Using it with a
custom (or even the default) thread pool profile, we get the expected
behavior without exhausting the thread pool as I had feared. It is a
workaround for the bug where concurrentConsumers is not taken into account.
I still think we should open an issue, since this behavior is supposed to be
supported by the concurrentConsumers URI parameter of the SQS component.
What do you think?
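
To be concrete about what I mean by "expected behavior", here is a minimal
plain-Java sketch. It is not Camel code and does not show how Camel implements
delay; it only assumes that a pool capped at N threads bounds how many messages
are processed at once. The class and method names (BoundedPoolSketch,
peakConcurrency) are made up for the illustration, and the short sleep stands
in for the 5 s sleep in the route.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {

    /**
     * Processes `messages` simulated messages on a fixed pool of `poolSize`
     * threads and returns the highest number that were in progress at once.
     */
    public static int peakConcurrency(int poolSize, int messages, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Opens once the first wave of tasks has started, so those tasks
        // are guaranteed to overlap regardless of scheduling.
        CountDownLatch firstWave = new CountDownLatch(Math.min(poolSize, messages));

        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                firstWave.countDown();
                try {
                    firstWave.await();
                    Thread.sleep(sleepMillis); // stand-in for the 5 s sleep in the route
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown(); // no new tasks; queued ones still run
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 5 messages, pool capped at 3 threads
        System.out.println("peak with pool of 3: " + peakConcurrency(3, 5, 200));
    }
}
```

With a pool of 3 and 5 messages, the peak is 3: the remaining 2 messages only
start once one of the first 3 has finished sleeping.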

Thanks for your help again!
Arnaud





On Wed, Feb 2, 2022 at 6:42 PM Arnaud Level <arn...@toro-intl.com> wrote:

> Hi Narsi,
>
> Thank you for the reply! Indeed it works, but not exactly as expected. I am
> not sure I understand the behavior, because with this delay consumption is
> not limited to the number of concurrent consumers. If I set
> concurrentConsumers=3 and send 5 messages (which are read one by one over 5
> polls), I see all 5 messages consumed immediately, even though each of them
> then goes into a 5-second sleep.
> The expected behavior would be to see 3 immediately and then the next 2
> when two of the first 3 wake up (i.e. after the 5 s sleep).
>
> From a behavior perspective, it looks like a new thread is created every
> time a message is available on the queue, without any limit. If this is
> the case, we can easily end up with an exhausted thread pool. I had a quick
> look at the delay EIP and it is async by default, which would explain
> the behavior we are seeing. What do you think?
>
> Regarding the concurrentConsumers parameter, if this is a bug, I believe
> it is a regression, because according to this article
> <https://blog.christianposta.com/camel/very-fast-camels-and-cloud-messaging/>
> it has worked in the past. Shouldn't we open an issue?
>
> Thanks again!
> Arnaud
>
> On Wed, Feb 2, 2022 at 5:52 PM Narsi Reddy Nallamilli <
> narsi.nallami...@gmail.com> wrote:
>
>> Hi Arnaud,
>>
>> Yes, you are correct: the 'concurrentConsumers' URI attribute is not
>> working as expected, and this looks like a bug.
>>
>> However, I found that when you use delay in the route it works as
>> expected.
>>
>>
>> from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient"
>>         + "&waitTimeSeconds=20")
>>     // short delay before processing; this is the workaround
>>     .delay(1L)
>>     .process(exchange -> System.out.println("Message received..."))
>>     .process(exchange -> {
>>         try {
>>             Thread.sleep(5000); // simulate 5 s of work per message
>>         } catch (InterruptedException e) {
>>             Thread.currentThread().interrupt(); // keep the interrupt flag set
>>         }
>>     });
>>
>> Try it and let me know.
>>
>> On Mon, Jan 31, 2022 at 11:48 PM Arnaud Level <arn...@toro-intl.com>
>> wrote:
>>
>> > Hi Larry,
>> >
>> > Thank you for your message! Your reply makes sense to me, and I tried
>> > what you suggested with a queue like:
>> >
>> > from("aws2-sqs://aramark-notifications?maxMessagesPerPoll=1&concurrentConsumers=5&amazonSQSClient=#sqsClient")
>> >
>> > but nothing changes: I still see the messages with a delay of 5 seconds
>> > between each of them.
>> >
>> > If I turn on tracing with
>> > logging.level.org.apache.camel.component.aws2.sqs=TRACE, I see that the
>> > next poll is only triggered after the Delete for a consumed message is
>> > sent. In the logs I see "1 message received", but no polling occurs until
>> > the Delete is sent, which happens only after the 5-second sleep. Only 1
>> > message is read per poll, and I should see at least 5 of them since
>> > concurrentConsumers=5.
>> > It looks to me like there is still only 1 concurrent consumer, and I
>> > don't understand why.
>> >
>> > Arnaud
>> >
>> > On Mon, Jan 31, 2022 at 3:27 PM Larry Shields <larry.shie...@gmail.com>
>> > wrote:
>> >
>> > > Hi Arnaud,
>> > >
>> > > I think what may be happening is that your first consumer is grabbing
>> > > all of the messages that are available on the queue.  The default
>> > > message poll size is unlimited, so your other 4 consumers are polling
>> > > for nothing on the queue.  What you might want to try is setting
>> > > maxMessagesPerPoll to a value.  This will reduce the number of messages
>> > > a consumer retrieves for each poll.
>> > >
>> > > The other thing I have done is set greedy=true when using concurrent
>> > > consumers, if you're looking to increase throughput.  This will cause
>> > > the consumer to make another poll immediately, without waiting for the
>> > > next delay, if the previous poll returned at least 1 message.
>> > >
>> > > Example:
>> > >
>> > > aws-sqs://my-queue?greedy=true&maxMessagesPerPoll=10&concurrentConsumers=5
>> > >
>> > > So in this example, each consumer will pull no more than 10 messages
>> > > per poll from the SQS queue.  So if there are fewer than 10 messages on
>> > > your queue, the other consumers aren't really going to get any messages.
>> > >
>> > > On Mon, Jan 31, 2022 at 7:15 AM Arnaud Level <arn...@toro-intl.com>
>> > > wrote:
>> > >
>> > > > Hi Narsi,
>> > > > It's a FIFO queue, but I actually tried the simple code I posted on a
>> > > > standard one too and observed the same behavior.
>> > > >
>> > > > On Mon, Jan 31, 2022 at 1:11 PM Narsi Reddy Nallamilli <
>> > > > narsi.nallami...@gmail.com> wrote:
>> > > >
>> > > > > Hi Arnaud,
>> > > > >
>> > > > > Is your AWS queue type FIFO or standard?
>> > > > >
>> > > > > On Mon, 31 Jan, 2022, 17:31 Arnaud Level, <arn...@toro-intl.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > (Camel version: camel-aws2-sqs-starter: 3.12.0)
>> > > > > >
>> > > > > > I am trying to use and understand concurrentConsumers with an SQS
>> > > > > > queue:
>> > > > > >
>> > > > > > from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient"
>> > > > > >         + "&waitTimeSeconds=20")
>> > > > > >     .process(exchange -> System.out.println("Message received..."))
>> > > > > >     .process(exchange -> {
>> > > > > >         try {
>> > > > > >             Thread.sleep(5000); // simulate 5 s of work per message
>> > > > > >         } catch (InterruptedException e) {
>> > > > > >             Thread.currentThread().interrupt(); // keep the interrupt flag set
>> > > > > >         }
>> > > > > >     });
>> > > > > >
>> > > > > > With the above route, if I send 3 messages at the same time, I have
>> > > > > > to wait 5 seconds to see the second message ("Message received...")
>> > > > > > and 5 more seconds to see the third one. My understanding of
>> > > > > > concurrentConsumers is that with a value of 5 I would see the 3
>> > > > > > messages at the same time, since 3 threads would consume them in
>> > > > > > parallel. If I move the Thread.sleep into a seda route, I do get
>> > > > > > this behavior (i.e. the 3 messages are read at the same time).
>> > > > > >
>> > > > > > Turning on the Camel logs, it seems that the next poll happens only
>> > > > > > after the Delete for the previous message is sent (which is 5 s
>> > > > > > later).
>> > > > > >
>> > > > > > I would understand the above behavior with concurrentConsumers=1,
>> > > > > > but I don't with concurrentConsumers=5. Could someone tell me what
>> > > > > > I've misunderstood?
>> > > > > >
>> > > > > > Thank you in advance!
>> > > > > > Arnaud
>> > > > > >
