Re: Stateful processing with session window

2018-02-12 Thread Kenneth Knowles
On Mon, Feb 12, 2018 at 1:09 AM, Maurizio Sambati 
wrote:

> Hi Carlos,
>
>> What I think is happening here is that the third 'a' you see is actually
>> in a different window from the other three a's. State is kept per key and
>> window, that is, for each key-window pair; therefore, if your 'a' counter
>> is being restarted, it is probably because it is actually a different
>> counter, and since the key is the same, the only possibility is that the
>> window is different.
>>
>
> Yeah, that was my initial guess too; that's why I questioned whether I
> had understood the semantics of session windows themselves. Fortunately, as
> Kenneth pointed out, my understanding was correct, but this window model is
> not compatible with stateful processing yet.
>

I want to mention something here: we do know that it is compatible with
merging windows. In fact, triggers use the state mechanism in merging
windows "under the hood". The issue is connecting it to user-defined state.
Each runner is slightly different, though it is not terribly difficult for
any of them. For BagState, the runner will automatically combine the bags.
For CombiningState, it will automatically use mergeAccumulators. ValueState
will probably not be supported for a while, and perhaps will eventually
have a merge callback.
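Kenn's description of how user state could follow a window merge can be illustrated with a toy model in plain Java. This is not Beam's API: `StateMergeSketch` and its methods are hypothetical, and the sketch only mirrors the semantics described above, where two bags are concatenated, per-window accumulators are merged the way a `CombineFn`'s `mergeAccumulators` would merge them (here, partial counts are added), and two plain values have no generic merge.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Beam API; all names are hypothetical) of what happens to
// per-window state when two session windows for the same key merge into one.
final class StateMergeSketch {

  // BagState analogue: merging the two windows just concatenates their bags.
  static List<String> mergeBags(List<String> left, List<String> right) {
    List<String> merged = new ArrayList<>(left);
    merged.addAll(right);
    return merged;
  }

  // CombiningState analogue for a count: each window holds a partial count
  // (the accumulator), and merging adds the partial counts together, which is
  // what a summing CombineFn's mergeAccumulators would do.
  static long mergeAccumulators(Iterable<Long> accumulators) {
    long total = 0L;
    for (long acc : accumulators) {
      total += acc;
    }
    return total;
  }

  // ValueState analogue: there is no generic way to merge two arbitrary
  // values, so without a user-supplied merge callback the merge must fail.
  static <T> T mergeValues(T left, T right) {
    throw new UnsupportedOperationException(
        "no generic way to merge " + left + " and " + right);
  }
}
```

Under this model, a counter kept as combining state would survive a session merge naturally, while a `ValueState<Integer>` counter like the one in the original pipeline would need some extra, runner- or user-provided merge rule.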

Kenn


Re: Stateful processing with session window

2018-02-12 Thread Maurizio Sambati
Hi Carlos,

> What I think is happening here is that the third 'a' you see is actually
> in a different window from the other three a's. State is kept per key and
> window, that is, for each key-window pair; therefore, if your 'a' counter
> is being restarted, it is probably because it is actually a different
> counter, and since the key is the same, the only possibility is that the
> window is different.
>

Yeah, that was my initial guess too; that's why I questioned whether I had
understood the semantics of session windows themselves. Fortunately, as
Kenneth pointed out, my understanding was correct, but this window model is
not compatible with stateful processing yet.

Maurizio


Re: Stateful processing with session window

2018-02-12 Thread Maurizio Sambati
Hi Kenneth,

> What runner are you using? Are you trying this out in the DirectRunner? As
> far as I know, no runner supports stateful processing in session windows
> yet. It is probably a bug that your pipeline was accepted by the runner
> when it includes features that the runner cannot execute. It would be a
> real mistake to have missed this validation for the DirectRunner.
>

Oh, ok, got it. :-(
I was actually trying this on the DirectRunner and it didn't raise any
errors, so I guess no such validation is performed.

> Support for stateful processing in merging windows is definitely important.
> There's only a Jira filed for Dataflow [1] as far as I can tell. I just
> cloned it for the DirectRunner [2] since that is how you would test your
> pipeline. If you want to follow the same feature for a different runner, I
> can route a new ticket to the right person.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-2507
> [2] https://issues.apache.org/jira/browse/BEAM-3686
>

Awesome, I'm glad there is interest in adding this feature; it fits our
use case really well. (For the moment we are only interested in the two
runners you mentioned.)

Thanks,
Maurizio


Re: Stateful processing with session window

2018-02-11 Thread Kenneth Knowles
Hi Maurizio,

What runner are you using? Are you trying this out in the DirectRunner? As
far as I know, no runner supports stateful processing in session windows
yet. It is probably a bug that your pipeline was accepted by the runner
when it includes features that the runner cannot execute. It would be a
real mistake to have missed this validation for the DirectRunner.

Support for stateful processing in merging windows is definitely important.
There's only a Jira filed for Dataflow [1] as far as I can tell. I just
cloned it for the DirectRunner [2] since that is how you would test your
pipeline. If you want to follow the same feature for a different runner, I
can route a new ticket to the right person.

Kenn

[1] https://issues.apache.org/jira/browse/BEAM-2507
[2] https://issues.apache.org/jira/browse/BEAM-3686


On Fri, Feb 9, 2018 at 7:46 AM, Maurizio Sambati 
wrote:

> Hi everyone,
>
> I'm trying to write a simple pipeline to experiment with both stateful
> processing and session windows.
>
> I have an event stream where each event has a timestamp and a session key.
> I want to group by session and enrich all events using a common state for
> the session. In this case I'm just replacing each event with an
> incremental counter.
>
> So, let's say I have a source that outputs an event every second and my
> stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm writing only the
> session key, since the value is irrelevant to the issue I'm experiencing).
>
> I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, ...]
> (the order is actually not important).
>
> Unfortunately, my code does not work as I expected and I'm not able to
> understand why (to be honest, I haven't found many resources on the
> topic). What I actually get is something like:
>
> a, 0
> a, 1
> b, 0
> a, 0   <-- ???
> a, 2   <-- ???
> c, 0
> ...
>
> That makes me wonder whether I have actually understood how the state is
> related to the key-window pair, or whether I have just misunderstood how
> windowing/triggering works.
>
> My pipeline looks something like:
>
> p.apply(TextIO.read().from("input.json"))
>  .apply(MapElements.via(new ParseTableRowJson()))
>  .apply(new AugmentEvents())
>  .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
>    @ProcessElement
>    public void processElement(ProcessContext c) {
>      LOG.info(c.element().getKey() + ": " + c.element().getValue());
>    }
>  }));
>
> ...
>
> static class AugmentEvents extends PTransform<PCollection<TableRow>,
>     PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
>     return input
>         .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
>         .apply(new ComputeSessions());
>   }
> }
>
> static class ComputeSessions extends PTransform<PCollection<KV<String, TableRow>>,
>     PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
>     return events
>         // Session windows with a 10-minute gap, firing on every element.
>         .apply(Window.<KV<String, TableRow>>into(
>                 Sessions.withGapDuration(Duration.standardMinutes(10)))
>             .triggering(AfterPane.elementCountAtLeast(1))
>             .discardingFiredPanes()
>             .withAllowedLateness(Duration.standardMinutes(10)))
>         .apply(ParDo.of(new StatefulCount()));
>   }
> }
>
> static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
>   // One counter cell per key and window.
>   @StateId("storage")
>   private final StateSpec<ValueState<Integer>> storageSpec =
>       StateSpecs.value(VarIntCoder.of());
>
>   @ProcessElement
>   public void processElement(ProcessContext context, BoundedWindow window,
>       @StateId("storage") ValueState<Integer> storage) {
>     Integer val = storage.read();
>     if (val == null) {
>       val = 0;  // first element seen for this key and window
>     }
>     int current = val;
>     context.output(KV.of(context.element().getKey(), (long) current));
>     storage.write(current + 1);
>   }
> }
>
> Maurizio
>
>
>
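The result Maurizio is after can be reproduced outside Beam with a small plain-Java model, which also shows why each key's events should land in a single session here. It is a sketch under stated assumptions, not Beam code: `SessionCounterSketch` and its methods are hypothetical, events are assumed to arrive in timestamp order (Beam gives no such ordering guarantee), and a gap of 600 seconds stands in for `Sessions.withGapDuration(Duration.standardMinutes(10))`. Each event conceptually gets the window `[t, t + gap)`, and overlapping windows for the same key merge, so a key only starts a new session (and a new counter) when its next event is at least one gap after the previous one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process model (NOT Beam code) of the desired output:
// each event is replaced by its position inside its (key, session).
final class SessionCounterSketch {

  // Assumes keys[i] happened at timestampsSeconds[i] and that the arrays are
  // already sorted by timestamp.
  static List<String> countPerSession(String[] keys, long[] timestampsSeconds, long gapSeconds) {
    Map<String, Long> lastSeen = new HashMap<>(); // last timestamp per key
    Map<String, Long> counter = new HashMap<>();  // counter of the key's current session
    List<String> out = new ArrayList<>();
    for (int i = 0; i < keys.length; i++) {
      String key = keys[i];
      long ts = timestampsSeconds[i];
      Long last = lastSeen.get(key);
      // Proto-windows [last, last + gap) and [ts, ts + gap) overlap only if
      // ts - last < gap; otherwise this event opens a fresh session.
      if (last == null || ts - last >= gapSeconds) {
        counter.put(key, 0L);
      }
      long current = counter.get(key);
      out.add("<" + key + ", " + current + ">");
      counter.put(key, current + 1);
      lastSeen.put(key, ts);
    }
    return out;
  }

  // Timestamps 0, 1, 2, ... seconds: "a source that outputs an event every second".
  static long[] oneEventPerSecond(int n) {
    long[] ts = new long[n];
    for (int i = 0; i < n; i++) {
      ts[i] = i;
    }
    return ts;
  }
}
```

With the fourteen events from the message, one per second, every key stays in one session, so the sketch yields `<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, <c, 0>, ...` and ends with `<a, 7>`; an `a` event arriving 700 seconds after the previous `a` would start again at `<a, 0>`.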


Re: Stateful processing with session window

2018-02-10 Thread Carlos Alonso
Hi Maurizio, I'm not a very experienced user here (I'm actually just getting
started with all this), but I'll give this one a try and see if I can
help.

What I think is happening here is that the third 'a' you see is actually in
a different window from the other three a's. State is kept per key and
window, that is, for each key-window pair; therefore, if your 'a' counter is
being restarted, it is probably because it is actually a different counter,
and since the key is the same, the only possibility is that the window is
different. You can check whether my guess is right by also printing the
window information of your elements.
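Carlos's explanation can be made concrete with a plain-Java toy model (not Beam code; `KeyWindowStateSketch` and its nested `Window` class are hypothetical). It keeps one counter cell per (key, window) pair, repeats the read, emit, write-plus-one logic of `StatefulCount`, and prints the window next to each value, which is the kind of logging suggested above. The window bounds used with it are assumed values, chosen only to show that the same key in a different window starts again from 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model (NOT Beam code) of "state is per key and window": one state cell
// per (key, window) pair, so the same key in another window starts empty.
final class KeyWindowStateSketch {

  // Hypothetical stand-in for a window; Beam's real type is BoundedWindow.
  static final class Window {
    final long startMillis;
    final long endMillis;

    Window(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Window)) {
        return false;
      }
      Window w = (Window) o;
      return startMillis == w.startMillis && endMillis == w.endMillis;
    }

    @Override
    public int hashCode() {
      return Objects.hash(startMillis, endMillis);
    }

    @Override
    public String toString() {
      return "[" + startMillis + ", " + endMillis + ")";
    }
  }

  // key -> (window -> counter)
  private final Map<String, Map<Window, Integer>> cells = new HashMap<>();

  // Mirrors StatefulCount: read the cell (missing means 0), emit it, write +1.
  int process(String key, Window window) {
    Map<Window, Integer> perKey = cells.computeIfAbsent(key, k -> new HashMap<>());
    int current = perKey.getOrDefault(window, 0);
    perKey.put(window, current + 1);
    // Logging the window next to the value makes a "restarted" counter visible.
    System.out.println(key + " in " + window + ": " + current);
    return current;
  }
}
```

Calling `process("a", w1)` twice returns 0 and then 1; a call for the same key with a different window `w2` returns 0, and going back to `w1` returns 2. In the real pipeline, the `BoundedWindow window` parameter that `StatefulCount` already declares could be logged in the same way.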

Hope it helps.
