[jira] [Updated] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antoine Philippot updated FLINK-7883:

Description:
For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished. During the savepoint execution, however, the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.

A solution could be to stop fetching in the source stream task before triggering the savepoint.

I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.

We can add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.

was:
For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished. During the savepoint execution, however, the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.

A solution could be to stop fetching in the source stream task before triggering the savepoint.

I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.

We can add a {{stopFetchingSource}} property to the {{CheckpointProperties}} and {{CheckpointOptions}} classes to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.

> Stop fetching source before a cancel with savepoint
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished. During the savepoint execution, however, the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.
> We can add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
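The proposal in the description could look roughly like the following Java sketch. Only the names {{StoppableFetchingSourceFunction}} and {{stopFetching}} come from the issue; the method signature, {{ToySource}}, and the single-threaded polling are assumptions made for illustration and are not Flink APIs. The sketch counts how many records are emitted after the savepoint offset has been taken, which is what would be replayed on the next start.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Interface named in the issue; this signature is an assumption. */
interface StoppableFetchingSourceFunction {
    void stopFetching();
}

/** Toy stand-in for a Kafka-like source. It is not Flink's Kafka consumer. */
class ToySource implements StoppableFetchingSourceFunction {
    private final List<String> partition;
    private final List<String> emitted = new ArrayList<>();
    private int offset = 0;
    private boolean fetching = true;

    ToySource(List<String> partition) {
        this.partition = partition;
    }

    /** Emits the next record if fetching is still allowed; returns whether it did. */
    boolean pollOnce() {
        if (!fetching || offset >= partition.size()) {
            return false;
        }
        emitted.add(partition.get(offset));
        offset++;
        return true;
    }

    @Override
    public void stopFetching() {
        fetching = false;
    }

    /** Offset that a savepoint taken right now would record. */
    int snapshotOffset() {
        return offset;
    }

    int emittedCount() {
        return emitted.size();
    }
}

class StopFetchingDemo {
    /** Returns the number of records emitted after the savepoint offset was taken. */
    static int replayedAfterSavepoint(boolean stopBeforeSavepoint) {
        ToySource source = new ToySource(Arrays.asList("a", "b", "c", "d"));
        source.pollOnce();
        source.pollOnce();
        if (stopBeforeSavepoint) {
            source.stopFetching();
        }
        int savepointOffset = source.snapshotOffset();
        // The savepoint is still in progress, so the task keeps polling.
        source.pollOnce();
        // The job is cancelled here; anything past savepointOffset is replayed on restart.
        return source.emittedCount() - savepointOffset;
    }

    public static void main(String[] args) {
        System.out.println(replayedAfterSavepoint(false)); // prints 1
        System.out.println(replayedAfterSavepoint(true));  // prints 0
    }
}
```

In this toy model the flag is checked on every poll, so stopping before the snapshot guarantees that the recorded offset matches the last emitted record.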
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16214858#comment-16214858 ]

Antoine Philippot commented on FLINK-7883:

Hi [~Zentol],

The {{StoppableFunction}} with its {{stop()}} method is similar to the {{cancel()}} method in that it closes the task/function.

The desired behaviour of {{StoppableFetchingSourceFunction}} (the name can be changed if it leads to confusion) and its {{stopFetching}} method is only to stop fetching from the source stream, without stopping or cancelling the task/function.

The main reason is to allow the Kafka offset commit once the savepoint is complete, which is not possible if the function has been stopped or cancelled beforehand.
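The distinction drawn in this comment can be illustrated with a small Java sketch. All names here ({{ToyCommittingSource}}, {{OffsetCommitDemo}}) are hypothetical stand-ins, and the rule that a cancelled function ignores the completion callback is an assumption that models the comment's claim rather than Flink's actual code path.

```java
/** Toy source that commits its offset when told a savepoint completed. Hypothetical, not a Flink class. */
class ToyCommittingSource {
    private boolean running = true;    // false once the function is cancelled or stopped
    private boolean fetching = true;   // false once fetching is paused
    private long offset = 0;
    private long committedOffset = -1; // -1 means nothing has been committed

    void poll() {
        if (running && fetching) {
            offset++;
        }
    }

    /** Pauses fetching only; the function stays alive. */
    void stopFetching() {
        fetching = false;
    }

    /** Shuts the function down completely. */
    void cancel() {
        running = false;
        fetching = false;
    }

    long snapshotOffset() {
        return offset;
    }

    /** Completion callback: commits the snapshotted offset, but only while the function is alive. */
    void notifyCheckpointComplete(long snapshottedOffset) {
        if (running) {
            committedOffset = snapshottedOffset;
        }
    }

    long committedOffset() {
        return committedOffset;
    }
}

class OffsetCommitDemo {
    /** Returns the committed offset after a savepoint, or -1 if no commit happened. */
    static long committedAfterSavepoint(boolean cancelFirst) {
        ToyCommittingSource source = new ToyCommittingSource();
        source.poll();
        source.poll();
        source.poll();
        if (cancelFirst) {
            source.cancel();
        } else {
            source.stopFetching();
        }
        long snapshot = source.snapshotOffset();
        source.poll(); // ignored in both cases: fetching is off
        source.notifyCheckpointComplete(snapshot);
        return source.committedOffset();
    }

    public static void main(String[] args) {
        System.out.println(committedAfterSavepoint(false)); // prints 3
        System.out.println(committedAfterSavepoint(true));  // prints -1
    }
}
```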
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241762#comment-16241762 ]

Antoine Philippot commented on FLINK-7883:

Hi [~aljoscha], is there any news on this subject? I would be interested in contributing to this feature, since we really need to avoid reprocessing Kafka messages on each job restart and we cannot afford to maintain our fork forever.
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16276539#comment-16276539 ]

Antoine Philippot commented on FLINK-7883:

OK [~aljoscha], thanks for your feedback; it is far more general than just stopping the source from fetching. I hope this feature will be available after the 1.5 release so that I do not have to reapply my patch for too long.
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216917#comment-16216917 ]

Antoine Philippot commented on FLINK-7883:

Glad to see there are ideas and interest around this behaviour.

I personally think there are too many "close/shutdown/cancel/stop" commands and methods. There are {{cancel}} and {{stop}} commands for a job (there should be only one clean shutdown command), and there are {{close}} on {{RichFunction}}, {{cancel}} on {{SourceFunction}} and also {{stop}} on {{StoppableFunction}}.

A two-step clean function shutdown should be available on both consumer and producer. Why not reuse {{RichFunction}} and {{AbstractRichFunction}}, which already have a {{close}} method, and add a new method {{prepareShutdown}} (I don't know what to name it) that returns a Future and replaces {{SourceFunction.cancel}} and {{StoppableFunction.stop}}?

When a job shutdown is triggered, we could first call all {{prepareShutdown}} methods and wait for their completion with a configurable timeout. Only then would we trigger a savepoint, if requested, and finally call all {{close}} methods.
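The shutdown sequence suggested in this comment can be sketched in Java as follows. Only {{prepareShutdown}} and {{close}} are names from the comment; the {{TwoStepShutdown}} interface, the use of {{CompletableFuture}} as the returned Future, and the log-based "savepoint" step are assumptions made for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-step lifecycle; prepareShutdown is the name floated in the comment. */
interface TwoStepShutdown {
    CompletableFuture<Void> prepareShutdown();

    void close();
}

/** Toy function that records the order in which its lifecycle methods are called. */
class RecordingFunction implements TwoStepShutdown {
    private final String name;
    private final List<String> log;

    RecordingFunction(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public CompletableFuture<Void> prepareShutdown() {
        log.add("prepare:" + name);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        log.add("close:" + name);
    }
}

class ShutdownDemo {
    /** Step 1: prepare all functions and wait. Step 2: optional savepoint. Step 3: close all. */
    static void shutdown(List<TwoStepShutdown> functions, boolean withSavepoint,
                         long timeoutMillis, List<String> log) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (TwoStepShutdown f : functions) {
            pending.add(f.prepareShutdown());
        }
        try {
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("prepareShutdown failed or timed out", e);
        }
        if (withSavepoint) {
            log.add("savepoint"); // placeholder for triggering a real savepoint
        }
        for (TwoStepShutdown f : functions) {
            f.close();
        }
    }

    static List<String> run(boolean withSavepoint) {
        List<String> log = new ArrayList<>();
        List<TwoStepShutdown> functions = Arrays.asList(
                new RecordingFunction("source", log),
                new RecordingFunction("sink", log));
        shutdown(functions, withSavepoint, 1000L, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
    }
}
```

Waiting on all futures before the savepoint is what makes the snapshot see a quiesced pipeline; the timeout bounds how long a slow function can delay shutdown.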
[jira] [Created] (FLINK-7883) Stop fetching source before a cancel with savepoint
Antoine Philippot created FLINK-7883:

Summary: Stop fetching source before a cancel with savepoint
Key: FLINK-7883
URL: https://issues.apache.org/jira/browse/FLINK-7883
Project: Flink
Issue Type: Improvement
Components: DataStream API, Kafka Connector, State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
Reporter: Antoine Philippot

For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished. During the savepoint execution, however, the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.

A solution could be to stop fetching in the source stream task before triggering the savepoint.

I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.

We can add a {{stopFetchingSource}} property to the {{CheckpointProperties}} and {{CheckpointOptions}} classes to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212748#comment-16212748 ]

Antoine Philippot commented on FLINK-7883:

I made a commit to illustrate the solution: https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6

If someone can validate the idea or comment on it, I would be delighted to submit a pull request.
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339101#comment-16339101 ]

Antoine Philippot commented on FLINK-6756:

Hi [~aljoscha] and [~spi-x-i],

I have started to implement Aljoscha's idea. This is my current commit: [https://github.com/apache/flink/commit/96684058d4a209d39d7fb1667beb7f083d215b58]

My only concern is the override of setRuntimeContext [https://github.com/apache/flink/commit/96684058d4a209d39d7fb1667beb7f083d215b58#diff-e79f7947628990ee5298af04bca1c399R17] compared to ScalaProcessWindowFunctionWrapper [https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala#L86]. I don't understand why both super.setRuntimeContext(t) and rfunc.setRuntimeContext(t) are called.

What do you think about that?

> Provide RichAsyncFunction to Scala API suite
>
> Key: FLINK-6756
> URL: https://issues.apache.org/jira/browse/FLINK-6756
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Andrea Spina
> Assignee: Andrea Spina
> Priority: Major
>
> I can't find any tracking info about the chance to have RichAsyncFunction in the Scala API suite. I think it'd be nice to have this function in order to access open/close methods and the RuntimeContext.
> I was able to retrieve http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 only, so my question is whether there are any blocking issues preventing this feature. [~till.rohrmann]
> If it's possible and nobody has already done it, I can assign the issue to myself in order to implement it.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
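One plausible reading of the two calls questioned in this comment, sketched in Java with hypothetical stand-in types ({{ToyRuntimeContext}}, {{ToyRichFunction}}, and the classes built on them are not Flink classes and only mimic the shape of a rich function): the wrapper and the wrapped user function are two separate objects, each holding its own context field, so each needs to be handed the context.

```java
/** Hypothetical stand-in for a runtime context. */
class ToyRuntimeContext {
    final String taskName;

    ToyRuntimeContext(String taskName) {
        this.taskName = taskName;
    }
}

/** Hypothetical stand-in for a rich function base class that stores its own context. */
abstract class ToyRichFunction {
    private ToyRuntimeContext context;

    public void setRuntimeContext(ToyRuntimeContext t) {
        this.context = t;
    }

    public ToyRuntimeContext getRuntimeContext() {
        if (context == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return context;
    }
}

/** The user's function, which expects to read its own context. */
class ToyUserFunction extends ToyRichFunction {
    String describe() {
        return "user:" + getRuntimeContext().taskName;
    }
}

/** Wrapper handed to the runtime in place of the user's function. */
class ToyWrapper extends ToyRichFunction {
    private final ToyUserFunction rfunc;

    ToyWrapper(ToyUserFunction rfunc) {
        this.rfunc = rfunc;
    }

    @Override
    public void setRuntimeContext(ToyRuntimeContext t) {
        super.setRuntimeContext(t); // the wrapper's own field, read by its getRuntimeContext()
        rfunc.setRuntimeContext(t); // the wrapped object's field, never set otherwise
    }

    String describe() {
        return "wrapper:" + getRuntimeContext().taskName + "," + rfunc.describe();
    }
}

class WrapperDemo {
    static String run() {
        ToyWrapper wrapper = new ToyWrapper(new ToyUserFunction());
        // The runtime only knows about the wrapper, so it sets the context once, here.
        wrapper.setRuntimeContext(new ToyRuntimeContext("map-1"));
        return wrapper.describe();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints wrapper:map-1,user:map-1
    }
}
```

Under these assumptions, dropping either call would leave one of the two objects throwing when it asks for its context.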
[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565552#comment-16565552 ]

Antoine Philippot commented on FLINK-7883:

Yes [~aljoscha], I agree, as long as the solution avoids duplicate reprocessing of events. My solution with StoppableFunction was the easiest way for me to achieve what I wanted ;)

> Make savepoints atomic with respect to state and side effects
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.3.2, 1.4.0
> Reporter: Antoine Philippot
> Priority: Major
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689543#comment-16689543 ]

Antoine Philippot commented on FLINK-6756:

Hi [~dawidwys], sorry, but I'm not working on Flink anymore :( and I won't be able to reapply the patch on the current master.