There's actually been a rename since that notion of CombiningState. It used
to be that BagState (just blind writes) and CombiningValueState (uses a
CombineFn) were both instances of CombiningState (any automatically
mergeable thing).

Now the names are BagState (blind writes) and CombiningState (uses a
CombineFn) which are instances of GroupingState (automatically mergeable -
you might wonder why we didn't call it MergeableState...)
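
On the UnsupportedOperationException in the quoted report: one way to rule
out an immutable accumulator as the cause is to copy whatever map is handed
to addInput before writing to it. Below is a minimal sketch in plain Java,
not written against the Beam API. The class DefensiveAccumulator, its
addInput helper, and the simplified Map<String, Long> accumulator are all
hypothetical stand-ins for the IndexStateCombineFn quoted below, and
Collections.emptyMap() only simulates an unmodifiable accumulator; I have
not confirmed that this is what the worker actually returns.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the quoted IndexStateCombineFn:
// the accumulator maps a key to the newest version number seen so far.
class DefensiveAccumulator {

    // Copies the incoming accumulator before mutating it, so the method
    // works even when the caller passes an unmodifiable map.
    static Map<String, Long> addInput(Map<String, Long> accumulator, String key, long version) {
        Map<String, Long> copy = new HashMap<>(accumulator);
        // Keep the larger (newer) version for this key.
        copy.merge(key, version, Long::max);
        return copy;
    }

    public static void main(String[] args) {
        // Assumption: an unmodifiable map simulates what addInput received.
        Map<String, Long> decoded = Collections.emptyMap();
        Map<String, Long> acc = addInput(decoded, "a", 1L);
        acc = addInput(acc, "a", 3L);
        acc = addInput(acc, "a", 2L);
        System.out.println(acc); // prints {a=3}
    }
}
```

The copy costs an extra allocation per element, so treat it as a diagnostic
or a workaround rather than a recommended design.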

Kenn

On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <re...@google.com> wrote:

> Out of curiosity, what are you using CombiningState for? I believe it is
> intended for use in merging windows (such as sessions); however, those
> windows are not yet supported with state.
>
> Reuven
>
> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <achau...@brightcove.com>
> wrote:
>
>> Hi all,
>>
>> I recently updated my dataflow pipeline to 2.4.0 sdk and found that my
>> stateful DoFn with the following statespec is throwing
>> java.lang.UnsupportedOperationException.
>>
>> For reference the job information is:
>>
>>    - job-id: 2018-04-11_12_11_36-1181436984489583563
>>
>> The same code seems to work without problems in 2.3.0.
>>
>> // The state spec Beam needs to determine the state type requirements at runtime.
>> @StateId("indexKeys")
>> private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>,
>>         Map<String, KV<Long, ByteString>>, Map<String, ByteString>>>
>>         INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
>>
>> The exception is:
>>
>> java.lang.UnsupportedOperationException
>>         java.util.AbstractMap.put(AbstractMap.java:209)
>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>>         com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>
>> The combine fn is:
>>
>>
>> import com.google.common.collect.Maps;
>> import com.google.protobuf.ByteString;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.values.KV;
>>
>> import java.util.Map;
>>
>> // This combiner keeps track of the most recent value of each key in the map.
>> public class IndexStateCombineFn extends Combine.CombineFn<
>>         KV<String, KV<Long, ByteString>>,
>>         Map<String, KV<Long, ByteString>>,
>>         Map<String, ByteString>> {
>>     @Override
>>     public Map<String, KV<Long, ByteString>> createAccumulator() {
>>         return Maps.newHashMap();
>>     }
>>
>>     @Override
>>     public Map<String, KV<Long, ByteString>> addInput(
>>             Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>>         String id = input.getKey();
>>         KV<Long, ByteString> indexKey = input.getValue();
>>         if (!accumulator.containsKey(id)) {
>>             accumulator.put(id, indexKey);
>>         } else {
>>             KV<Long, ByteString> prevVal = accumulator.get(id);
>>             if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>>                 // input is newer than what we have in the map, store it
>>                 accumulator.put(id, indexKey);
>>             }
>>         }
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<String, KV<Long, ByteString>> mergeAccumulators(
>>             Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>>         Map<String, KV<Long, ByteString>> merged = null;
>>         for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>>             if (merged == null) {
>>                 merged = accumulator;
>>             } else {
>>                 for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>                     String indexId = entry.getKey();
>>                     KV<Long, ByteString> v = entry.getValue();
>>                     if (!merged.containsKey(indexId)) {
>>                         merged.put(indexId, v);
>>                     } else {
>>                         KV<Long, ByteString> old = merged.get(indexId);
>>                         if (old.getKey() < v.getKey()) {
>>                             merged.put(indexId, v);
>>                         }
>>                     }
>>                 }
>>             }
>>         }
>>         return merged;
>>     }
>>
>>     @Override
>>     public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>>         Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>>         for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>             output.put(entry.getKey(), entry.getValue().getValue());
>>         }
>>         return output;
>>     }
>> }
>>
>> The exception seems to indicate that WindmillStateInternals may be
>> returning an ImmutableMap, but I can’t say for sure. Based on the javadoc
>> for addInput, the accumulator should be mutable.
>>
>> Has anyone else seen this issue?
>>
>> — Ankur Chauhan
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "dataflow-feedback" group.
>> To view this discussion on the web visit
>> https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com
>> <https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>> .
>>
