Have you tried MirrorMaker 2's consumer offset translation feature?  I have
not used this myself, but it sounds like what you are looking for!

I tried to find some better docs to link for you, but that's the best I got
:)  It looks like there is just the Java API.

On Wed, May 4, 2022 at 3:29 PM Hemanga Borah <borah.hema...@gmail.com>

> Thank you for the suggestions, guys!
> @Austin Cawley-Edwards
> Your idea is spot on! This approach would surely work. We could take a
> savepoint of each of our apps, load it using state processor apis and
> create another savepoint accounting for the delta on the offsets, and start
> the app on the new cloud using this modified savepoint.
> However, the solution will not be generic, and we have to do this for each
> of our applications. This can be quite cumbersome as we have several
> applications (around 25).
> We are thinking of overriding the FlinkKafkaConsumerBase to account for
> the offset deltas during the start-up of any app. Do you think it is safe
> to do that? Is there a better way of doing this?
> @Schwalbe Matthias
> Thank you for your suggestion. We do use exactly-once semantics, but, our
> apps can tolerate a few duplicates in rare cases like this one where we are
> migrating clouds. However, your suggestion is really helpful and we will
> use it in case some of the apps cannot tolerate duplicate data.
> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>> Hello Hemanga,
>> MirrorMaker can cause havoc in many respects, for one, it does not have
>> strict exactly-once.semantics…
>> The way I would tackle this problem (and have done in similar
>> situaltions):
>>    - For the source topics that need to be have exactly-once-semantics
>>    and that are not intrinsically idempotent:
>>    - Add one extra operator after the source that deduplicates events by
>>    unique id for a rolling time range (on the source cloud provider)
>>    - Take a savepoint after the rolling time-range has passed (at least
>>    once completely)
>>    - Move your job to the target cloud provider
>>    - Reconfigure the resp. source with a new kafka consumer group.id,
>>    - Change the uid() of the resp. kafka source,
>>    - Configure start-by-timestamp for the resp. source with a timestamp
>>    that lies within the rolling time range (of above)
>>    - Configure the job to ignore  recovery for state that does not have
>>    a corresponding operator in the job (the previous kafka source uid()s)
>>    - Start the job on new cloud provider, wait for it to pick
>>    up/back-fill
>>    - Take a savepoint
>>    - Remove deduplication operator if that causes too much
>>    load/latency/whatever
>> This scheme sounds more complicated than it really is … and has saved my
>> sanity quite a number of times 😊
>> Good luck and ready to answer more details
>> Thias
>> *From:* Hemanga Borah <borah.hema...@gmail.com>
>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>> *To:* user@flink.apache.org
>> *Subject:* Migrating Flink apps across cloud with state
>> Hello,
>>  We are attempting to port our Flink applications from one cloud provider
>> to another.
>>  These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have states
>> stored in them. Some of these stored states are aggregations, for example,
>> at times we store hours (or days) worth of data to aggregate over time.
>> Some other applications have cached information for data enrichment, for
>> example, we store data in Flink state for days, so that we can join them
>> with newly arrived data. The amount of data on the input topics is a lot,
>> and it will be expensive to reprocess the data from the beginning of the
>> topic.
>>  As such, we want to retain the state of the application when we move to
>> a different cloud provider so that we can retain the aggregations and
>> cache, and do not have to start from the beginning of the input topics.
>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>    - Replicate the input topics of each Flink application from source
>>    cloud to destination cloud.
>>    - Take a savepoint of the Flink application on the source cloud
>>    provider.
>>    - Start the Flink application on the destination cloud provider using
>>    the savepoint from the source cloud provider.
>> However, this does not work as we want because there is a difference in
>> offset in the new topics in the new cloud provider (because of MirrorMaker
>> implementation). The offsets of the new topic do not match the ones stored
>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>> topic during startup.
>> Has anyone tried to move clouds while retaining the Flink state?
>> Thanks,
>> Hemanga
>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
>> dieser Informationen ist streng verboten.
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.

Reply via email to