Hi Fabian,

Thanks for the prompt response, and apologies for my delayed reply. 

You summed up the bottom line pretty well. If I were to wrap it up myself, I'd 
say we want the best possible recovery on "known" restarts, whether manually 
triggered (cancel + start) or framework-initiated (e.g. on operator failures), 
under these constraints:
 - some data loss is ok
 - avoid periodic checkpoints, since the state is extremely transient (lifetimes 
of less than 5 seconds, if not milliseconds) and almost every event makes it to 
state.

I do understand that checkpointing performance has improved drastically and 
that, with the async and RocksDB options, it should technically not add latency 
to the application. However, even with those improvements and local 
checkpointing (which we are already doing), it feels like a lot of "unused" 
IOPS and resource utilization, especially once we start spinning up more apps 
handling similar data sources with similar requirements. At first blush, those 
resources seem better spent on apps in the cluster with stricter SLAs for data 
loss and recovery.
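
In case it helps to make the "just enable checkpointing" option concrete, here 
is a minimal sketch of the kind of configuration I would expect for these apps 
(Java DataStream API; the interval, at-least-once mode, retain-on-cancellation 
setting and checkpoint URI are placeholders/assumptions on my part, not 
something we run today):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints every 30s (placeholder interval); at-least-once mode
        // is usually cheaper when some data loss/duplication is acceptable.
        env.enableCheckpointing(30_000L, CheckpointingMode.AT_LEAST_ONCE);

        // Keep the last externalized checkpoint around when the job is cancelled,
        // so a manual cancel + start can resume from it.
        env.getCheckpointConfig()
           .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // RocksDB keeps working state on local disk and snapshots it asynchronously;
        // the boolean flag enables incremental checkpoints (the URI is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... build the actual pipeline here and call env.execute(...) as usual ...
    }
}

My understanding is that the 1.5 local recovery Fabian mentions would then be 
switched on separately (via the state.backend.local-recovery option in 
flink-conf.yaml) but still needs those remote checkpoints to exist, which is 
exactly the IOPS cost I am trying to avoid.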

Basically, I suppose what I am after is a checkpointing feature that is 
triggered by certain actions or events rather than periodically. Let me know if 
I am off-base here and should just enable checkpointing in all of these apps 
and move on :) 

I tried savepoints again, and it looks like the issue is caused by the 
in-memory state being too large: it throws an error saying the state exceeds a 
certain size. So a solution to (1) will possibly solve (2) as well. 
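
Assuming the fix for (1) ends up being a different state backend, I'd expect 
the savepoint size error in (2) to go away with a file-system-backed backend 
instead of the default in-memory one; a rough sketch of what I mean (the path 
is a placeholder, and my understanding of the MemoryStateBackend size cap may 
be off):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FsStateBackend keeps working state on the TaskManager heap but writes
        // checkpoints/savepoints to a (distributed) file system, so savepoint size
        // is no longer bounded by the MemoryStateBackend's per-state limit or the
        // JobManager heap.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/savepoints"));

        // ... rest of the job definition ...
    }
}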

Thanks again,

Ashish


> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ashish,
> 
> Thanks for the great write up. 
> If I understood you correctly, there are two different issues that are caused 
> by the disabled checkpointing.
> 
> 1) Recovery from a failure without restarting all operators to preserve the 
> state in the running tasks
> 2) Planned restarts of an application without losing all state (even with 
> disabled checkpointing).
> 
> Ad 1) The community is constantly working on reducing the time for 
> checkpointing and recovery. 
> For 1.5, local task recovery was added, which basically stores a copy of the 
> state on local disk and reads it in case of a recovery. So, tasks are 
> restarted, but they restore state from the local disk rather than from 
> distributed storage.
> AFAIK, this can only be used together with remote checkpoints. I think this 
> might be an interesting option for you if it were possible to write 
> checkpoints only to local disk and not to remote storage. AFAIK, there are 
> also other efforts to reduce the number of restarted tasks in case of a 
> failure. I guess you've played with the other features such as 
> RocksDBStateBackend and incremental and async checkpoints already. 
> 
> Ad 2) It sounds as if savepoints are exactly the feature you are looking 
> for. It would be good to know what exactly did not work for you. The 
> MemoryStateBackend is not suitable for large state sizes because it backs up 
> the state into the heap memory of the JobManager. 
> 
> Best, Fabian
> 
> 2018-06-05 21:57 GMT+02:00 ashish pok <ashish...@yahoo.com>:
> Fabian, Stephan, All,
> 
> I started a discussion a while back around having a form of event-based 
> checkpointing policy that will help us in some of our high volume data 
> pipelines. Here is an effort to put this in front of community and understand 
> what capabilities can support these type of use cases, how much others feel 
> the same need and potentially a feature that can make it to a user story.
> 
> Use Case Summary:
> - Extremely high volume of data (events from consumer devices with customer 
> base of over 100M)
> - Multiple events need to be combined in a windowed streaming app, grouped by 
> key (something like the 5-minute floor of the timestamp plus a unique 
> identifier for the customer device)
> - "Most" events for a group/key arrive within a few seconds, if not 
> milliseconds; however, events can sometimes be delayed or lost in transport 
> (so delayed-event handling and timeouts will be needed)
> - Extremely low (pretty vague, but hopefully the details below clarify it) 
> data loss is acceptable
> - Because of the volume and the transient nature of the source, checkpointing 
> is turned off (this saves writes to persistent storage, as states/sessions 
> are only active for a few seconds during processing)
> 
> Problem Summary:
> Of course, none of the above is out of the norm for Flink, and as a matter of 
> fact we already have a Flink app doing this. The issue arises when it comes 
> to graceful shutdowns and operator failures (e.g. Kafka timeouts). On an 
> operator failure, the entire job graph restarts, which essentially flushes 
> out the in-memory states/sessions. I think there is a feature in the works 
> (not sure if it made it into 1.5) to perform selective restarts, which will 
> limit the damage but will still result in data loss. It also doesn't help 
> when application restarts are needed. We did try the savepoint route for 
> explicit restarts, but I think the MemoryStateBackend ran into issues with 
> larger state or something along those lines (not certain). We obviously 
> cannot recover an operator that actually fails, because its own state could 
> be unrecoverable. However, it feels like Flink already has a lot of the 
> plumbing to help with the overall problem of providing some sort of 
> recoverable state to handle graceful shutdowns and restarts with minimal 
> data loss.
> 
> Solutions:
> Some in the community commented on my last email with decent ideas, like an 
> event-based checkpointing trigger (on shutdown, on restart, etc.) or 
> lifecycle hooks (onCancel, onRestart, etc.) in Functions that could be 
> implemented when this type of behavior is needed. 
> 
> I'd appreciate feedback from the community on how useful this might be for 
> others, and thoughts from the core contributors as well.
> 
> Thanks in advance, Ashish
> 
> 
