Re: Flink Savepoint fault tolerance

2021-04-21 Thread Arvid Heise
Just to add: you can also change parallelism from checkpoints (it's usually
much faster than using savepoints). For that, you want to use externalized
checkpoints that are retained after job completion.

But savepoints are the way to go for any topology changes, version updates,
etc.
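
In code, enabling retained checkpoints looks roughly like this (an untested
sketch; the class name and interval are just examples, and state.checkpoints.dir
still has to point to a durable location in flink-conf.yaml):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60s (example value).
        env.enableCheckpointing(60_000);

        // Keep the latest completed checkpoint when the job is cancelled,
        // so it can later be used like a savepoint for re-submission.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print();
        env.execute("retained-checkpoints-sketch");
    }
}

Re-submitting against the retained checkpoint path with a different -p
(e.g. flink run -s <retained-checkpoint-path> -p 8 job.jar) then works the
same way as restoring from a savepoint, as far as I know.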

Re: Flink Savepoint fault tolerance

2021-04-21 Thread dhanesh arole
Hi Arvid,

Thanks for taking the time to answer this. Yeah, we are also using savepoints
as the only restore mechanism if job parallelism needs to be changed or some
job graph properties need to be updated. Otherwise, during rolling deployments
of task manager or job manager pods, we rely solely on previously completed
checkpoints.
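
For reference, one way to find the latest completed (retained) checkpoint to
restore from is the job's checkpoint statistics endpoint; a rough, untested
sketch, where the host, port and job id are placeholders and the JSON field
names are taken from the REST docs as I remember them:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.ObjectMapper;

public class LatestCheckpointLookup {
    public static void main(String[] args) throws Exception {
        String base = "http://flink-jobmanager-rest:8081";  // placeholder REST address
        String jobId = "<job-id>";                          // placeholder job id

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(base + "/jobs/" + jobId + "/checkpoints")).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

        // "latest" -> "completed" -> "external_path" is the restorable checkpoint path.
        String externalPath = new ObjectMapper().readTree(body)
                .path("latest").path("completed").path("external_path").asText();
        System.out.println("Latest completed checkpoint: " + externalPath);
    }
}

(Jackson is used here only for JSON parsing; any JSON library works.)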

Re: Flink Savepoint fault tolerance

2021-04-21 Thread Arvid Heise
Hi Dhanesh,

We recommend using savepoints only for migrations, investigations, A/B
testing, and time travel, and relying completely on checkpoints for fault
tolerance. Are you using them differently?

> Currently, we trigger savepoints using the REST API and query the status of
> the savepoint via the returned handle. If there is a network issue and we
> don't receive the response, how can we find out whether the savepoint in the
> previous request was triggered or not? Is there a way to add an
> "idempotency-key" to each API request so that we can safely retry triggering
> a savepoint? By doing this, we want to avoid triggering multiple consecutive
> savepoints during job upgrades.
>
I think you'd have to use your logging system and set up a metric/trigger on
the respective log line. I don't think there is any REST API for that.
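
For readers following along, the trigger/poll round trip being discussed looks
roughly like the sketch below (untested; host, job id and target directory are
placeholders, and the endpoint shapes are as I remember them from the REST
docs). The crux is that the request-id only exists in the response, so a lost
response leaves nothing to poll and the client can only trigger again:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SavepointTriggerSketch {
    public static void main(String[] args) throws Exception {
        String base = "http://flink-jobmanager-rest:8081";  // placeholder (e.g. a stable k8s Service name)
        String jobId = "<job-id>";                          // placeholder job id
        ObjectMapper mapper = new ObjectMapper();
        HttpClient client = HttpClient.newHttpClient();

        // 1) Trigger the savepoint; the response carries the trigger handle ("request-id").
        HttpRequest trigger = HttpRequest.newBuilder(
                URI.create(base + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": false}"))
                .build();
        String triggerId = mapper.readTree(
                client.send(trigger, HttpResponse.BodyHandlers.ofString()).body())
                .path("request-id").asText();
        // If this response is lost (network error, JM pod recycled), there is no
        // handle to poll and no server-side idempotency key to deduplicate a retry.

        // 2) Poll the async operation until it completes.
        while (true) {
            HttpRequest poll = HttpRequest.newBuilder(
                    URI.create(base + "/jobs/" + jobId + "/savepoints/" + triggerId)).GET().build();
            JsonNode status = mapper.readTree(
                    client.send(poll, HttpResponse.BodyHandlers.ofString()).body());
            if ("COMPLETED".equals(status.path("status").path("id").asText())) {
                System.out.println("Savepoint location: "
                        + status.path("operation").path("location").asText());
                break;
            }
            Thread.sleep(5_000);
        }
    }
}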

> Our workflow for capturing a savepoint looks like this: call the POST
> /savepoint endpoint, use the returned trigger handle to periodically poll the
> status of the savepoint, and once the savepoint is completed, restore the job
> from it. We are running our Flink clusters on Kubernetes. Since pod IPs can
> change quite often in k8s (pods get restarted/migrated), it's possible that
> the JM pod that was used to trigger the savepoint is recycled before the
> savepoint completes. In that case, we can't query the status of the triggered
> savepoint via the previously returned handle, as neither the newly created JM
> pod nor any of the standby JMs have information about this savepoint. I
> couldn't find any config that makes Flink persist the state of ongoing
> savepoints to an external store, which would allow users to query the
> savepoint status via any available JM instance in an HA setup.
>
Not an expert on K8s, but couldn't you expose the JM as a K8s service? That
should follow the migration automatically.

> If one of the TMs crashes during an ongoing checkpoint, then I believe that
> checkpoint is marked as failed, and at the next checkpoint interval Flink
> triggers a new checkpoint based on the previously completed checkpoint
> counter. The next checkpoint attempt might get acknowledged by all operators
> and marked as completed. Is that correct? In the case of savepoints this is
> not possible, so how does Flink resume the savepoint capturing process in
> case of job restarts or TM failures?
>
Savepoints have to be triggered anew; they are meant as a purely manual
feature. Again, you could automate it if you look at the logs.
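
On the checkpoint side, the failure handling described above is governed by
the CheckpointConfig. A small, untested sketch with illustrative values
(assuming Flink 1.10+); a failed checkpoint does not fail the job as long as
the tolerated failure count is not exceeded, and the next periodic checkpoint
simply builds on the last completed one:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);                 // checkpoint every 60s (example)

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Tolerate up to 3 consecutive checkpoint failures (e.g. a crashed TM)
        // before the job itself is failed and restarted.
        cfg.setTolerableCheckpointFailureNumber(3);
        // Leave some breathing room between the end of one checkpoint and the
        // start of the next attempt.
        cfg.setMinPauseBetweenCheckpoints(10_000);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-failure-tuning-sketch");
    }
}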

Best,

Arvid

Flink Savepoint fault tolerance

2021-04-16 Thread dhanesh arole
Hello all,

I had 2 questions regarding savepoint fault tolerance.

Job manager restart:

   - Currently, we trigger savepoints using the REST API and query the status
   of the savepoint via the returned handle. If there is a network issue and we
   don't receive the response, how can we find out whether the savepoint in the
   previous request was triggered or not? Is there a way to add an
   "idempotency-key" to each API request so that we can safely retry triggering
   a savepoint? By doing this, we want to avoid triggering multiple consecutive
   savepoints during job upgrades.
   - Our workflow for capturing a savepoint looks like this: call the POST
   /savepoint endpoint, use the returned trigger handle to periodically poll
   the status of the savepoint, and once the savepoint is completed, restore
   the job from it. We are running our Flink clusters on Kubernetes. Since pod
   IPs can change quite often in k8s (pods get restarted/migrated), it's
   possible that the JM pod that was used to trigger the savepoint is recycled
   before the savepoint completes. In that case, we can't query the status of
   the triggered savepoint via the previously returned handle, as neither the
   newly created JM pod nor any of the standby JMs have information about this
   savepoint. I couldn't find any config that makes Flink persist the state of
   ongoing savepoints to an external store, which would allow users to query
   the savepoint status via any available JM instance in an HA setup.


Task manager restart:

   - If one of the TMs crashes during an ongoing checkpoint, then I believe
   that checkpoint is marked as failed, and at the next checkpoint interval
   Flink triggers a new checkpoint based on the previously completed checkpoint
   counter. The next checkpoint attempt might get acknowledged by all operators
   and marked as completed. Is that correct? In the case of savepoints this is
   not possible, so how does Flink resume the savepoint capturing process in
   case of job restarts or TM failures?
   - I am sure this is already handled, but I just wanted to confirm and get
   help finding the relevant code references so I can dig deeper and understand
   it in depth, from an educational point of view.


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )