If you are mutating accumulators, you might instead blind-write the inputs
and let the system manage the combining. I'd have to see the body of your
@ProcessElement method to say more.
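Here is a minimal, stdlib-only sketch of the blind-write idea. It assumes the element shape from the spec quoted below (an id plus a Long version plus a payload). The processing step only appends to a bag. The latest value per id is computed at read time into a fresh map, so no accumulator handed back by the runner is ever mutated. `BlindWriteSketch`, `IndexKey`, `add`, `read` and `latestPerId` are hypothetical names. The `ArrayList` stands in for Beam's BagState, and `String` stands in for `ByteString`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "blind write, combine on read". In a real DoFn the list
// would be bag state and the element would be KV<String, KV<Long, ByteString>>.
public class BlindWriteSketch {

    // One observation: an id, a version (e.g. a timestamp), and a payload.
    static final class IndexKey {
        final String id;
        final long version;
        final String payload;

        IndexKey(String id, long version, String payload) {
            this.id = id;
            this.version = version;
            this.payload = payload;
        }
    }

    // Stand-in for bag state: writes only append, they never read or mutate.
    private final List<IndexKey> bag = new ArrayList<>();

    void add(IndexKey key) {
        bag.add(key);
    }

    // Fold at read time into a new map, keeping the highest version per id
    // (ties go to the later write, matching the original addInput's <=).
    static Map<String, String> latestPerId(Iterable<IndexKey> entries) {
        Map<String, IndexKey> best = new HashMap<>();
        for (IndexKey e : entries) {
            IndexKey prev = best.get(e.id);
            if (prev == null || prev.version <= e.version) {
                best.put(e.id, e);
            }
        }
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, IndexKey> e : best.entrySet()) {
            out.put(e.getKey(), e.getValue().payload);
        }
        return Collections.unmodifiableMap(out);
    }

    Map<String, String> read() {
        return latestPerId(bag);
    }

    public static void main(String[] args) {
        BlindWriteSketch state = new BlindWriteSketch();
        state.add(new IndexKey("a", 1L, "old"));
        state.add(new IndexKey("b", 5L, "b5"));
        state.add(new IndexKey("a", 3L, "new"));
        System.out.println(state.read().get("a")); // prints "new"
    }
}
```

In a real DoFn the bag would presumably be declared with StateSpecs.bag() and read with BagState.read(). The fold stays the same, and because it builds its own map it does not care whether the runner's stored values are mutable.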

Kenn

On Mon, Apr 16, 2018 at 3:33 PM Kenneth Knowles <k...@google.com> wrote:

> There has actually been a rename since that notion of CombiningState. It
> used to be that BagState (just blind writes) and CombiningValueState (uses
> a CombineFn) were both instances of CombiningState (any automatically
> mergeable thing).
>
> Now the names are BagState (blind writes) and CombiningState (uses a
> CombineFn), which are both instances of GroupingState (automatically
> mergeable; you might wonder why we didn't call it MergeableState...)
>
> Kenn
>
> On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <re...@google.com> wrote:
>
>> Out of curiosity, what are you using CombiningState for? I believe it is
>> intended for use in merging windows (such as sessions); however, those
>> windows are not yet supported with state.
>>
>> Reuven
>>
>> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <achau...@brightcove.com> wrote:
>>
>>> Hi all,
>>>
>>> I recently updated my Dataflow pipeline to the 2.4.0 SDK and found that my
>>> stateful DoFn with the following state spec is throwing
>>> java.lang.UnsupportedOperationException.
>>>
>>> For reference, the job information is:
>>>
>>>    - job-id: 2018-04-11_12_11_36-1181436984489583563
>>>
>>> The same code works correctly, i.e. without problems, in 2.3.0.
>>>
>>> @StateId("indexKeys")
>>> // this is the state spec Beam needs to figure out the state type requirements at runtime
>>> private final StateSpec<CombiningState<
>>>         KV<String, KV<Long, ByteString>>,    // InputT
>>>         Map<String, KV<Long, ByteString>>,   // AccumT
>>>         Map<String, ByteString>>>            // OutputT
>>>     INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
>>>
>>> The exception is:
>>>
>>> java.lang.UnsupportedOperationException
>>>         java.util.AbstractMap.put(AbstractMap.java:209)
>>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>>>         com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>>
>>> The CombineFn is:
>>>
>>> import com.google.common.collect.Maps;
>>> import com.google.protobuf.ByteString;
>>> import org.apache.beam.sdk.transforms.Combine;
>>> import org.apache.beam.sdk.values.KV;
>>>
>>> import java.util.Map;
>>>
>>> // this combiner keeps track of the most recent value (highest Long version) of each key in the map
>>> public class IndexStateCombineFn extends Combine.CombineFn<
>>>         KV<String, KV<Long, ByteString>>,    // InputT
>>>         Map<String, KV<Long, ByteString>>,   // AccumT
>>>         Map<String, ByteString>> {           // OutputT
>>>     @Override
>>>     public Map<String, KV<Long, ByteString>> createAccumulator() {
>>>         return Maps.newHashMap();
>>>     }
>>>
>>>     @Override
>>>     public Map<String, KV<Long, ByteString>> addInput(
>>>             Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>>>         String id = input.getKey();
>>>         KV<Long, ByteString> indexKey = input.getValue();
>>>         if (!accumulator.containsKey(id)) {
>>>             accumulator.put(id, indexKey);
>>>         } else {
>>>             KV<Long, ByteString> prevVal = accumulator.get(id);
>>>             if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>>>                 // input is newer than what we have in the map, store it
>>>                 accumulator.put(id, indexKey);
>>>             }
>>>         }
>>>         return accumulator;
>>>     }
>>>
>>>     @Override
>>>     public Map<String, KV<Long, ByteString>> mergeAccumulators(
>>>             Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>>>         Map<String, KV<Long, ByteString>> merged = null;
>>>         for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>>>             if (merged == null) {
>>>                 merged = accumulator;
>>>             } else {
>>>                 for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>>                     String indexId = entry.getKey();
>>>                     KV<Long, ByteString> v = entry.getValue();
>>>                     if (!merged.containsKey(indexId)) {
>>>                         merged.put(indexId, v);
>>>                     } else {
>>>                         KV<Long, ByteString> old = merged.get(indexId);
>>>                         if (old.getKey() < v.getKey()) {
>>>                             merged.put(indexId, v);
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         }
>>>         return merged;
>>>     }
>>>
>>>     @Override
>>>     public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>>>         Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>>>         for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>>             output.put(entry.getKey(), entry.getValue().getValue());
>>>         }
>>>         return output;
>>>     }
>>> }
>>>
>>> The exception suggests that WindmillStateInternals may be handing addInput
>>> an ImmutableMap as the accumulator, but I can't say for sure. According to
>>> the javadoc for addInput, the accumulator should be mutable.
>>>
>>> Has anyone else seen this issue?
>>>
>>> — Ankur Chauhan
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "dataflow-feedback" group.
>>> To view this discussion on the web visit
>>> https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com
>>> <https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>>
>
