Hi folks,

For a project, I was evaluating how to use the Camel AWS Kinesis component to consume from a Kinesis stream.
I am currently trying to understand how to properly set up stream offset handling so that an application can be restarted without re-consuming the whole stream. The problem is that with a basic setup

    from("aws2-kinesis://my-stream").bean(myConsumer)

the consumer re-consumes the whole stream on restart.

So I have looked into implementing a resume strategy:
https://camel.apache.org/components/4.0.x/eips/resume-strategies.html

It's unclear to me how it is supposed to work, and I have some questions:

* Are there implementations of ResumeStrategy that work with Kinesis available, or are users supposed to implement their own? I could only find TransientResumeStrategy, which is in-memory only.

I tried to implement a custom strategy and ran into some questions. My understanding is that I need to set the message header CamelOffset to ensure my strategy is updated with the offsets of the Kinesis events that were processed:

    from("aws2-kinesis://my-stream")
        .resumable("myResumeStrategy")
        .process { ex ->
            ex.message.setHeader(
                Exchange.OFFSET,
                Resumables.of(
                    ex.message.getHeader("CamelAwsKinesisShardId"),
                    ex.message.getHeader("CamelAwsKinesisSequenceNumber"),
                ),
            )
        }

I understand that the resume strategy would need to know which sequence number was processed last for each shard ID. This mapping is then stored in a persistent ResumeCache. However, this doesn't seem to work, because CamelAwsKinesisShardId is not available in the consumer:
https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L293

Also, how is the interaction between KinesisResumeAdapter and Kinesis2Consumer supposed to work?
https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L280-L283

In my understanding, the adapter would require the current shard ID as a parameter in order to update the GetShardIteratorRequest with the sequence number from the ResumeCache. However, the only parameter seems to be the stream name.

Thanks in advance for any help/insights!