[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
    [ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815274#comment-17815274 ]

Stefan Richter commented on FLINK-33962:

Hi [~Zhanghao Chen]! The proposed change in this Jira makes sense to me, and I think that using the same idea as outlined in the FLIP should also work for this case. Off the top of my head, I don't see a problem with reviving the previous mechanism for compatibility.

> Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
>
>                 Key: FLINK-33962
>                 URL: https://issues.apache.org/jira/browse/FLINK-33962
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>            Reporter: Zhanghao Chen
>            Priority: Major
>
> *Background*
> Flink restores operator state from snapshots by matching operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID generation when no user-set uid exists. The generated OperatorID is deterministic with respect to:
> * node-local properties (the traversal ID in the BFS of the stream graph)
> * chained output nodes
> * input node hashes
> *Problem*
> The chaining behavior affects state compatibility, because the OperatorID of an operator depends on its chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state-incompatible with an otherwise identical DAG with source and sink unchained (either because the parallelisms of the two operators are changed to be unequal or because chaining is disabled). This greatly limits the flexibility to perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior of operators, which effectively just removes L227-235 of [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java at master · apache/flink (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>
> This will not hurt the determinism of the ID generation across job submissions as long as the stream graph topology doesn't change. Since new versions of Flink have already adopted pure operator-level state recovery, it will also not break state recovery across job submissions as long as both submissions use the same hasher.
> This will, however, break cross-version state compatibility. So we can introduce a new option to enable HasherV3 in v1.19 and consider making it the default hasher in v2.0.
> Looking forward to suggestions on this.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
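The chaining dependence described in the issue can be illustrated with a toy model. The sketch below is not Flink's hasher: the `Node` class, the use of SHA-256, and the `chainingAware` flag are assumptions made only for illustration. It hashes a source->sink graph from the three inputs listed under *Background* and shows that dropping the chained-output input makes the IDs independent of chaining.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

public class ChainingHashSketch {

    /** Toy stand-in for a stream graph node (hypothetical, not Flink's StreamNode). */
    public static final class Node {
        final int traversalId;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> chainedOutputs = new ArrayList<>();

        Node(int traversalId) {
            this.traversalId = traversalId;
        }
    }

    /** Builds {source, sink}; the sink is a chained output of the source only if 'chained'. */
    public static Node[] sourceToSink(boolean chained) {
        Node source = new Node(0);
        Node sink = new Node(1);
        sink.inputs.add(source);
        if (chained) {
            source.chainedOutputs.add(sink);
        }
        return new Node[] {source, sink};
    }

    /** Derives an ID from the traversal ID, optionally the chained outputs, and the input IDs. */
    public static String operatorId(Node node, boolean chainingAware) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        md.update(ByteBuffer.allocate(4).putInt(node.traversalId).array());
        if (chainingAware) {
            // This is the input that a chaining-agnostic hasher would leave out.
            for (Node out : node.chainedOutputs) {
                md.update(ByteBuffer.allocate(4).putInt(out.traversalId).array());
            }
        }
        for (Node in : node.inputs) {
            md.update(operatorId(in, chainingAware).getBytes(StandardCharsets.UTF_8));
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        Node[] chained = sourceToSink(true);
        Node[] unchained = sourceToSink(false);
        for (boolean aware : new boolean[] {true, false}) {
            boolean sameSource =
                    operatorId(chained[0], aware).equals(operatorId(unchained[0], aware));
            boolean sameSink =
                    operatorId(chained[1], aware).equals(operatorId(unchained[1], aware));
            System.out.println("chainingAware=" + aware
                    + " sourceIdsMatch=" + sameSource + " sinkIdsMatch=" + sameSink);
        }
    }
}
```

In the chaining-aware mode the source ID changes when the chain is broken, and the sink ID changes with it because it depends on the source's hash. In the agnostic mode both IDs stay the same.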
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
    [ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815195#comment-17815195 ]

Zhanghao Chen commented on FLINK-33962:

Hi [~srichter], I'm trying to make the hasher upgrade backwards-compatible by reviving the idea of FLINK-5290 authored by you back in Flink 1.2. The details can be found in the linked FLIP doc. Could you kindly help take a review? Thanks a lot in advance.
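One way to picture a backwards-compatible hasher upgrade is a fallback lookup at restore time. The sketch below is only an assumption about the general shape of such a mechanism, not what the FLIP specifies: `findState`, the string-keyed snapshot map, and the ID values are all hypothetical. State is looked up under the ID from the new hasher first, then under IDs produced by older hashers.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LegacyIdFallbackSketch {

    /**
     * Looks up operator state in a snapshot keyed by operator ID (hypothetical model).
     * Tries the primary ID first, then each legacy ID in order.
     */
    public static Optional<String> findState(
            Map<String, String> snapshot, String primaryId, List<String> legacyIds) {
        if (snapshot.containsKey(primaryId)) {
            return Optional.of(snapshot.get(primaryId));
        }
        for (String legacyId : legacyIds) {
            if (snapshot.containsKey(legacyId)) {
                return Optional.of(snapshot.get(legacyId));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Snapshot written by a job that used the old hasher (made-up IDs).
        Map<String, String> snapshot = Map.of("id-from-old-hasher", "operator-state");
        Optional<String> restored =
                findState(snapshot, "id-from-new-hasher", List.of("id-from-old-hasher"));
        System.out.println(restored.orElse("<no state found>"));
    }
}
```

With a lookup of this kind, a job that switches hashers could still find state written under the old IDs, at the cost of computing both sets of IDs on submission.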
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
    [ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801984#comment-17801984 ]

Zhanghao Chen commented on FLINK-33962:

Thanks for the quick response, I'll draft a FLIP and raise discussion first.
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
    [ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801976#comment-17801976 ]

Xintong Song commented on FLINK-33962:

Thanks for reaching out, [~Zhanghao Chen]. Just some quick responses, I would need to look a bit more into the related components before giving further comments.

Based on your description, in general I think it makes sense to make operator id generation independent from chaining. However, as you have already mentioned, this is a breaking change that may result in state incompatibility. Therefore, I think it deserves a FLIP discussion and an official vote.
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
    [ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801746#comment-17801746 ]

Zhanghao Chen commented on FLINK-33962:

Hi [~xtsong], reaching out to you as I think it'll be good to have this in the Flink 2.0 roadmap as well. Please let me know if anything else needs to be done to put it on the roadmap.

The current chaining-aware mechanism seems to be tech debt from the pre-1.3.x series, where Flink did not support operator-level state recovery within an operator chain. That has not been a limitation since v1.3, but users have been suffering from the chaining-aware IDs ever since. It's time to make a change.