Could you write two implementations of the DoFn and move your processing
logic into a separate function that both DoFns invoke after they have
read whatever they need from state?

Then, during pipeline construction, you could apply either the MapState
version or the ValueState version depending on which runner you're using.
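
A rough sketch of what I mean, in plain Java so it runs without Beam on
the classpath. Everything named here (KeyValueAccess, WholeMapCell,
overValue, overMap, useMapState) is made up for illustration and is not
Beam API. In a real pipeline the two adapters would wrap the ValueState
and MapState parameters of your two DoFns, and the runner name would
presumably come from the pipeline options.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-ins only; no Beam types are used in this sketch.
class StateAccessSketch {

    /** Hypothetical: the only state operations the shared logic needs. */
    interface KeyValueAccess {
        String get(String key);
        void put(String key, String value);
    }

    /** Shared processing logic; both DoFn variants would call this. */
    static String process(String key, String value, KeyValueAccess store) {
        String previous = store.get(key);
        store.put(key, value);
        return previous == null
            ? key + ": new"
            : key + ": " + previous + " -> " + value;
    }

    /** Stand-in for a ValueState<Map<String, String>> cell. */
    static final class WholeMapCell {
        private Map<String, String> contents; // null until the first write

        Map<String, String> read() {
            return contents == null ? null : new HashMap<>(contents);
        }

        void write(Map<String, String> map) {
            contents = new HashMap<>(map);
        }
    }

    /** ValueState-style access: read the whole map, update it, write it back. */
    static KeyValueAccess overValue(final WholeMapCell cell) {
        return new KeyValueAccess() {
            @Override public String get(String key) {
                Map<String, String> map = cell.read();
                return map == null ? null : map.get(key);
            }
            @Override public void put(String key, String value) {
                Map<String, String> map = cell.read();
                if (map == null) {
                    map = new HashMap<>();
                }
                map.put(key, value);
                cell.write(map);
            }
        };
    }

    /** MapState-style access: per-key reads and writes (a Map stands in for MapState). */
    static KeyValueAccess overMap(final Map<String, String> perKeyState) {
        return new KeyValueAccess() {
            @Override public String get(String key) { return perKeyState.get(key); }
            @Override public void put(String key, String value) { perKeyState.put(key, value); }
        };
    }

    /** Assumption: the choice is made from the runner's simple class name. */
    static boolean useMapState(String runnerSimpleName) {
        return !"DataflowRunner".equals(runnerSimpleName);
    }

    public static void main(String[] args) {
        String runner = args.length > 0 ? args[0] : "FlinkRunner";
        KeyValueAccess store = useMapState(runner)
            ? overMap(new HashMap<String, String>())
            : overValue(new WholeMapCell());
        System.out.println(process("user", "1", store));
        System.out.println(process("user", "2", store));
    }
}
```

The point is that process() never sees which kind of state it is talking
to, so the two DoFns only differ in their StateSpec, their @ProcessElement
signature, and which adapter they build.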



On Mon, Nov 12, 2018 at 10:43 AM Dmitry Minaev <[email protected]> wrote:

> Hi everyone,
>
> Since Dataflow doesn't support MapState (
> https://issues.apache.org/jira/browse/BEAM-1474) I'm thinking of using
> ValueState with a Map<> inside it. Is that a good idea? Here is some
> example code:
> ```
> @StateId("myValueStore")
> private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec =
> StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>
> @ProcessElement
> public void processElement(ProcessContext context,
>         @StateId("myValueStore") ValueState<Map<String, String>> valueStore) {
>     ...
> }
> ```
>
> I'd like to support other runners as well (e.g. FlinkRunner), and it seems
> more efficient to use MapState in cases where I need to store a map of
> values. So I'm looking for a way to use MapState or ValueState depending
> on the runner.
>
> I understand how to get the runner at runtime via pipeline options, but
> I'm not sure how to approach defining (and using) a different StateSpec
> for each runner.
>
> Here is sample code for MapState:
> ```
> @StateId("myMapStore")
> private final StateSpec<MapState<String, String>> mapStoreSpec =
> StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());
>
> @ProcessElement
> public void processElement(ProcessContext context,
>         @StateId("myMapStore") MapState<String, String> mapStore) {
>     ...
> }
> ```
>
> Any ideas?
>
> Thank you,
> Dmitry
>
