Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-04-05 Thread mejri houssem
Hello,

In this paragraph of the documentation [1], it is mentioned that "Kafka
source does not rely on committed offsets for fault tolerance. Committing
offset is only for exposing the progress of consumer and consuming group
for monitoring".

Can someone please explain why the Kafka source does not rely on the
committed offset for recovery, even though the offset stored in a
checkpoint/savepoint is the same as the one committed to Kafka?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing

Best Regards,


Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-27 Thread Andrew Otto
I learned about this a couple of years ago when I was investigating the
feasibility of a "Kafka stretch" cluster. From that email thread:

> I got excited about this possibility, only to learn that Flink's
KafkaSource does not use Kafka for consumer assignment. I think I
understand why it does this: the Source API can do a lot more than Kafka,
so having some kind of state management (offsets) and task assignment
(Kafka consumer balance protocol) outside of the usual Flink Source would
be pretty weird. Implementing offset and task assignment inside of the
KafkaSource allows it to work like any other Source implementation.

Basically, other Sources don't have offset/state management like Kafka
does, so keeping that state management inside the Flink framework makes it
easier to swap Sources around, enabling things like Hybrid Source.
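
To make this concrete, here is a minimal sketch of a KafkaSource with
checkpointing enabled (the bootstrap servers, topic, and group id are
made-up placeholders). The source snapshots its offsets into Flink state on
every checkpoint; committing them back to Kafka is, as the docs say, only
for monitoring:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaOffsetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are snapshotted into Flink state here

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder
                .setTopics("my-topic")              // placeholder
                .setGroupId("my-group")             // placeholder
                // consulted only on a fresh start, when no checkpoint/savepoint state exists
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // commit to Kafka on checkpoint completion -- for monitoring only, not recovery
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-offset-sketch");
    }
}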




Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-26 Thread Gabor Somogyi
> Are they two different things?
There are no separate consumer and broker offsets; there are just offsets,
which belong to a topic + partition pair.

> And which offset is saved in the checkpoint/savepoint?
The one Flink thinks has already been processed.

Regarding the PROD deploy, now you know the risks, so feel free to pick one.

BR,
G



Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-26 Thread mejri houssem
Hello Gabor,

Thanks for your response.

I just want to clarify one thing: is there any difference between the Kafka
source offset and the Kafka broker offset? Are they two different things?
And which offset is saved in the checkpoint/savepoint?

For our use case, we intend to take a savepoint only once before updating
the job in production. This means we stop the job with a savepoint, make
some updates, and then restart from the savepoint we have taken.

Best Regards,


Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-25 Thread Gabor Somogyi
In short, it's encouraged to use savepoints because of the following
situation:
* You start processing data from offset 0.
* Two savepoints are created, one with offset 10, another with offset 20.
* In this timeframe Kafka's committed offset is 20, since that's the last
one processed.
* At offset 30 you realize that the data processed between offsets 10 and
30 is faulty because of broken job logic.

Restoring offsets from a savepoint is relatively easy: just restart the job
from the offset-10 savepoint.
When Kafka is the source of truth, you need to do some mumbo-jumbo to cut
back the Kafka offsets, and you most probably have no idea where to cut
back to.
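
As a sketch of that workflow with the standard Flink CLI (the job id,
savepoint directory, and jar name below are placeholders):

# stop the job and take a savepoint in one step
flink stop --savepointPath s3://bucket/savepoints <jobId>
# fix the broken job logic, then resume from the known-good savepoint
flink run -s s3://bucket/savepoints/savepoint-xyz fixed-job.jar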

RabbitMQ source (consumer) reads from a queue and acknowledges messages on
checkpoints.
When checkpointing is enabled, it guarantees exactly-once processing
semantics. Please see [1] for further details.
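
For reference, wiring up the RabbitMQ source looks roughly like this (a
sketch based on the RMQSource javadoc in [1]; the host, credentials, and
queue name are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RmqSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // messages are acked when a checkpoint completes

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("localhost") // placeholder
                .setPort(5672)
                .setVirtualHost("/")
                .setUserName("guest") // placeholder
                .setPassword("guest") // placeholder
                .build();

        DataStream<String> stream = env.addSource(new RMQSource<>(
                connectionConfig,
                "my-queue",               // placeholder queue name
                true,                     // use correlation ids; needed for exactly-once
                new SimpleStringSchema()));

        stream.print();
        env.execute("rmq-source-sketch");
    }
}

Per the javadoc in [1], exactly-once needs checkpointing enabled,
correlation ids set by the message publisher, and a non-parallel
(parallelism 1) source; without correlation ids you get at-least-once
instead.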

Hope this helps.

[1]
https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76

BR,
G



Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-25 Thread mejri houssem
Hello,

Is there any further clarification or explanation regarding the subject,
please?

Best regards.



Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-19 Thread mejri houssem
Hello,

So if I understand you correctly, I cannot rely on the Kafka broker offset
to achieve the at-least-once guarantee. Without checkpoints/savepoints
enabled, that would not be possible.

Best regards



Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-19 Thread Ahmed Hamdy
Hi Mejri,
Not exactly. You can still rely on a savepoint to restart/redeploy the job
from the latest offset recorded in Flink; my reply was about your question
of whether you can replace that and just depend on the committed offsets in
the Kafka broker. For the at-least-once semantic, savepoints and
checkpoints book-keep the offset for the Flink job after initialization;
the config I mentioned only controls how the consumers are initialized. If
you start the job without a savepoint and it falls back to the config
(which may be using the broker-committed offset), that might achieve the
semantic, but it doesn't guarantee it.
For example, assume you restore from a savepoint and the job completes a
couple of checkpoints, so the committed offset in Kafka is updated; then
for some reason you discover a bug. If you only depend on the Kafka
broker's committed offset, you would probably break the semantic, while
with savepoints you can redeploy from the last correct version's savepoint
and reprocess the data that was processed by the buggy job.

Best Regards
Ahmed Hamdy




Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-18 Thread mejri houssem
Hello Ahmed,

Thanks for the response.

Does that mean checkpoints and savepoints have nothing to do with the
at-least-once guarantee, since it depends solely on the starting offset
configuration?

Best Regards



Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-18 Thread Ahmed Hamdy
Hi Mejri

> I’m wondering if this is strictly necessary, since the Kafka broker
itself keeps track of offsets (if I am not mistaken). In other words, if we
redeploy the job, will it automatically resume from the last Kafka offset,
or should we still rely on Flink’s checkpoint/savepoint mechanism to ensure
correct offset recovery?

This depends on the starting offset you set in the source config [1]. You
can configure it to start from the earliest, the last committed, the
latest, or a specific offset.

I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack read
messages, unlike Kafka.


1-
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
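
For illustration, those options map to OffsetsInitializer factory methods
roughly as follows (a sketch based on [1]; the topic, partition, and offset
values are made up). Note they only apply when the job starts without
checkpoint/savepoint state:

import java.util.Map;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class StartingOffsetsSketch {
    public static void main(String[] args) {
        // start from the earliest available offset
        OffsetsInitializer earliest = OffsetsInitializer.earliest();
        // start from the latest offset
        OffsetsInitializer latest = OffsetsInitializer.latest();
        // start from committed group offsets, falling back to earliest if none exist
        OffsetsInitializer committed =
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);
        // start from a specific offset per partition (made-up values)
        OffsetsInitializer specific =
                OffsetsInitializer.offsets(Map.of(new TopicPartition("my-topic", 0), 42L));
    }
}

Any of these is passed to KafkaSource.builder().setStartingOffsets(...).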
Best Regards
Ahmed Hamdy


On Tue, 18 Mar 2025 at 22:20, mejri houssem 
wrote:

>
> Hello everyone,
>
> We have a stateless Flink job that uses a Kafka source with at-least-once
> guarantees. We’ve enabled checkpoints so that, in the event of a restart,
> Flink can restore from the last committed offset stored in a successful
> checkpoint. Now we’re considering enabling savepoints for our production
> deployment.
>
> I’m wondering if this is strictly necessary, since the Kafka broker itself
> keeps track of offsets (if I am not mistaken). In other words, if we redeploy
> the job, will it automatically resume from the last Kafka offset, or should
> we still rely on Flink’s checkpoint/savepoint mechanism to ensure correct
> offset recovery?
>
> Additionally, we have another job that uses a RabbitMQ source with
> checkpoints enabled to manage manual acknowledgments. Does the same logic
> apply in that case as well?
>
> Thanks in advance for any guidance!
>
>
> Best Regards.
>