[GitHub] Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-466629096 A new PR, #7813, has been created to replace this one because the code here is outdated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-455623323 @StefanRRichter I have gone through all production code that uses union state. `ArtificalOperatorStateMapper`, `SimpleEndlessSourceWithBloatedState`, and `StateCreatingFlatMap` only use union state for verification in end-to-end tests. In `FlinkKafkaProducer` and `FlinkKafkaProducer011`, the union state `nextTransactionalIdHintState` is identical for all subtasks. In `KafkaConsumer`, since Kafka does not support decreasing the partition count, each Kafka partition stays sticky to its subtask as long as the parallelism does not change. I think none of the operators above would run into a conflict. `SequenceGeneratorSource` initializes its state with the maximum event time; however, `monotonousEventTime` only increases by the fixed step `eventTimeClockProgressPerEvent`, which should be the same for all subtasks. `StreamingFileSink` also takes the maximum counter, but by the definition of `StreamingFileSink`, when restoring from a checkpoint, the restored files in the `pending` state are moved to the `finished` state, while `in-progress` files are rolled back. In other words, as I see it, partial recovery is not suitable for `StreamingFileSink`. So in my view, all operators in production code that use union state, except `StreamingFileSink`, should work fine with partial restore. However, since the `getUnionList` API is public, we cannot control how users use it. In a nutshell, if we support restoring state when using `RestartPipelinedRegionStrategy`, we should add a limitation for union state. I plan to add another parameter, possibly named `RecoveryMode`, to the `restoreLatestCheckpointedState` method: it would be `RecoveryMode.ALL` for `RestartAllStrategy` and for `RestartPipelinedRegionStrategy` when the whole job forms only one region, and `RecoveryMode.PARTIAL` for the other failover strategies.
This way, when assigning state, if the recovery mode is `RecoveryMode.PARTIAL` and union state exists, an unsupported-operation exception can be thrown. What do you think?
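A minimal Java sketch of the check proposed above, assuming a hypothetical `RecoveryMode` enum and a hypothetical `RecoveryModeSketch` helper (neither is existing Flink API; in the actual proposal the parameter would be passed to `restoreLatestCheckpointedState`). It chooses `ALL` for a restart-all strategy or a single-region job and `PARTIAL` otherwise, and rejects a partial restore when any restored operator uses union state.

```java
import java.util.List;

/** Hypothetical recovery mode mirroring the proposed `RecoveryMode` parameter (not Flink API). */
enum RecoveryMode {
    /** Every task of the job is restored: restart-all, or a job with a single region. */
    ALL,
    /** Only the tasks of the failed region(s) are restored. */
    PARTIAL
}

/** Hypothetical helper; names and signatures are illustrative only. */
final class RecoveryModeSketch {

    private RecoveryModeSketch() {}

    /**
     * ALL for a restart-all strategy or when the whole job forms one pipelined region,
     * PARTIAL for any other region-based failover.
     */
    static RecoveryMode chooseMode(boolean restartAll, int numRegions) {
        return (restartAll || numRegions == 1) ? RecoveryMode.ALL : RecoveryMode.PARTIAL;
    }

    /**
     * Throws if a partial restore would touch operators with union list state,
     * since subtasks that keep running would not see the redistributed union.
     *
     * @param operatorsWithUnionState names of restored operators that use union state
     */
    static void checkStateAssignment(RecoveryMode mode, List<String> operatorsWithUnionState) {
        if (mode == RecoveryMode.PARTIAL && !operatorsWithUnionState.isEmpty()) {
            throw new UnsupportedOperationException(
                    "Partial recovery is not supported for operators with union state: "
                            + operatorsWithUnionState);
        }
    }
}
```

With three regions and no restart-all, `chooseMode(false, 3)` yields `PARTIAL`, so `checkStateAssignment(RecoveryMode.PARTIAL, List.of("sink"))` throws, while the same operator list passes under `RecoveryMode.ALL`. Taking a boolean and a region count keeps the sketch independent of Flink's failover-strategy classes.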
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-454098941 @StefanRRichter Thanks for your explanation. I still have two questions:
1. Even if we could assign partitioned operator state to all operator instances, the current `taskRestore` within `Execution` can only be shipped to TaskManagers for executions located in the failed region, and the instances that keep running would not know that the operator states have changed. The possible bug you mentioned, "`This can lead to some partitions beeing assigned twice or not being assigned at all`", is more likely to occur on the execution-graph side. How do we define the 'disunity' among tasks?
2. I am a bit confused by your last suggestion, so please correct me if I am wrong: did you actually mean that operator state should be assigned round-robin only if the parallelism **did change**? The parallelism can only change when the job is restarted; it does not change during a job failover.
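To make the second question concrete, here is a hypothetical Java sketch (the class and method names are illustrative and are not Flink's state-assignment code) of assigning partitioned, even-split operator state so that each subtask gets back exactly its own partitions when the parallelism is unchanged, and partitions are dealt round-robin only when the parallelism changed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of parallelism-aware assignment of even-split operator state. */
final class PartitionAssignmentSketch {

    private PartitionAssignmentSketch() {}

    /**
     * @param previous previous.get(i) holds the partitions checkpointed by subtask i
     * @param newParallelism parallelism of the operator after the restore
     * @return result.get(i) holds the partitions assigned to subtask i
     */
    static List<List<String>> assign(List<List<String>> previous, int newParallelism) {
        if (previous.size() == newParallelism) {
            // Parallelism unchanged (the failover case): every subtask gets back exactly
            // the partitions it checkpointed, so no partition is assigned to two subtasks.
            List<List<String>> same = new ArrayList<>();
            for (List<String> partitions : previous) {
                same.add(new ArrayList<>(partitions));
            }
            return same;
        }
        // Parallelism changed (only on a full restart): deal all partitions
        // round-robin over the new subtasks.
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<String> partitions : previous) {
            for (String partition : partitions) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }
}
```

For example, with previous state `[[p0, p1], [p2]]`, `assign(previous, 2)` returns the same grouping, while `assign(previous, 3)` returns `[[p0], [p1], [p2]]`.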
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-453846070 @StefanRRichter It seems that union operator state somehow conflicts with partial recovery. However, since a `region` is the minimal pipelined connected subgraph, how could union state span different regions? If it can, we might need to introduce some limitations for this scenario. Could you please clarify this corner case?
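As background for this question, the following Java sketch models union redistribution as documented for `getUnionListState`: on restore, every subtask receives the complete list of entries checkpointed by all subtasks, regardless of how the subtasks are connected by data exchanges. The class and method names are hypothetical, and `maxOf` stands in for operators that restore a maximum value from the union, such as the max event time or max counter mentioned for `SequenceGeneratorSource` and `StreamingFileSink`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of union redistribution for operator list state. */
final class RedistributionSketch {

    private RedistributionSketch() {}

    /** Union: every new subtask receives the concatenation of all old subtasks' entries. */
    static List<List<Long>> union(List<List<Long>> previous, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> entries : previous) {
            all.addAll(entries);
        }
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all)); // each subtask gets its own full copy
        }
        return result;
    }

    /** Restores the maximum over the union, as an operator tracking a counter might. */
    static long maxOf(List<Long> restored) {
        long max = Long.MIN_VALUE;
        for (long value : restored) {
            max = Math.max(max, value);
        }
        return max;
    }
}
```

Because the union couples subtasks through state rather than through data edges, subtasks in different pipelined regions can still depend on each other's checkpointed entries: with previous state `[[3], [7, 5]]`, both restored subtasks see `[3, 7, 5]` and `maxOf` yields `7`, even if only one of them belongs to the failed region.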
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-449415836 @StefanRRichter Could you please take a look at the new commit? I really appreciate any help you can provide.
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009#issuecomment-443096765 @StefanRRichter Thanks for your comments; I will refactor this PR. BTW, I found that region failover does not guarantee `EXACTLY_ONCE` semantics unless the checkpoint coordinator restarts its `checkpointScheduler`. I'll include this modification in the next commits.