Re: AWS Kinesis Consumer
Hello,

The PR looks good to me. I would just like more feedback from others, in particular from Otavio, who wrote the Resume API and the Kinesis adapter.

On Sat, 16 Dec 2023 at 13:26, Alphonse Bendt <alphonse.be...@gmail.com> wrote:
> Hi folks,
> [...]
AWS Kinesis Consumer
Hi folks,

A couple of days ago, I asked about resuming an AWS Kinesis stream consumer:
https://lists.apache.org/thread/sc6oks5jckcbj19t06chmfpm6dpwt60t

I have experimented a bit more with the Camel codebase, and I now think some functionality is missing that would allow implementing a ResumeStrategy for a Kinesis consumer:

* the consumer should make the shardId available, and
* the API of the KinesisResumeAdapter needs to be modified.

I have created a branch with both proposed changes (see https://github.com/apache/camel/pull/12462/files). With these changes, I could sketch a custom ResumeStrategy (see the branch).

Is this something you would accept as a PR?

Regards,
Alphonse
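To make the proposed API change concrete, here is a minimal Java sketch of a shard-aware resume adapter, assuming the adapter is asked for a start position per (stream, shard) pair. ShardAwareResumeAdapter, MapBackedAdapter, StartPosition and IteratorStart are hypothetical names for illustration only; they are not the current KinesisResumeAdapter API, nor the code in PR 12462. IteratorStart merely mirrors two of the Kinesis shard iterator types (TRIM_HORIZON and AFTER_SEQUENCE_NUMBER).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the two Kinesis shard iterator types used here.
enum IteratorStart { TRIM_HORIZON, AFTER_SEQUENCE_NUMBER }

// Hypothetical: where a shard iterator should start, as decided by the adapter.
record StartPosition(IteratorStart type, String sequenceNumber) {}

// Hypothetical shard-aware adapter API (NOT the existing KinesisResumeAdapter):
// the consumer passes the shardId it is about to read, not just the stream name.
interface ShardAwareResumeAdapter {
    StartPosition startPosition(String streamName, String shardId);
}

// Looks up the last processed sequence number per shardId in a plain map,
// standing in for whatever persistent cache a real ResumeStrategy would use.
class MapBackedAdapter implements ShardAwareResumeAdapter {
    private final Map<String, String> lastSequenceByShard;

    MapBackedAdapter(Map<String, String> lastSequenceByShard) {
        this.lastSequenceByShard = lastSequenceByShard;
    }

    @Override
    public StartPosition startPosition(String streamName, String shardId) {
        // Resume right after the last processed record of this shard if known;
        // otherwise fall back to the oldest record still retained in the shard.
        return Optional.ofNullable(lastSequenceByShard.get(shardId))
                .map(seq -> new StartPosition(IteratorStart.AFTER_SEQUENCE_NUMBER, seq))
                .orElse(new StartPosition(IteratorStart.TRIM_HORIZON, null));
    }
}
```

With this shape, the consumer could translate the returned position into the iterator type and starting sequence number of the GetShardIteratorRequest it builds for each shard, which is exactly the information the current stream-name-only parameter cannot convey.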
AWS Kinesis Consumer
Hi folks,

For a project, I was evaluating how to use the Camel AWS Kinesis component to consume from a Kinesis stream. I am currently trying to understand how to set up offset handling properly, so that an application can be restarted without re-consuming the whole stream.

The problem is that with a basic setup

    from("aws2-kinesis://my-stream").bean(myConsumer)

the consumer re-consumes the whole stream on restart. So I have looked into implementing a resume strategy:
https://camel.apache.org/components/4.0.x/eips/resume-strategies.html

It's unclear to me how this is supposed to work, and I have some questions:

* Are there implementations of ResumeStrategy that work with Kinesis, or are users supposed to implement their own? I could only find TransientResumeStrategy, which is in-memory only.

I tried to implement a custom strategy and ran into further questions. My understanding is that I need to set the message header CamelOffset so that my strategy is updated with the offsets of the Kinesis events that were processed:

    import org.apache.camel.Exchange
    import org.apache.camel.support.resume.Resumables

    from("aws2-kinesis://my-stream")
        .resumable("myResumeStrategy")
        .process { ex ->
            // Record the (shardId, sequenceNumber) pair of the processed event as its offset
            ex.message.setHeader(
                Exchange.OFFSET,
                Resumables.of(
                    ex.message.getHeader("CamelAwsKinesisShardId"),
                    ex.message.getHeader("CamelAwsKinesisSequenceNumber"),
                ),
            )
        }

I understand that the resume strategy needs to know which sequence number was processed last per shardId, and that this mapping is then stored in a persistent ResumeCache. However, this doesn't seem to work, because CamelAwsKinesisShardId is not set by the consumer (https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L293).

Also, how is the interaction between KinesisResumeAdapter and Kinesis2Consumer supposed to work? (https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L280-L283) In my understanding, the adapter would need the current shardId as a parameter in order to update the GetShardIteratorRequest with the sequence number from the ResumeCache. However, the only parameter seems to be the stream name.

Thanks in advance for any help/insights!
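The per-shard mapping described above can be sketched with the JDK alone. ShardCheckpointStore below is a hypothetical class, not a Camel ResumeCache implementation: it persists the last processed sequence number per shardId to a properties file, assuming that sequence numbers increase monotonically within a shard, so an older number never overwrites a newer one.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;

// Hypothetical file-backed store: shardId -> last processed sequence number.
class ShardCheckpointStore {
    private final Path file;
    private final Properties checkpoints = new Properties();

    ShardCheckpointStore(Path file) throws IOException {
        this.file = file;
        // Reload previously saved checkpoints so a restart can resume per shard.
        if (Files.exists(file)) {
            try (Reader in = Files.newBufferedReader(file)) {
                checkpoints.load(in);
            }
        }
    }

    // Records that sequenceNumber was processed on shardId. Sequence numbers are
    // large decimal strings, so they are compared as BigInteger, and the
    // checkpoint only moves forward (assumed ordering within a shard).
    synchronized void update(String shardId, String sequenceNumber) throws IOException {
        String current = checkpoints.getProperty(shardId);
        if (current == null
                || new BigInteger(sequenceNumber).compareTo(new BigInteger(current)) > 0) {
            checkpoints.setProperty(shardId, sequenceNumber);
            try (Writer out = Files.newBufferedWriter(file)) {
                checkpoints.store(out, "last processed sequence number per shard");
            }
        }
    }

    // Empty if this shard has never been checkpointed.
    synchronized Optional<String> lastSequenceNumber(String shardId) {
        return Optional.ofNullable(checkpoints.getProperty(shardId));
    }
}
```

A production store would also want atomic writes (for example, writing a temporary file and moving it into place) and some batching, since this sketch rewrites the whole file on every update.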
AWS Kinesis Consumer performance improvement
I have narrowed down a performance bottleneck to the Processor I implemented in a SEDA route that uses the AWS Kinesis component. I tested against an "empty" Processor (i.e. one that doesn't do anything) and got the following metrics:

For the "empty" Processor:

    11:10| INFO | MarkerIgnoringBase.java 107 | type=METER, name=kinesis-records-consumed, count=140, mean_rate=2.333140260416133, m1=1.6458279659887582, m5=0.43339880218285814, m15=0.15173246327279963, rate_unit=events/second

For the "actual" Processor:

    11:15| INFO | MarkerIgnoringBase.java 107 | type=METER, name=kinesis-records-consumed, count=20, mean_rate=0.031357470416, m1=0.17909216046832732, m5=0.058841979331650814, m15=0.02131567762219331, rate_unit=events/second

So the bottleneck definitely seems to be in the "actual" Processor. I have increased the number of shards on the stream and tweaked a host of other parameters to improve performance, with no real effect. I now want to try using a custom ScheduledExecutorService so that the consumer uses multiple threads. Is there an example of how to do this?

Also, concerning the number of records in the metrics above: how does the Camel component determine how many records to read off the stream on the first read? I'd expect the first read to return a similar number of records in both cases, with differences appearing only on subsequent reads.

--
View this message in context: http://camel.465427.n5.nabble.com/AWS-Kinesis-Consumer-performance-improvement-tp5795628.html
Sent from the Camel - Users mailing list archive at Nabble.com.
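If the bottleneck is the per-record processing, one common way to add threads is to decouple fetching from processing rather than running the poll itself on more threads: assuming the consumer's poll runs as a single fixed-delay task, extra threads in its ScheduledExecutorService would not make polls overlap, because the JDK never runs a fixed-delay task concurrently with itself. The stdlib-only Java sketch below (ParallelBatchProcessor is a hypothetical name, not a Camel class) hands each record of a polled batch to a fixed worker pool and returns the results in batch order. Within Camel, the analogous knobs would be the SEDA endpoint's concurrentConsumers option or the threads() DSL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Sketch: one thread fetches a batch (like a single scheduled poll), and a
// fixed pool of workers runs the slow per-record processing in parallel.
class ParallelBatchProcessor<R, T> implements AutoCloseable {
    private final ExecutorService workers;
    private final Function<R, T> processor;

    ParallelBatchProcessor(int workerCount, Function<R, T> processor) {
        this.workers = Executors.newFixedThreadPool(workerCount);
        this.processor = processor;
    }

    // Submits every record of a polled batch, then waits for all of them.
    // Results are collected in batch order even though the work runs concurrently.
    List<T> processBatch(List<R> batch) throws Exception {
        List<Future<T>> futures = new ArrayList<>();
        for (R item : batch) {
            futures.add(workers.submit(() -> processor.apply(item)));
        }
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            results.add(f.get()); // rethrows (wrapped) any processing failure
        }
        return results;
    }

    @Override
    public void close() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Note that processing records in parallel gives up strict per-shard ordering, and any offset checkpointing then has to wait until all earlier records of a shard have finished.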