[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701453#comment-16701453 ]

ASF GitHub Bot commented on FLINK-9574:
---

asfgit closed pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md
index f3941bc0c98..4de9462a49b 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -1,8 +1,8 @@
 ---
 title: "Custom Serialization for Managed State"
-nav-title: "Custom Serialization"
+nav-title: "Custom State Serialization"
 nav-parent_id: streaming_state
-nav-pos: 6
+nav-pos: 7
 ---
-If your application uses Flink's managed state, it might be necessary to implement custom serialization logic for special use cases.
+* ToC
+{:toc}
-This page is targeted as a guideline for users who require the use of custom serialization for their state, covering how
-to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
-Flink's own serializers, this page is irrelevant and can be skipped.
+This page is targeted as a guideline for users who require the use of custom serialization for their state, covering
+how to provide a custom state serializer as well as guidelines and best practices for implementing serializers that allow
+state schema evolution.
-### Using custom serializers
+If you're simply using Flink's own serializers, this page is irrelevant and can be ignored.
-As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
+## Using custom state serializers
+
+When registering a managed operator or keyed state, a `StateDescriptor` is required
 to specify the state's name, as well as information about the type of the state. The type information is used by Flink's [type serialization framework](../../types_serialization.html) to create appropriate serializers for the state.
@@ -66,125 +69,169 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
-Note that Flink's own
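The removed "Briefly speaking" paragraph in the diff above describes a two-step handshake: a configuration snapshot is written with the checkpoint, and on restore it is handed to the new serializer for a compatibility check. The following is only a minimal sketch of that idea in plain Java. `ToySerializer`, `ConfigSnapshot`, `Compatibility`, `writeMetadata` and `checkOnRestore` are hypothetical names made up for illustration; only the method names `snapshotConfiguration` and `ensureCompatibility` are borrowed from the quoted text, and none of this is Flink's actual `TypeSerializer` API. It also assumes the serializer's whole configuration is a single format version number.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy model only: none of these types are Flink classes.
final class SerializerCompatibilitySketch {

    /** Hypothetical point-in-time view of a serializer's configuration. */
    static final class ConfigSnapshot {
        final int formatVersion;

        ConfigSnapshot(int formatVersion) {
            this.formatVersion = formatVersion;
        }
    }

    /** Hypothetical outcome of the compatibility check. */
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    /** Hypothetical serializer whose only configuration is a format version. */
    static final class ToySerializer {
        private final int formatVersion;

        ToySerializer(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        // Checkpoint time: capture what a later serializer needs to know.
        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(formatVersion);
        }

        // Restore time: invoked on the NEW serializer with the OLD snapshot.
        Compatibility ensureCompatibility(ConfigSnapshot restored) {
            return restored.formatVersion == formatVersion
                    ? Compatibility.COMPATIBLE
                    : Compatibility.REQUIRES_MIGRATION;
        }
    }

    // Store the snapshot as (toy) checkpoint metadata.
    static byte[] writeMetadata(ToySerializer serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(serializer.snapshotConfiguration().formatVersion);
        }
        return bytes.toByteArray();
    }

    // Read the stored snapshot back and hand it to the new serializer.
    static Compatibility checkOnRestore(byte[] metadata, ToySerializer newSerializer)
            throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(metadata))) {
            return newSerializer.ensureCompatibility(new ConfigSnapshot(in.readInt()));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] metadata = writeMetadata(new ToySerializer(1));
        System.out.println(checkOnRestore(metadata, new ToySerializer(1))); // COMPATIBLE
        System.out.println(checkOnRestore(metadata, new ToySerializer(2))); // REQUIRES_MIGRATION
    }
}
```

The point of the sketch is that the check runs against stored metadata rather than against the old serializer instance, so the restored job never has to load the old serializer class just to decide whether the formats match.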
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701454#comment-16701454 ]

ASF GitHub Bot commented on FLINK-9574:
---

tzulitai commented on issue #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#issuecomment-442338087

Thanks a lot for the detailed review @igalshilman @dawidwys @alpinegizmo!
I've addressed your comments and merged this.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation, State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently, the only bit of documentation about serializer upgrades / state
> evolution, is
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
> which only explains things at an API level.
> State evolution over the time has proved to be a rather complex topic that is
> often overlooked by users. Users would probably benefit from a actual
> full-grown dedicated page that covers both API, some necessary internal
> details regarding interplay of state serializers, best practices, and caution
> notices.
> I propose to add this documentation as a subpage under Streaming/State &
> Fault-Tolerance/.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700744#comment-16700744 ]

ASF GitHub Bot commented on FLINK-9574:
---

tzulitai commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r236764367

## File path: docs/dev/stream/state/schema_evolution.md ##
@@ -0,0 +1,92 @@
+---
+title: "State Schema Evolution"
+nav-parent_id: streaming_state
+nav-pos: 6
+---
+
+
+* ToC
+{:toc}
+
+## Overview
+
+Apache Flink streaming applications are typically designed to run indefinitely for long periods of time.
+As with all long-running services, the applications need to be updated to adapt to changing requirements.
+This goes the same for data schemas that the applications work against; they evolve along with the application.
+
+This page provides an overview of how you can evolve your state type's data schema.
+The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.).
+
+Note that the information on this page is relevant only if you are using state serializers that is
+generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html).
+That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer`
+or `TypeInformation`, and therefore allowing Flink to infer information about the state type:

Review comment:
I think "allowing" is not the correct wording here.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700742#comment-16700742 ]

ASF GitHub Bot commented on FLINK-9574:
---

tzulitai commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r236763803

## File path: docs/dev/stream/state/schema_evolution.md ##
@@ -0,0 +1,92 @@
+---
+title: "State Schema Evolution"
+nav-parent_id: streaming_state
+nav-pos: 6
+---
+
+
+* ToC
+{:toc}
+
+## Overview
+
+Apache Flink streaming applications are typically designed to run indefinitely for long periods of time.
+As with all long-running services, the applications need to be updated to adapt to changing requirements.
+This goes the same for data schemas that the applications work against; they evolve along with the application.
+
+This page provides an overview of how you can evolve your state type's data schema.
+The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.).
+
+Note that the information on this page is relevant only if you are using state serializers that is
+generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html).
+That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer`
+or `TypeInformation`, and therefore allowing Flink to infer information about the state type:

Review comment:
This should be more a tone of: The user doesn't want Flink to infer a serializer, instead use their own custom serializer

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694789#comment-16694789 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235406215

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
-with their previous configuration.
+This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary
+internal details about how Flink interacts with these abstractions.
-The following subsections illustrate guidelines to implement these two methods when using custom serializers.
+When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state,
+so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be
+registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the
+restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when
+implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in
+mind is how the serialization schema can be changed in the future.
- Implementing the `snapshotConfiguration` method
+When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state
+type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases:
-The serializer's configuration snapshot should capture enough information such that on restore, the information
-carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
-This could typically contain information about the serializer's parameters or binary format of the serialized data;
-generally, anything that allows the new serializer to decide whether or not it can be
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694795#comment-16694795 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235409667

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694793#comment-16694793 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235407289

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694788#comment-16694788 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235403113

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
-with their previous configuration.
+This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary
+internal details about how Flink interacts with these abstractions.
-The following subsections illustrate guidelines to implement these two methods when using custom serializers.
+When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state,

Review comment:
```suggestion
When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation, State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently, the only bit of documentation about serializer upgrades / state
> evolution, is
>
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694791#comment-16694791 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235412250

## File path: docs/dev/stream/state/schema_evolution.md ##
@@ -0,0 +1,92 @@
+---
+title: "State Schema Evolution"
+nav-parent_id: streaming_state
+nav-pos: 6
+---
+
+
+* ToC
+{:toc}
+
+## Overview
+
+Apache Flink streaming applications are typically designed to run indefinitely for long periods of time.
+As with all long-running services, the applications need to be updated to adapt to changing requirements.
+This goes the same for data schemas that the applications work against; they evolve along with the application.
+
+This page provides an overview of how you can evolve your state type's data schema.
+The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.).
+
+Note that the information on this page is relevant only if you are using state serializers that is
+generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html).
+That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer`
+or `TypeInformation`, and therefore allowing Flink to infer information about the state type:

Review comment:
Is this saying that Flink is able to infer the type information because the provided state descriptor is not configured to use a specific `TypeSerializer` or `TypeInformation`, or that configuring it in that way would have permitted Flink to infer the type information?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation, State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Currently, the only bit of documentation about serializer upgrades / state
> evolution, is
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
> which only explains things at an API level.
> State evolution over the time has proved to be a rather complex topic that is
> often overlooked by users. Users would probably benefit from a actual
> full-grown dedicated page that covers both API, some necessary internal
> details regarding interplay of state serializers, best practices, and caution
> notices.
> I propose to add this documentation as a subpage under Streaming/State &
> Fault-Tolerance/.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
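The overview quoted above talks about evolving a state type's data schema while old state must stay readable. One common way to reason about this, shown here purely as an illustration and not as what Flink does internally, is a version-tagged binary format: the writer prefixes each record with a version number, and the reader branches on it. The class `VersionedFormatSketch`, the `User` type, and the `UNKNOWN_AGE` default are hypothetical names made up for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a version-tagged format that can still read older bytes. */
final class VersionedFormatSketch {

    /** Default used when reading version 1 bytes, which have no age field. */
    static final int UNKNOWN_AGE = -1;

    /** Evolved data model: version 1 had only a name, version 2 adds an age. */
    static final class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Writes the old layout: [version=1][name]. */
    static byte[] writeV1(String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(1);
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Writes the new layout: [version=2][name][age]. */
    static byte[] writeV2(User user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(2);
            out.writeUTF(user.name);
            out.writeInt(user.age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout, filling in a default for fields the old layout lacks. */
    static User read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != 1 && version != 2) {
                throw new IOException("Unknown format version: " + version);
            }
            String name = in.readUTF();
            int age = (version == 2) ? in.readInt() : UNKNOWN_AGE;
            return new User(name, age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        User restoredOld = read(writeV1("alice"));
        User restoredNew = read(writeV2(new User("bob", 42)));
        System.out.println(restoredOld.name + " " + restoredOld.age); // alice -1
        System.out.println(restoredNew.name + " " + restoredNew.age); // bob 42
    }
}
```

The design choice worth noting is that the reader, not the writer, carries the burden of understanding every past version, which is why the quoted text stresses planning for schema change when a serializer is first written.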
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694794#comment-16694794 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235407061

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
-with their previous configuration.
+This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary
+internal details about how Flink interacts with these abstractions.
-The following subsections illustrate guidelines to implement these two methods when using custom serializers.
+When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state,
+so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be
+registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the
+restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when
+implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in
+mind is how the serialization schema can be changed in the future.
- Implementing the `snapshotConfiguration` method
+When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state
+type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases:
-The serializer's configuration snapshot should capture enough information such that on restore, the information
-carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
-This could typically contain information about the serializer's parameters or binary format of the serialized data;
-generally, anything that allows the new serializer to decide whether or not it can be
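The removed note in the diff above warns against anonymous classes as state serializers because their generated class names are not guaranteed. The following sketch, using only the JDK, prints the runtime names of a named nested class and an anonymous class so the difference is visible. `AnonymousClassNameSketch`, `ToySerializer`, and `Utf8Serializer` are hypothetical names chosen for this illustration and have nothing to do with Flink's serializer hierarchy.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch contrasting named and anonymous class names at runtime. */
final class AnonymousClassNameSketch {

    /** Minimal stand-in for a serializer interface. */
    interface ToySerializer {
        byte[] serialize(String value);
    }

    /** Named nested class: its binary name is derived from the declared name. */
    static final class Utf8Serializer implements ToySerializer {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Anonymous class: the compiler picks a synthetic name for it. */
    static ToySerializer anonymousSerializer() {
        return new ToySerializer() {
            @Override
            public byte[] serialize(String value) {
                return value.getBytes(StandardCharsets.UTF_8);
            }
        };
    }

    public static void main(String[] args) {
        // Stable: ends with "$Utf8Serializer" regardless of declaration order.
        System.out.println(new Utf8Serializer().getClass().getName());
        // Compiler-assigned: typically a numeric suffix that can shift when
        // other anonymous classes are added to the enclosing class.
        System.out.println(anonymousSerializer().getClass().getName());
    }
}
```

If a class name is persisted and later looked up by name, only the first form gives a name the author controls, which is the concern the quoted note raises.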
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694787#comment-16694787 ] ASF GitHub Bot commented on FLINK-9574: --- alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235406750 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694790#comment-16694790 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235402519

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -23,15 +23,18 @@ specific language governing permissions and limitations
 under the License.
 -->
-If your application uses Flink's managed state, it might be necessary to implement custom serialization logic for special use cases.
+* ToC
+{:toc}
 This page is targeted as a guideline for users who require the use of custom serialization for their state, covering how
-to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
-Flink's own serializers, this page is irrelevant and can be skipped.
+to provide a custom state serializer and guidelines and best practices to implementing serializers that allows

Review comment:
```suggestion
how to provide a custom state serializer as well as guidelines and best practices for implementing serializers that allow
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694792#comment-16694792 ]

ASF GitHub Bot commented on FLINK-9574:
---

alpinegizmo commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r235414035

## File path: docs/dev/stream/state/custom_serialization.md ##
@@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
-with their previous configuration.
+This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary
+internal details about how Flink interacts with these abstractions.
-The following subsections illustrate guidelines to implement these two methods when using custom serializers.
+When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state,

Review comment:
I suggest changing all instances of "prior registered state" to "previously registered state", and "prior serializer" to "previous serializer". I find this wording more natural.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694417#comment-16694417 ] ASF GitHub Bot commented on FLINK-9574: --- dawidwys commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235294163 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694414#comment-16694414 ] ASF GitHub Bot commented on FLINK-9574: --- dawidwys commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235298883 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694418#comment-16694418 ] ASF GitHub Bot commented on FLINK-9574: --- dawidwys commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235299684 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. +## State serializers and schema evolution -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible -with their previous configuration. +This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary +internal details about how Flink interacts with these abstractions. -The following subsections illustrate guidelines to implement these two methods when using custom serializers. +When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state, +so that users are not locked in to any specific serialization schema. 
When state is restored, a new serializer will be +registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the +restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when +implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in +mind is how the serialization schema can be changed in the future. - Implementing the `snapshotConfiguration` method +When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state +type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases: -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be used
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694413#comment-16694413 ] ASF GitHub Bot commented on FLINK-9574: --- dawidwys commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235296908 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). + +Note that the information on this page is relevant only if you are using state serializers that is +generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html). +That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer` +or `TypeInformation`, and therefore allowing Flink to infer information about the state type: + + +{% highlight java %} +ListStateDescriptor descriptor = +new ListStateDescriptor<>( +"state-name", +MyPojoType.class); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + + +Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write +persisted state bytes. 
Simply put, a registered state's schema can only be evolved if its serializer properly +supports it. This is handled transparently by serializers generated by Flink's type serialization framework +(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution#supported-data-types-for-schema-evolution)). + +If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement +the serializer to support state schema evolution, please refer to +[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization). +The documentation there also covers necessary internal details about the interplay between state serializers and Flink's +state backends to support state schema evolution. + +## Evolving state schema

Review comment: How about adding a very high-level description of how the migration happens? Something along the lines of: the old serializer is used to read the state, and the new serializer is used to write the state with the new schema. I would also mention here that a compatibility check between those serializers is performed. Then I would link to the more detailed description for particular state backends. I guess you left it out because you intended this page to be for "basic/simple" users, but I think that if somebody ends up looking into state migration, he/she already has some deeper knowledge.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a dedicated documentation page for state evolution > -- > > Key: FLINK-9574 > URL: https://issues.apache.org/jira/browse/FLINK-9574 > Project: Flink > Issue Type: Sub-task > Components: Documentation, State Backends, Checkpointing, Type > Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently, the only bit of documentation about serializer upgrades / state > evolution, is > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.], > which only explains things at an API level. > State evolution over the time has proved to be a rather complex topic that is > often overlooked by users. Users would probably benefit from a actual > full-grown dedicated page that covers both API, some necessary internal > details regarding interplay of state serializers, best practices, and caution > notices. > I propose to add this
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694415#comment-16694415 ] ASF GitHub Bot commented on FLINK-9574: --- dawidwys commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r235290652 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview

Review comment: I would leave out the `Overview` heading. I think the pattern is that the first section with toc is the overview of a page.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a dedicated documentation page for state evolution > -- > > Key: FLINK-9574 > URL: https://issues.apache.org/jira/browse/FLINK-9574 > Project: Flink > Issue Type: Sub-task > Components: Documentation, State Backends, Checkpointing, Type > Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently, the only bit of documentation about serializer upgrades / state > evolution, is > [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.], > which only explains things at an API level. > State evolution over the time has proved to be a rather complex topic that is > often overlooked by users.
Users would probably benefit from a actual > full-grown dedicated page that covers both API, some necessary internal > details regarding interplay of state serializers, best practices, and caution > notices. > I propose to add this documentation as a subpage under Streaming/State & > Fault-Tolerance/. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693185#comment-16693185 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234964879 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). + +Note that the information on this page is relevant only if you are using state serializers that is +generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html). +That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer` +or `TypeInformation`, and therefore allowing Flink to infer information about the state type: + + +{% highlight java %} +ListStateDescriptor descriptor = +new ListStateDescriptor<>( +"state-name", +MyPojoType.class); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + + +Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write +persisted state bytes. 
Simply put, a registered state's schema can only be evolved if its serializer properly +supports it. This is handled transparently by serializers generated by Flink's type serialization framework +(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution#supported-data-types-for-schema-evolution)). + +If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement +the serializer to support state schema evolution, please refer to +[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization). +The documentation there also covers necessary internal details about the interplay between state serializers and Flink's +state backends to support state schema evolution. + +## Evolving state schema + +To evolve the schema of a given state type, you would take the following steps: + + 1. Take a savepoint of your Flink streaming job. + 2. Update state types in your application (e.g., modifying your Avro / POJO type schema). + 3. Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not + the schema had been changed for the state, and migrate state schema if necessary. + +The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. +Further details about the migration process is out of the scope of this documentation; please refer to +[here]({{ site.baseurl }}/dev/stream/state/custom_serialization). + +## Supported data types for schema evolution + +Currently, schema evolution is supported only for Avro. Therefore, if you care about schema evolution for +state, it is currently recommended to always use Avro for state data types. + +There are plans to extend the support for more composite types, such as POJOs; for more details, +please refer to [FLINK-10897](https://issues.apache.org/jira/browse/FLINK-10897). 
+ +### Avro types + +Flink fully supports evolving schema of Avro type state, as long as the schema change is considered compatible by +[Avro's rules for schema resolution](http://avro.apache.org/docs/current/spec.html#Schema+Resolution). + +Moreover, it is possible on restore to switch from using Avro-generated `SpecificRecord`s to `GenericRecord`s,

Review comment: Would it make sense to mention the limitations here? For example, you can't change the namespace or relocate the Avro-generated classes.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a dedicated documentation page for state evolution > -- > > Key: FLINK-9574 > URL:
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693191#comment-16693191 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234963185 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). + +Note that the information on this page is relevant only if you are using state serializers that is +generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html). +That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer` +or `TypeInformation`, and therefore allowing Flink to infer information about the state type: + + +{% highlight java %} +ListStateDescriptor descriptor = +new ListStateDescriptor<>( +"state-name", +MyPojoType.class); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + + +Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write +persisted state bytes. 
Simply put, a registered state's schema can only be evolved if its serializer properly +supports it. This is handled transparently by serializers generated by Flink's type serialization framework +(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution#supported-data-types-for-schema-evolution)). + +If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement +the serializer to support state schema evolution, please refer to +[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization). +The documentation there also covers necessary internal details about the interplay between state serializers and Flink's +state backends to support state schema evolution. + +## Evolving state schema + +To evolve the schema of a given state type, you would take the following steps: + + 1. Take a savepoint of your Flink streaming job. + 2. Update state types in your application (e.g., modifying your Avro / POJO type schema). + 3. Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not + the schema had been changed for the state, and migrate state schema if necessary. + +The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. +Further details about the migration process is out of the scope of this documentation; please refer to +[here]({{ site.baseurl }}/dev/stream/state/custom_serialization). + +## Supported data types for schema evolution + +Currently, schema evolution is supported only for Avro. Therefore, if you care about schema evolution for +state, it is currently recommended to always use Avro for state data types. + +There are plans to extend the support for more composite types, such as POJOs; for more details, +please refer to [FLINK-10897](https://issues.apache.org/jira/browse/FLINK-10897). 
+ +### Avro types + +Flink fully supports evolving schema of Avro type state, as long as the schema change is considered compatible by +[Avro's rules for schema resolution](http://avro.apache.org/docs/current/spec.html#Schema+Resolution). + +Moreover, it is possible on restore to switch from using Avro-generated `SpecificRecord`s to `GenericRecord`s,

Review comment: I'd either drop the last line or at least mention that it works only as long as an Avro-generated specific record is not found in the classpath, but I'm more inclined to drop it.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a dedicated documentation page for state evolution > -- > >
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693186#comment-16693186 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234973231 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. +## State serializers and schema evolution -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible -with their previous configuration. +This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary +internal details about how Flink interacts with these abstractions. -The following subsections illustrate guidelines to implement these two methods when using custom serializers. +When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state, +so that users are not locked in to any specific serialization schema. 
When state is restored, a new serializer will be +registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the +restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when +implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in +mind is how the serialization schema can be changed in the future. - Implementing the `snapshotConfiguration` method +When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state +type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases: -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693189#comment-16693189 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234981861 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. +## State serializers and schema evolution -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible -with their previous configuration. +This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary +internal details about how Flink interacts with these abstractions. -The following subsections illustrate guidelines to implement these two methods when using custom serializers. +When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state, +so that users are not locked in to any specific serialization schema. 
When state is restored, a new serializer will be +registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the +restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when +implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in +mind is how the serialization schema can be changed in the future. - Implementing the `snapshotConfiguration` method +When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state +type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases: -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693190#comment-16693190 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r23492 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). + +Note that the information on this page is relevant only if you are using state serializers that is

Review comment: suggestion: ...only if you are using state serializers that are provided by Flink's own ...

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693187#comment-16693187 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234974424 ## File path: docs/dev/stream/state/custom_serialization.md ## @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor) -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. 
The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. +## State serializers and schema evolution -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible -with their previous configuration. +This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary +internal details about how Flink interacts with these abstractions. -The following subsections illustrate guidelines to implement these two methods when using custom serializers. +When restoring from savepoints, Flink allows changing the serializers used to read and write prior registered state, +so that users are not locked in to any specific serialization schema. 
When state is restored, a new serializer will be +registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the +restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when +implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in +mind is how the serialization schema can be changed in the future. - Implementing the `snapshotConfiguration` method +When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state +type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases: -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be
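The snapshot-then-check flow described in the removed text (a configuration snapshot is written with the checkpoint, then handed to the new serializer on restore) can be illustrated with a toy example. This is a minimal sketch in plain Java: `FormatSnapshot`, `ToySerializer`, `Compatibility`, and the version-comparison rule are hypothetical stand-ins invented for illustration and are not Flink's `TypeSerializer` API.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a serializer configuration snapshot (not a Flink class).
final class FormatSnapshot {
    final int formatVersion;

    FormatSnapshot(int formatVersion) {
        this.formatVersion = formatVersion;
    }

    // Bytes that would be stored next to the state when a checkpoint is taken.
    byte[] write() {
        return ByteBuffer.allocate(Integer.BYTES).putInt(formatVersion).array();
    }

    // Reads the snapshot back when the checkpoint is used for a restore.
    static FormatSnapshot read(byte[] data) {
        return new FormatSnapshot(ByteBuffer.wrap(data).getInt());
    }
}

// Hypothetical outcome of a compatibility check (assumption, for illustration only).
enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION, INCOMPATIBLE }

// Hypothetical serializer that only knows the version of its own binary format.
final class ToySerializer {
    private final int formatVersion;

    ToySerializer(int formatVersion) {
        this.formatVersion = formatVersion;
    }

    // Point-in-time view of this serializer's configuration.
    FormatSnapshot snapshot() {
        return new FormatSnapshot(formatVersion);
    }

    // The new serializer inspects the snapshot written by the previous one.
    Compatibility checkAgainst(FormatSnapshot restored) {
        if (restored.formatVersion == formatVersion) {
            return Compatibility.COMPATIBLE;
        }
        if (restored.formatVersion < formatVersion) {
            return Compatibility.REQUIRES_MIGRATION; // older data would need rewriting
        }
        return Compatibility.INCOMPATIBLE; // data written by a newer, unknown format
    }
}

final class SnapshotDemo {
    public static void main(String[] args) {
        byte[] checkpointMetadata = new ToySerializer(1).snapshot().write();
        FormatSnapshot restored = FormatSnapshot.read(checkpointMetadata);
        System.out.println(new ToySerializer(2).checkAgainst(restored));
    }
}
```

In this sketch a serializer at format version 2 that restores a version 1 snapshot reports that migration is required. A real serializer would likely capture more than a version number, for example its parameters or nested serializers.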
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693188#comment-16693188 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234956989 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). + +Note that the information on this page is relevant only if you are using state serializers that is +generated by Flink's own [type serialization framework]({{ site.baseurl }}/dev/types_serialization.html). +That is, when declaring your state, the provided state descriptor is not configured to use a specific `TypeSerializer` +or `TypeInformation`, and therefore allowing Flink to infer information about the state type: + + +{% highlight java %} +ListStateDescriptor descriptor = +new ListStateDescriptor<>( +"state-name", +MyPojoType.class); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + + +Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write +persisted state bytes. 
Simply put, a registered state's schema can only be evolved if its serializer properly +supports it. This is handled transparently by serializers generated by Flink's type serialization framework +(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution#supported-data-types-for-schema-evolution)). + +If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement +the serializer to support state schema evolution, please refer to +[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization). +The documentation there also covers necessary internal details about the interplay between state serializers and Flink's +state backends to support state schema evolution. + +## Evolving state schema + +To evolve the schema of a given state type, you would take the following steps: + + 1. Take a savepoint of your Flink streaming job. + 2. Update state types in your application (e.g., modifying your Avro / POJO type schema). Review comment: Would it make sense to drop Pojo type schema from here since we don't support it yet. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
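Step 2 of the quoted procedure mentions modifying an Avro type schema. As a rough illustration of why such a change can leave old state readable, the sketch below mimics the general idea of schema resolution: fields the new schema no longer declares are ignored, and newly added fields fall back to a default. It is plain Java with hypothetical names (`ToySchemaResolver`, `resolve`, the example fields) and uses neither Avro's nor Flink's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, plain-Java illustration; this is neither Avro's nor Flink's API.
final class ToySchemaResolver {

    // Reads a record written with the old schema through the new (reader) schema.
    // readerDefaults maps each field of the new schema to its default value;
    // a null default means "no default declared" in this sketch.
    static Map<String, Object> resolve(Map<String, Object> writtenRecord,
                                       Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            if (writtenRecord.containsKey(name)) {
                result.put(name, writtenRecord.get(name)); // field exists in the old data
            } else if (field.getValue() != null) {
                result.put(name, field.getValue());        // added field: use its default
            } else {
                throw new IllegalStateException("No default for added field: " + name);
            }
        }
        return result; // fields missing from the new schema are simply not copied
    }
}

final class SchemaEvolutionDemo {
    public static void main(String[] args) {
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("userId", 42L);
        oldRecord.put("legacyFlag", true);      // removed in the new schema

        Map<String, Object> newSchema = new LinkedHashMap<>();
        newSchema.put("userId", null);          // unchanged field, no default needed
        newSchema.put("country", "unknown");    // added field with a default

        System.out.println(ToySchemaResolver.resolve(oldRecord, newSchema));
    }
}
```

The resolved record keeps `userId`, drops `legacyFlag`, and gains `country` with its default. Adding a field without a default fails in this sketch, which is one reason a schema change may not be restorable.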
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693184#comment-16693184 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234954603 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. Review comment: I guess you wanted to say: Run indefinitely **or** for long periods of time This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693183#comment-16693183 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234955381 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. +As with all long-running services, the applications need to be updated to adapt to changing requirements. +This goes the same for data schemas that the applications work against; they evolve along with the application. + +This page provides an overview of how you can evolve your state type's data schema. +The current restrictions varies across different type's and state structures (et.c `ValueState`, `ListState`, etc.). Review comment: typo: - type's -> types - probably "et.c" doesn't belong here This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693041#comment-16693041 ] ASF GitHub Bot commented on FLINK-9574: --- igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124#discussion_r234953962 ## File path: docs/dev/stream/state/schema_evolution.md ## @@ -0,0 +1,92 @@ +--- +title: "State Schema Evolution" +nav-parent_id: streaming_state +nav-pos: 6 +--- + + +* ToC +{:toc} + +## Overview + +Apache Flink streaming applications are typically designed to run indefinitely for long periods of time. Review comment: run indefinitely or for long periods This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9574) Add a dedicated documentation page for state evolution
[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689502#comment-16689502 ] ASF GitHub Bot commented on FLINK-9574: --- tzulitai opened a new pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution URL: https://github.com/apache/flink/pull/7124 ## What is the purpose of the change This PR adds a new "State Schema Evolution" page under "State & Fault Tolerance". It also reworks the "Custom State Serialization" page to reflect the new serializer / serializer snapshot abstractions in 1.7. - The "State Schema Evolution" page is intended for the majority of users who do not use custom serializers, and just care about what state types they should use if they care about evolvable schema, and their limitations. The list of supported types only includes Avro now, because we only support Avro schema evolution in 1.7. - The "Custom State Serialization" page is intended for power users who implement their own state serializer. It explains the abstractions and how Flink interacts with them. The document is also targeted for Flink developers who might implement Flink-shipped serializers. ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add a dedicated documentation page for state evolution
> --
>
> Key: FLINK-9574
> URL: https://issues.apache.org/jira/browse/FLINK-9574
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation, State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Currently, the only bit of documentation about serializer upgrades / state evolution is
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
> which only explains things at an API level.
> State evolution over time has proved to be a rather complex topic that is often overlooked by users.
> Users would probably benefit from an actual full-grown dedicated page that covers the API, some necessary
> internal details regarding the interplay of state serializers, best practices, and caution notices.
> I propose to add this documentation as a subpage under Streaming/State & Fault-Tolerance/.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)