Hi folks,

For a project, I was evaluating how to use the Camel AWS Kinesis component to 
consume from a Kinesis stream.

I am currently trying to understand how to properly set up stream offset 
handling so that an application can be restarted without having to re-consume 
the whole stream.

The problem is that with a basic setup such as 
from("aws2-kinesis://my-stream").bean(myConsumer), the consumer will re-consume 
the whole stream on restart.

So, I have looked into implementing a resume strategy:
https://camel.apache.org/components/4.0.x/eips/resume-strategies.html

It’s unclear to me how it is supposed to work, and I have some questions:
* Are there implementations of ResumeStrategy that work with Kinesis available, 
or are users supposed to implement their own? I could only find 
TransientResumeStrategy, which is in memory only. 

I tried to implement a custom strategy and ran into some questions:

My understanding is that I need to set the message header CamelOffset to ensure 
my strategy is updated with the offsets of the Kinesis events that were 
processed:

from("aws2-kinesis://my-stream")
    .resumable("myResumeStrategy")
    .process { ex ->
        ex.message.setHeader(
            Exchange.OFFSET,
            Resumables.of(
                ex.message.getHeader("CamelAwsKinesisShardId"),
                ex.message.getHeader("CamelAwsKinesisSequenceNumber"),
            ),
        )
    }

I understand that the resume strategy would need to know which sequence number 
was processed last for each shard ID. This mapping is then stored in a 
persistent ResumeCache.
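
To make the mapping concrete, here is a minimal sketch of what I have in mind, 
written in plain Java with no Camel types. ShardOffsetStore and its methods are 
names I made up for illustration; they are not part of Camel, and a real 
implementation would presumably have to sit behind Camel's ResumeCache and 
ResumeStrategy interfaces.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper (not a Camel class): keeps the last processed
// sequence number per shard ID and persists it to a properties file.
class ShardOffsetStore {
    private final Path file;
    private final Properties offsets = new Properties();

    ShardOffsetStore(Path file) throws IOException {
        this.file = file;
        // Reload previously stored offsets so a restart can pick them up.
        if (Files.exists(file)) {
            try (Reader in = Files.newBufferedReader(file)) {
                offsets.load(in);
            }
        }
    }

    // Record the latest processed sequence number for a shard and
    // rewrite the whole file immediately.
    synchronized void update(String shardId, String sequenceNumber) throws IOException {
        offsets.setProperty(shardId, sequenceNumber);
        try (Writer out = Files.newBufferedWriter(file)) {
            offsets.store(out, null);
        }
    }

    // Returns null if nothing has been processed for this shard yet.
    synchronized String lastSequenceNumber(String shardId) {
        return offsets.getProperty(shardId);
    }
}
```

On restart, the consumer would look up lastSequenceNumber(shardId) for each 
shard and, if it is non-null, resume after that sequence number instead of 
reading the shard from the beginning.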

However, this doesn't seem to work as CamelAwsKinesisShardId is not available 
in the consumer 
(https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L293)

Also, how is the interaction between KinesisResumeAdapter and Kinesis2Consumer 
supposed to work? 
(https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L280-L283)

In my understanding, the adapter would need the current shard ID as a 
parameter in order to update the GetShardIteratorRequest with the sequence 
number from the ResumeCache.
However, the only parameter seems to be the stream name.

Thanks in advance for any help/insights!


