It looks like Gerard asked something along similar lines <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-td14102.html> just last month, and there is a JIRA issue <https://issues.apache.org/jira/browse/FLINK-4940> for official support of broadcast state. So the "ugly hack" (union list state, with each instance picking the latest config) seems to be the way to go for now.
On Mon, Aug 21, 2017 at 1:23 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> I am implementing a control stream. The stream communicates a global configuration value for the whole job. It uses DataStream.broadcast() to communicate this to all parallel operator instances. I would like to save this value in state so that it can be recovered when the job restarts/recovers. The control stream is not keyed, so the only option is Operator state.
>
> I could implement this using the ListCheckpointed interface, returning Collections.singletonList(configValue) from snapshotState. It is clear what I'd need to do in restoreState in the case of scale in. If I include a serial number in the config, and it receives multiple values on restore, it can keep the config value with the largest serial number, indicating the latest config.
>
> Alas, it is not clear what should happen on scale out, as some operator instances will receive empty lists.
>
> It seems the other alternative is to use CheckpointedFunction, along with union redistribution via getUnionListState, and then have each operator instance select from the union list the config with the latest serial number, of which there should be multiple copies. But this seems like an ugly hack.
>
> In addition, the documentation is unclear on the relationship and effect, if any, of the maximum parallelism Flink job parameter on operator state, whereas it is much clearer in this regard as it relates to keyed state via key groups.
>
> How are folks handling this use case, i.e. storing and restoring global config values via Flink state?
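The restore-side selection described above can be sketched in plain Java. The sketch keeps the config with the largest serial number from whatever list an instance receives. It is a minimal, Flink-free model, so it can run on its own. `ConfigRestore`, `ConfigValue` and `latest` are hypothetical names.

In an actual job, this logic would presumably run inside `CheckpointedFunction.initializeState`. There it would iterate `context.getOperatorStateStore().getUnionListState(descriptor).get()` when `context.isRestored()` is true. On the other side, `snapshotState` would clear that state and `add` the current config. That wiring is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, Flink-free model of restoring a broadcast config value from
// operator list state. Names are illustrative, not Flink API.
public class ConfigRestore {

    // A global config value tagged with a monotonically increasing serial number.
    static final class ConfigValue {
        final long serial;
        final String value;

        ConfigValue(long serial, String value) {
            this.serial = serial;
            this.value = value;
        }
    }

    // Returns the entry with the largest serial number, or Optional.empty()
    // if the restored list is empty (fresh start, or an instance that got an
    // empty share under even-split redistribution on scale out).
    static Optional<ConfigValue> latest(Iterable<ConfigValue> restored) {
        ConfigValue best = null;
        for (ConfigValue c : restored) {
            if (c != null && (best == null || c.serial > best.serial)) {
                best = c;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // With union redistribution, every new instance receives the entries
        // snapshotted by all old instances (here 3 old instances, each having
        // snapshotted singletonList(config)). The serial number breaks ties
        // if the copies ever differ.
        List<ConfigValue> unionState = Arrays.asList(
                new ConfigValue(7, "threshold=10"),
                new ConfigValue(8, "threshold=12"),
                new ConfigValue(8, "threshold=12"));

        // Scale out to 5 instances: each one sees the full union list and
        // independently converges on the same config.
        for (int instance = 0; instance < 5; instance++) {
            ConfigValue chosen = latest(unionState).orElseThrow(IllegalStateException::new);
            System.out.println("instance " + instance + " -> serial "
                    + chosen.serial + ", " + chosen.value);
        }

        // An empty share has nothing to restore; the caller must wait for the
        // next control message or fall back to a default.
        System.out.println("empty share restores a value? "
                + latest(new ArrayList<ConfigValue>()).isPresent());
    }
}
```

The same `latest` helper also covers the `ListCheckpointed` scale-in case, where one instance may receive several lists' worth of entries. Only on scale out, without union redistribution, does it come back empty.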