I am implementing a control stream.  The stream communicates a global
configuration value for the whole job.  It uses DataStream.broadcast() to
communicate this to all parallel operator instances.  I would like to save
this value in state so that it can be recovered when the job
restarts/recovers.  The control stream is not keyed, so the only option is
Operator state.

I could implement this using the ListCheckpointed interface, returning
Collections.singletonList(configValue) from snapshotState.  It is clear
what I'd need to do in restoreState in the case of scale in.  If I include
a serial number in the config and an instance receives multiple values on
restore, it can keep the config value with the largest serial number, i.e.
the latest config.
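A minimal sketch of that scale-in restore logic, assuming the config is wrapped in a hypothetical `VersionedConfig` holder that carries a serial number. It is plain Java so it runs without Flink on the classpath. `ConfigHolder.snapshotState` and `ConfigHolder.restoreState` only mirror the shape of `ListCheckpointed`'s methods and do not implement the interface.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Hypothetical holder: the config value plus a monotonically increasing serial number.
final class VersionedConfig implements Serializable {
    final long serial;
    final String value;

    VersionedConfig(long serial, String value) {
        this.serial = serial;
        this.value = value;
    }
}

// Plain-Java stand-in for an operator using ListCheckpointed-style snapshot/restore.
final class ConfigHolder {
    private VersionedConfig current; // null until a config arrives or is restored

    // Mirrors ListCheckpointed.snapshotState: each instance snapshots its single value.
    List<VersionedConfig> snapshotState() {
        return current == null
                ? Collections.<VersionedConfig>emptyList()
                : Collections.singletonList(current);
    }

    // Mirrors ListCheckpointed.restoreState: after scale in, several entries may arrive;
    // keep the one with the largest serial number (the latest config).
    void restoreState(List<VersionedConfig> state) {
        for (VersionedConfig candidate : state) {
            if (current == null || candidate.serial > current.serial) {
                current = candidate;
            }
        }
    }

    VersionedConfig current() {
        return current;
    }
}
```

If `restoreState` is handed an empty list, `current` simply stays null. That is the gap that shows up on scale out.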

Alas, it is not clear what should happen on scale out, as some operator
instances will receive empty lists.
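To make the scale-out problem concrete, here is a simplified model of even-split redistribution of operator list state. It is not Flink's repartitioner code. It only assumes that all snapshotted entries are concatenated and handed out in near-equal chunks, and `EvenSplitModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of even-split redistribution of operator list state.
// Concatenates every old instance's entries, then gives each new instance a
// contiguous chunk; chunk sizes differ by at most one element.
final class EvenSplitModel {
    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> s : snapshots) {
            all.addAll(s);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int offset = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(offset, offset + size)));
            offset += size;
        }
        return result;
    }
}
```

With two old instances each snapshotting one entry and a new parallelism of 4, two new instances get one entry each and the other two get empty lists, so they have no config to restore.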

It seems the other alternative is to use CheckpointedFunction along with
union redistribution via getUnionListState, and then have each operator
instance select from the union list the config with the latest serial
number, of which there should be multiple copies.  But this seems like an
ugly hack.
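A simplified model of the union approach. In Flink, this state would come from `getOperatorStateStore().getUnionListState(descriptor)` inside `CheckpointedFunction.initializeState`. The sketch below models only the redistribution and selection in plain Java. `UnionModel` and its nested `VersionedConfig` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of union redistribution: every new instance receives ALL entries
// snapshotted by ALL old instances, then each instance independently picks the
// entry with the largest serial number.
final class UnionModel {
    // Hypothetical config holder; serial grows with each new config.
    static final class VersionedConfig {
        final long serial;
        final String value;

        VersionedConfig(long serial, String value) {
            this.serial = serial;
            this.value = value;
        }
    }

    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> s : snapshots) {
            union.addAll(s);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union)); // every instance gets a full copy
        }
        return result;
    }

    // Returns null if the union is empty (e.g. no config was ever received).
    static VersionedConfig latest(List<VersionedConfig> entries) {
        VersionedConfig best = null;
        for (VersionedConfig c : entries) {
            if (best == null || c.serial > best.serial) {
                best = c;
            }
        }
        return best;
    }
}
```

Every instance now finds a config regardless of the new parallelism. The cost is that each old instance contributes its own copy, so the restored list holds one duplicate per old instance. That redundancy is what makes the approach feel like a hack.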


In addition, the documentation is unclear about how, if at all, the
maximum parallelism job parameter relates to or affects operator state,
whereas it is much clearer on this point for keyed state, via key groups.
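For contrast, this is the keyed-state mapping the documentation does describe. Each key falls into one of `maxParallelism` key groups, and whole key groups are assigned to operator instances. The sketch mirrors the arithmetic of Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`. It takes the key-group id directly and skips the hashing step that produces it in Flink, where a murmur hash of `key.hashCode()` is taken modulo `maxParallelism`. `KeyGroupModel` is a hypothetical name.

```java
// Mirrors the arithmetic Flink uses to assign a key group to an operator instance.
final class KeyGroupModel {
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        // Key groups are split into contiguous, near-equal ranges across instances.
        return keyGroupId * parallelism / maxParallelism;
    }
}
```

With `maxParallelism = 128`, scaling from 4 to 8 instances splits each instance's block of 32 key groups in two. Keys never change key group, so keyed state can be rescaled up to 128 instances without rehashing.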


How are folks handling this use case, i.e. storing and restoring global
config values via Flink state?
