Re: AWS Kinesis Consumer

2023-12-16 Thread Andrea Cosentino
Hello,

The PR looks good to me; I would just like more feedback from others
(in particular Otavio, who wrote the Resume API and the Kinesis
adapter).

On Sat, 16 Dec 2023 at 13:26, Alphonse Bendt <
alphonse.be...@gmail.com> wrote:

> Hi folks,
>
> A couple of days ago, I asked about resuming an AWS kinesis stream
> consumer:
>
> https://lists.apache.org/thread/sc6oks5jckcbj19t06chmfpm6dpwt60t
>
> I have experimented a bit more with the Camel codebase and now think that
> functionality is missing that would allow implementing a ResumeStrategy for
> a Kinesis consumer:
>
> I think the consumer should make the shardId available, plus I think the
> API of the KinesisResumeAdapter needs to be modified.
> I have created a branch with both proposed changes (see
> https://github.com/apache/camel/pull/12462/files).
> With these changes, I could sketch a custom ResumeStrategy (see branch).
>
> Is this something you would accept as a PR?
>
> Regards,
> Alphonse


AWS Kinesis Consumer

2023-12-16 Thread Alphonse Bendt
Hi folks,

A couple of days ago, I asked about resuming an AWS Kinesis stream consumer:

https://lists.apache.org/thread/sc6oks5jckcbj19t06chmfpm6dpwt60t

I have experimented a bit more with the Camel codebase and now think that
functionality is missing that would allow implementing a ResumeStrategy for a
Kinesis consumer:

I think the consumer should make the shardId available, plus I think the API of
the KinesisResumeAdapter needs to be modified.
I have created a branch with both proposed changes (see
https://github.com/apache/camel/pull/12462/files).
With these changes, I could sketch a custom ResumeStrategy (see branch).

Is this something you would accept as a PR?

Regards,
Alphonse





AWS Kinesis Consumer

2023-12-11 Thread Alphonse Bendt
Hi folks,

For a project, I was evaluating how to use the Camel AWS Kinesis component to 
consume from a Kinesis stream.

I am currently trying to understand how to properly set up stream offset
handling so that an application can be restarted and won't need to re-consume
the whole stream.

The problem is that with a basic setup such as
from("aws2-kinesis://my-stream").bean(myConsumer), the consumer will re-consume
the whole stream on restart.

So, I have looked into implementing a resume strategy
https://camel.apache.org/components/4.0.x/eips/resume-strategies.html

It’s unclear to me how it is supposed to work, and I have some questions:
* Are there implementations of ResumeStrategy that work with Kinesis available, 
or are users supposed to implement their own? I could only find 
TransientResumeStrategy, which is in memory only. 

I tried to implement a custom strategy and ran into some questions:

My understanding is that I need to set the message header CamelOffset to ensure 
my strategy is updated with the offsets of the kinesis events that were 
processed:

from("aws2-kinesis://my-stream")
    .resumable("myResumeStrategy")
    .process { ex ->
        ex.message.setHeader(
            Exchange.OFFSET,
            Resumables.of(
                ex.message.getHeader("CamelAwsKinesisShardId"),
                ex.message.getHeader("CamelAwsKinesisSequenceNumber"),
            ),
        )
    }

I understand that the ResumeStrategy would need to know which sequence number
was processed last per shardId. This mapping is then stored in a persistent
ResumeCache.
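
To make the idea of a persistent per-shard mapping concrete, here is a minimal
sketch in plain Java. It does not use Camel's ResumeStrategy or ResumeCache
interfaces; the class name ShardCheckpointStore, its methods, and the
properties-file format are assumptions made only for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper, not part of Camel: remembers the last processed
// sequence number per shard and persists the mapping to a properties file.
class ShardCheckpointStore {
    private final Path file;
    private final Properties checkpoints = new Properties();

    ShardCheckpointStore(Path file) throws IOException {
        this.file = file;
        // Load earlier checkpoints if the file already exists (i.e. after a restart).
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                checkpoints.load(in);
            }
        }
    }

    // Store the sequence number of the last processed record and flush to disk.
    synchronized void update(String shardId, String sequenceNumber) throws IOException {
        checkpoints.setProperty(shardId, sequenceNumber);
        try (OutputStream out = Files.newOutputStream(file)) {
            checkpoints.store(out, "last processed sequence number per shard");
        }
    }

    // Empty if nothing has been processed from this shard yet.
    synchronized Optional<String> lastSequenceNumber(String shardId) {
        return Optional.ofNullable(checkpoints.getProperty(shardId));
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("kinesis-checkpoints", ".properties");
        new ShardCheckpointStore(file).update("shardId-000000000000", "12345");
        // A fresh instance on the same file simulates an application restart.
        ShardCheckpointStore restarted = new ShardCheckpointStore(file);
        System.out.println(restarted.lastSequenceNumber("shardId-000000000000"));
    }
}
```

Writing the file on every update is the simplest choice; a real strategy would
probably batch writes or use an external store.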

However, this doesn't seem to work as CamelAwsKinesisShardId is not available 
in the consumer 
(https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L293)

Also, how is the interaction between KinesisResumeAdapter and Kinesis2Consumer 
supposed to work? 
(https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L280-L283)

In my understanding, the adapter would require the current shardId as a 
parameter to update the GetShardIteratorRequest with the sequence-number from 
the ResumeCache.
However, the only parameter seems to be the stream name.
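
As a rough illustration of why the shardId matters here, the sketch below
builds a per-shard iterator request from a checkpoint map. The types are
hypothetical stand-ins written for this example, not the AWS SDK or Camel
classes; only the iterator type names TRIM_HORIZON and AFTER_SEQUENCE_NUMBER
and the idea of a starting sequence number come from the Kinesis
GetShardIterator API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; NOT the AWS SDK or Camel classes.
class ShardResumeSketch {
    enum IteratorType { TRIM_HORIZON, AFTER_SEQUENCE_NUMBER }

    // Holds only the GetShardIterator fields that matter for resuming.
    static final class IteratorRequest {
        final String streamName;
        final String shardId;
        final IteratorType type;
        final String startingSequenceNumber; // null unless resuming

        IteratorRequest(String streamName, String shardId,
                        IteratorType type, String startingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.type = type;
            this.startingSequenceNumber = startingSequenceNumber;
        }
    }

    // Without the shardId there is no way to pick the right checkpoint:
    // each shard has its own last processed sequence number.
    static IteratorRequest requestFor(String streamName, String shardId,
                                      Map<String, String> checkpoints) {
        String last = checkpoints.get(shardId);
        if (last == null) {
            // No checkpoint yet: start from the oldest available record.
            return new IteratorRequest(streamName, shardId, IteratorType.TRIM_HORIZON, null);
        }
        // Resume with the record that follows the last processed one.
        return new IteratorRequest(streamName, shardId, IteratorType.AFTER_SEQUENCE_NUMBER, last);
    }

    public static void main(String[] args) {
        Map<String, String> checkpoints = new HashMap<>();
        checkpoints.put("shardId-000000000000", "12345");
        IteratorRequest resumed = requestFor("my-stream", "shardId-000000000000", checkpoints);
        IteratorRequest fresh = requestFor("my-stream", "shardId-000000000001", checkpoints);
        System.out.println(resumed.type + " " + resumed.startingSequenceNumber);
        System.out.println(fresh.type + " " + fresh.startingSequenceNumber);
    }
}
```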

Thanks in advance for any help/insights!





AWS Kinesis Consumer performance improvement

2017-03-17 Thread elaufis
I have narrowed down a performance bottleneck in the Processor I implemented
in a SEDA route using the AWS Kinesis component.

I tested against an "empty" Processor (i.e. that doesn't do anything) and
got the following metrics:

*For the "empty" Processor*: 11:10| INFO | MarkerIgnoringBase.java 107 |
type=METER, name=kinesis-records-consumed, *count=140,
mean_rate=2.333140260416133*, m1=1.6458279659887582,
m5=0.43339880218285814, m15=0.15173246327279963, rate_unit=events/second

*For the "actual" Processor*: 11:15| INFO | MarkerIgnoringBase.java 107 |
type=METER, name=kinesis-records-consumed, *count=20,
mean_rate=0.031357470416*, m1=0.17909216046832732,
m5=0.058841979331650814, m15=0.02131567762219331, rate_unit=events/second

So, the bottleneck definitely seems to be in the "actual" Processor. 

I have increased the number of shards on the stream as well as tweaked a host
of other parameters in order to improve performance, with no real effect.

I now want to try using a custom ScheduledExecutorService to use multiple
threads for the consumer. 

Is there an example of how to do this?
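
For illustration only, here is a plain-Java sketch of the threading idea,
without any Camel classes. One caveat from java.util.concurrent: a task
scheduled with scheduleWithFixedDelay never overlaps with itself, so a larger
scheduler pool may not by itself parallelise a single poller. The sketch
therefore keeps one polling thread and hands each record to a separate worker
pool. The class PollAndFanOutSketch, the simulated batches, and the 5 ms
"slow" processing step are all assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one scheduled poller, several processing threads.
class PollAndFanOutSketch {

    // Stands in for the slow "actual" Processor.
    static void slowProcess() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Polls `batches` simulated batches and returns how many records were processed.
    static int run(int batches, int batchSize, int workers) throws InterruptedException {
        ScheduledExecutorService poller = Executors.newScheduledThreadPool(1);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger polled = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(batches * batchSize);

        poller.scheduleWithFixedDelay(() -> {
            if (polled.getAndIncrement() >= batches) {
                return; // nothing left to poll in this simulation
            }
            for (int i = 0; i < batchSize; i++) {
                // Hand each record to the worker pool so polling is not blocked.
                pool.submit(() -> {
                    slowProcess();
                    processed.incrementAndGet();
                    done.countDown();
                });
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        try {
            done.await(30, TimeUnit.SECONDS);
        } finally {
            poller.shutdownNow();
            pool.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 20, 8));
    }
}
```

In a Camel route, a comparable hand-off might be achieved with the SEDA
endpoint's concurrentConsumers option instead of a hand-written pool.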

Also, concerning the number of records in the metrics above, how does the
Camel component determine how many records to read off the stream on the
first read? I'd expect the counts to be similar on the first read and perhaps
to differ on subsequent reads.






--
View this message in context: 
http://camel.465427.n5.nabble.com/AWS-Kinesis-Consumer-performance-improvement-tp5795628.html
Sent from the Camel - Users mailing list archive at Nabble.com.