asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
URL: https://github.com/apache/flink/pull/6379
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 44a3653a61f..72fca3e27f1 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -266,6 +266,132 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the
 we start over from `0`. Note that this would keep a different state value for each different
 input key if we had tuples with different values in the first field.
 
+### State time-to-live (TTL)
+
+A time-to-live (TTL) can be assigned to the keyed state of any type.
+In this case, the state will expire after the configured TTL,
+and its stored value will be cleaned up on a best-effort basis, which is discussed in detail later.
+
+The state collection types support per-entry TTLs: list elements and map entries expire independently.
+
+To use state TTL, first build a `StateTtlConfig` object,
+then enable TTL functionality in any state descriptor by passing this configuration:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build();
+
+ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.state.ValueStateDescriptor
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build
+
+val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+</div>
+</div>
+
+The configuration has several options to consider.
+The first parameter of the `newBuilder` method is mandatory; it is the time-to-live value.
+
+The update type configures when the state TTL is refreshed (default `OnCreateAndWrite`):
+
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write access,
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access.
+
+The state visibility configures whether the expired value is returned on read access
+if it has not been cleaned up yet (default `NeverReturnExpired`):
+
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - the expired value is never returned,
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available.
+
+ In case of `NeverReturnExpired`, the expired state behaves as if it does not exist anymore,
+ even if it has yet to be removed. This option can be useful for use cases
+ where data has to become unavailable for read access strictly after the TTL,
+ e.g. applications working with privacy-sensitive data.
+
+The other option, `ReturnExpiredIfNotCleanedUp`, allows the expired state to be returned before its cleanup.
+
+**Notes:**
+
+- The state backends store the timestamp of the last modification along with the user value,
+which means that enabling this feature increases state storage consumption.
+The heap state backend stores an additional Java object with a reference to the user state object
+and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
+
+- Only TTLs in reference to *processing time* are currently supported.
+
+- Trying to restore state that was previously configured without TTL using a TTL-enabled descriptor, or vice versa,
+will lead to a compatibility failure and a `StateMigrationException`.
+
+- The TTL configuration is not part of checkpoints or savepoints;
+it rather defines how Flink treats the state in the currently running job.
+
+#### Cleanup of expired state
+
+Currently, expired values are always removed when they are read out explicitly,
+e.g. by calling `ValueState.value()`.
+
+<span class="label label-danger">Attention!</span> This means that, by default, if expired state is not read,
+it won't be removed, possibly leading to ever-growing state. This might change in future releases.
+
+Additionally, you can activate the cleanup at the moment of taking the full state snapshot, which
+reduces the snapshot size. The local state is not cleaned up under the current implementation,
+but it will not include the removed expired state when restoring from the previous snapshot.
+It can be configured in `StateTtlConfig`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot()
+    .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot
+    .build
+{% endhighlight %}
+</div>
+</div>
+
+This option is not applicable to incremental checkpointing in the RocksDB state backend.
+
+More strategies that clean up expired state automatically in the background will be added in the future.
+
 ### State in the Scala DataStream API
 
 In addition to the interface described above, the Scala API has shortcuts for stateful

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
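The expiry rules documented in the diff above (a timestamp stored with every value, the two update types, the two visibility options, and cleanup only when an entry is read) can be explored without a Flink job using the minimal, stdlib-only Java sketch below. `TtlMapModel` and its methods are hypothetical and not part of Flink; the enum constants merely reuse the documented option names. The sketch assumes a processing-time clock in milliseconds, injected as a `LongSupplier`, and treats an entry as expired once `now - timestamp >= ttl`. It models a map so that per-entry expiry is visible too.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model of TTL-guarded map state. This is NOT Flink's
 * implementation or API; it only illustrates the documented rules: a timestamp
 * stored with every entry, the two update types, the two visibility options,
 * and cleanup that happens only when an entry is read.
 */
public class TtlMapModel<K, V> {

    /** Same constant names as StateTtlConfig.UpdateType, reused for readability. */
    public enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    /** Same constant names as StateTtlConfig.StateVisibility, reused for readability. */
    public enum StateVisibility { NeverReturnExpired, ReturnExpiredIfNotCleanedUp }

    /** A user value plus the last-modification timestamp stored next to it. */
    private static final class Entry<T> {
        final T value;
        long timestamp;

        Entry(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final StateVisibility visibility;
    private final LongSupplier clock; // assumed processing-time clock in milliseconds
    private final Map<K, Entry<V>> entries = new HashMap<>();

    public TtlMapModel(long ttlMillis, UpdateType updateType,
                       StateVisibility visibility, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** A write (re)sets the entry's timestamp under both update types. */
    public void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /** A read checks expiry for this entry only and removes it if expired. */
    public V get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - entry.timestamp >= ttlMillis) { // assumed expiry condition
            entries.remove(key); // cleanup is triggered by this read
            return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? entry.value : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            entry.timestamp = now; // reads also refresh the TTL
        }
        return entry.value;
    }

    /** Number of entries still physically stored, expired or not. */
    public int storedEntries() {
        return entries.size();
    }
}
```

In this model, with a 1000 ms TTL, an entry written at time 0 is still stored at 1200 ms until something reads it; under `NeverReturnExpired` that read returns `null` and removes the entry, while an entry written at 600 ms is unaffected. This reflects, in simplified form, the per-entry expiry and read-triggered cleanup described in the diff; snapshot-time cleanup is deliberately left out.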