Hi everyone, I'm trying to write a simple pipeline to experiment with both stateful processing and session windows.
I have an event stream in which each event has a timestamp and a session key. I want to group events by session and enrich every event using state shared across the session. In this case I'm just replacing each event with an incremental counter.

Say I have a source that emits one event per second and my stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm writing only the session key, since the value is irrelevant to the issue I'm experiencing). I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, ...] (the order is not actually important).

Unfortunately my code doesn't work as I expected and I can't understand why (to be honest, I haven't found many resources on the topic). What I actually get is something like:

    a, 0
    a, 1
    b, 0
    a, 0   <-- ???
    a, 2   <-- ???
    c, 0
    ...

which makes me wonder whether I have actually understood how state relates to the key-window pair, or whether I have simply misunderstood how windowing/triggering works.

My pipeline looks something like this:

    p.apply(TextIO.read().from("input.json"))
        .apply(MapElements.via(new ParseTableRowJson()))
        .apply(new AugmentEvents())
        .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                LOG.info(c.element().getKey() + ": " + c.element().getValue());
            }
        }));

    ...

    static class AugmentEvents
            extends PTransform<PCollection<TableRow>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
            return input
                .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
                .apply(new ComputeSessions());
        }
    }

    static class ComputeSessions
            extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
            return events
                .apply(Window.<KV<String, TableRow>>into(
                        Sessions.withGapDuration(Duration.standardMinutes(10)))
                    .triggering(AfterPane.elementCountAtLeast(1))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.standardMinutes(10)))
                .apply(ParDo.of(new StatefulCount()));
        }
    }

    static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
        @StateId("storage")
        private final StateSpec<ValueState<Integer>> storageSpec =
            StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void processElement(
                ProcessContext context,
                BoundedWindow window,
                @StateId("storage") ValueState<Integer> storage) {
            Integer val = storage.read();
            if (val == null) {
                val = new Integer(0);
            }
            int current = val.intValue();
            context.output(KV.of(context.element().getKey(), new Long(current)));
            storage.write(current + 1);
        }
    }

Maurizio
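P.S. To make the expected behaviour concrete, here is a minimal plain-Java sketch of the enrichment I'm after. It does not use Beam at all and ignores windows, triggers and lateness; it only models "one counter per session key", on the assumption that all events for a key fall into a single session (events one second apart, 10-minute gap). The class name ExpectedCounts and the method enrich are made up for illustration and are not part of my pipeline.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: models the desired output only, not how Beam computes it.
class ExpectedCounts {

    // Pairs each session key with the number of earlier events seen for that key.
    static List<String> enrich(List<String> sessionKeys) {
        Map<String, Long> nextIndex = new HashMap<>(); // per-key counter state
        List<String> out = new ArrayList<>();
        for (String key : sessionKeys) {
            long current = nextIndex.getOrDefault(key, 0L);
            out.add("<" + key + ", " + current + ">");
            nextIndex.put(key, current + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        // The example stream from the message, session keys only.
        List<String> stream = List.of(
            "a", "a", "b", "a", "a", "c", "a", "a", "b", "c", "c", "c", "a", "a");
        System.out.println(enrich(stream));
    }
}
```

In other words, the counter for a given key should never restart at 0 while the session is still open, which is what the "a, 0" in the middle of my actual output appears to violate.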