Hi all,

I have a question about how much state is buffered when using
Combine.perKey with a custom accumulator. For example, I have:

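PCollection<KV<String, String>> elements = ...;

// Global window; fire 10s (processing time) after the first element of each
// pane, forever, with every firing accumulating the contents of earlier panes.
PCollection<KV<String, List<String>>> topValuesPerKey = elements
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10))))
        .accumulatingFiredPanes())
    .apply(Combine.perKey(new MyCombineFunction()));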


Here, for each key, MyCombineFunction counts the occurrences of each
value. Its output for each key is a List<String> of the values that occur
most frequently. In this case the accumulator for each key just stores a
Map<String, Long> of values and their associated counts.
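A rough sketch of what MyCombineFunction's logic could look like. The class name TopValuesSketch is hypothetical, and the four static methods only mirror the shape of Beam's Combine.CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput); they are written in plain Java so the counting logic can be run without a pipeline. In a real job the class would instead extend Combine.CombineFn<String, Map<String, Long>, List<String>>. Ties for "most frequent" are assumed to all be emitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyCombineFunction: same four steps as a Beam
// CombineFn<String, Map<String, Long>, List<String>>, but plain Java.
public class TopValuesSketch {

    // One empty accumulator per key: value -> occurrence count.
    static Map<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    // Fold a single value into the accumulator.
    static Map<String, Long> addInput(Map<String, Long> acc, String value) {
        acc.merge(value, 1L, Long::sum);
        return acc;
    }

    // Combine partial accumulators (e.g. built on different bundles).
    static Map<String, Long> mergeAccumulators(Iterable<Map<String, Long>> accs) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> acc : accs) {
            acc.forEach((v, c) -> merged.merge(v, c, Long::sum));
        }
        return merged;
    }

    // Emit every value whose count equals the maximum (ties are all kept).
    static List<String> extractOutput(Map<String, Long> acc) {
        long max = 0;
        for (long c : acc.values()) {
            max = Math.max(max, c);
        }
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Long> e : acc.entrySet()) {
            if (e.getValue() == max) {
                top.add(e.getKey());
            }
        }
        Collections.sort(top); // deterministic order for ties
        return top;
    }

    public static void main(String[] args) {
        Map<String, Long> a = createAccumulator();
        for (String v : List.of("x", "y", "x")) addInput(a, v);
        Map<String, Long> b = createAccumulator();
        for (String v : List.of("y", "z")) addInput(b, v);
        Map<String, Long> merged = mergeAccumulators(List.of(a, b));
        // x=2, y=2, z=1: three entries, top values x and y
        System.out.println(merged.size() + " " + extractOutput(merged));
    }
}
```

However many elements are folded in, the accumulator only ever holds one entry per distinct value for that key.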


My question is: since I am using accumulatingFiredPanes forever on the
global window, is every element going to be buffered forever (i.e. will the
amount of space needed constantly increase)? Or is the amount of buffered
state determined by my accumulator (i.e. by the number of unique values
across all keys)? If the former is the case, how can I optimise my job so
that the accumulator is the only state stored across panes?
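To make the two possibilities concrete, here is a toy model in plain Java. It is an illustration only and says nothing about how Beam or any runner actually stores state; the class and method names are hypothetical. For a single key, it compares buffering every element with keeping only the value-to-count map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical, not Beam's state implementation): for one key,
// compare keeping every element with keeping only the value -> count map.
public class StateGrowthToy {

    // Returns {number of buffered elements, number of accumulator entries}
    // after feeding n elements that cycle through the given distinct values.
    static int[] stateSizes(int n, String[] distinctValues) {
        List<String> bufferedElements = new ArrayList<>(); // possibility 1
        Map<String, Long> accumulator = new HashMap<>();   // possibility 2
        for (int i = 0; i < n; i++) {
            String v = distinctValues[i % distinctValues.length];
            bufferedElements.add(v);
            accumulator.merge(v, 1L, Long::sum);
        }
        return new int[] {bufferedElements.size(), accumulator.size()};
    }

    public static void main(String[] args) {
        String[] values = {"a", "b", "c"};
        for (int n : new int[] {10, 1000, 100000}) {
            int[] sizes = stateSizes(n, values);
            System.out.println(n + " elements: " + sizes[0] + " buffered vs "
                + sizes[1] + " accumulator entries");
        }
    }
}
```

The first count grows with every element, while the second stays at the number of distinct values (3 here).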


Thanks for any advice,

Josh
