Hi Luke, thanks for your reply.

I was confused about the semantics of GroupByKey, windowing, and
ParDo: I initially thought ParDo required a GroupByKey earlier in the
pipeline, and I believe that if you use GroupByKey then all the
events are buffered before they are passed to the ParDo.

Another issue that we are facing:

The events' timestamps are generated by many of our clients, whose
clocks are not synchronised (personal devices). Is there a way to use
event time on a per-key basis and use it to indicate session expiry
somehow? For us, a session expires when there are no additional events
for a specified period.
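For concreteness, the expiry rule we have in mind looks like this (a minimal pure-Python sketch of the per-key gap logic, not Beam code; `gap` and the class name are just illustrative). Each key only ever compares its own timestamps, so clock skew between clients doesn't matter:

```python
from collections import defaultdict


class SessionTracker:
    """Tracks per-key sessions that expire after `gap` units of
    event-time inactivity, using only each key's own timestamps."""

    def __init__(self, gap):
        self.gap = gap
        self.last_seen = {}                 # key -> time of the most recent event
        self.sessions = defaultdict(list)   # key -> events in the current session
        self.expired = []                   # (key, events) for closed sessions

    def on_event(self, key, event_time, payload):
        last = self.last_seen.get(key)
        if last is not None and event_time - last > self.gap:
            # No activity for longer than the gap: previous session has expired.
            self.expired.append((key, self.sessions.pop(key)))
        self.last_seen[key] = event_time
        self.sessions[key].append((event_time, payload))


tracker = SessionTracker(gap=30)
tracker.on_event("s1", 0, "a")
tracker.on_event("s1", 10, "b")
tracker.on_event("s1", 50, "c")   # 40 > 30 since last event: starts a new session
```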

Thanks!
Des.

On Wed, Nov 10, 2021 at 7:39 PM Luke Cwik <[email protected]> wrote:
>
> To answer the follow-up: you can use a stateful DoFn, which relies on the 
> runner to ensure that records are routed to the appropriate stateful DoFn 
> instance, or you can control when the output is produced using the trigger 
> on the GroupByKey.
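As a rough illustration of the stateful-DoFn idea (plain Python simulating per-key state and an expiry timer; the real Beam API uses `@StateId`/`@TimerId` annotations on the DoFn, and the names here are purely illustrative):

```python
class KeyedBuffer:
    """Simulates what a stateful DoFn provides: state and timers scoped
    per key. In Beam this would be a BagState plus a processing-time
    Timer; here the mechanics are modeled with dicts and an explicit
    advance_time() call standing in for the runner's clock."""

    def __init__(self, expiry):
        self.expiry = expiry
        self.state = {}    # key -> buffered events
        self.timers = {}   # key -> time at which the session expires
        self.emitted = []  # (key, events) produced when a timer fires

    def process(self, key, event, now):
        self.state.setdefault(key, []).append(event)
        self.timers[key] = now + self.expiry  # reset expiry on every event

    def advance_time(self, now):
        for key, deadline in list(self.timers.items()):
            if now >= deadline:
                # Timer fires: emit the accumulated session and clear state.
                self.emitted.append((key, self.state.pop(key)))
                del self.timers[key]


buf = KeyedBuffer(expiry=5)
buf.process("a", "e1", now=0)
buf.process("a", "e2", now=3)   # expiry pushed out to t=8
buf.advance_time(7)             # nothing fires yet
buf.advance_time(9)             # timer fires, session "a" is emitted
```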
>
> Take a look at:
> https://beam.apache.org/blog/timely-processing/
> https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam/45911664
>
> and some of the examples:
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
>  for an example of using session windows.
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
>  for an example for stateful processing.
>
> There are also more learning resources:
> https://beam.apache.org/documentation/resources/learning-resources/
>
>
> On Wed, Nov 10, 2021 at 9:30 AM Luke Cwik <[email protected]> wrote:
>>
>> On Wed, Nov 10, 2021 at 6:22 AM Desmond F <[email protected]> wrote:
>>>
>>> Hi all,
>>>
>>> We have many clients connected via websockets through api gateway on
>>> AWS, these clients submit events of various types periodically, each
>>> event contains a sessionID (generated by the client), the session
>>> logically ends when there's no activity for a specified duration of
>>> time. We have a sequence model (RNN) written in PyTorch that needs to
>>> send predictions back to the clients for each event processed. The
>>> input to the model contains the raw events sent in the order they were
>>> generated.
>>>
>>> I believe the pipeline looks something like this: Source -> Group By
>>> Session ID -> Accumulate Events and Make Predictions
>>>
>>> 1. In your opinion, does this use-case fit naturally with Apache Beam
>>> programming model?
>>
>> Yes.
>>
>>> 2. We'd like the session to expire after a predefined duration
>>> (processing time) of no activity, how can this be achieved?
>>
>> You can do this with a stateful DoFn storing all the accumulated data and 
>> a processing-time trigger to expire the results. Have you considered 
>> using event time instead? Session windows with a gap size seem to be a 
>> good fit and will handle cases where the source producing events is 
>> delayed or internal processing within the pipeline is delayed.
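To make the session-window suggestion concrete, here is a minimal pure-Python sketch of how gap-based session windows merge (the same idea as Beam's `Sessions.withGapDuration`, not the real API): each event starts a proto-window `[t, t + gap)`, and overlapping windows merge.

```python
def merge_sessions(timestamps, gap):
    """Assign each event a proto-window [t, t + gap) and merge
    overlapping ones -- the essence of gap-based session windowing."""
    windows = []
    for t in sorted(timestamps):
        if windows and t < windows[-1][1]:           # overlaps previous window
            start, end = windows[-1]
            windows[-1] = (start, max(end, t + gap))  # extend the session
        else:
            windows.append((t, t + gap))              # start a new session
    return windows


# Events at t=0, 10 and 50 with a 30-unit gap form two sessions.
merge_sessions([0, 10, 50], gap=30)  # → [(0, 40), (50, 80)]
```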
>>
>>> 3. Before session expiry we'd like to produce the session details to
>>> another task that batchify sessions and writes them to S3
>>
>> Yes, the processing-time trigger can emit the results when it fires. If 
>> using event time, the GroupByKey trigger can be used to control when 
>> results are produced. For example, if you use the default trigger, which 
>> produces results when the window closes, then the downstream transforms 
>> get to see all the results for each session as the session expires.
>>
>>> 4. The "Accumulate Events and Make Predictions" should be a stateful
>>> function that builds the session and for each event it should call the
>>> model with the historical events, what is the best way to achieve
>>> that?
>>
>> With the stateful DoFn you can do this as you see the events come in. 
>> Relying on session windows with event time may not fit this approach as 
>> well, since the grouped output is only produced when the trigger fires.
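The per-event prediction loop inside a stateful DoFn could look roughly like this (plain Python; `predict` is a placeholder standing in for the PyTorch model call, and all names are illustrative):

```python
def predict(history):
    # Placeholder for the RNN call; here it just reports how much
    # history it was given.
    return len(history)


class SessionPredictor:
    """Buffers each key's events and calls the model with the full
    history every time a new event arrives, mirroring the
    stateful-DoFn pattern described above."""

    def __init__(self, model):
        self.model = model
        self.history = {}   # key -> ordered events seen so far

    def on_event(self, key, event):
        events = self.history.setdefault(key, [])
        events.append(event)
        return self.model(events)  # prediction sent back to the client


p = SessionPredictor(predict)
p.on_event("s1", "click")    # → 1
p.on_event("s1", "scroll")   # → 2
```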
>>
>>> 5. How to ensure that events for each key arrive in the same order
>>> they were produced? Is it already guaranteed?
>>
>> Normally this isn't guaranteed but some runners do provide this guarantee 
>> automatically. I believe there is a page on the blog that lists different 
>> runners and their time sorting characteristics. You could also take a look 
>> at runners that support @RequiresTimeSortedInput.
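What `@RequiresTimeSortedInput` effectively gives you can be pictured like this (plain Python: buffer events per the watermark and release them in timestamp order; real runners track watermarks far more carefully, so this is only a sketch of the semantics):

```python
import heapq


class TimeSorter:
    """Buffers (timestamp, event) pairs and releases them in timestamp
    order once the watermark says no earlier event can still arrive."""

    def __init__(self):
        self.heap = []  # min-heap ordered by timestamp

    def add(self, timestamp, event):
        heapq.heappush(self.heap, (timestamp, event))

    def release(self, watermark):
        out = []
        while self.heap and self.heap[0][0] <= watermark:
            out.append(heapq.heappop(self.heap))
        return out


s = TimeSorter()
s.add(5, "b")
s.add(1, "a")
s.add(9, "c")
s.release(6)  # → [(1, "a"), (5, "b")]; "c" stays buffered until the watermark passes 9
```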
>>
>>>
>>> 6. The model needs to react to events in real-time (i.e, in less than
>>> 500ms), do you believe this is achievable?
>>
>> This depends on the source that is publishing the data and on the runner, 
>> as both impact latency. You can achieve low latencies using the direct 
>> runner (a local single-process runner), assuming the source is configured 
>> to checkpoint often so that ingested data can be processed downstream. 
>> Other distributed runners typically add latency on the order of low 
>> seconds.
>>
>>>
>>>
>>> Thanks in advance,
>>> Desmond
