Hi Luke, thanks for your reply. I was confused about the semantics of GroupByKey, windowing, and ParDo: I initially thought a ParDo required a GroupByKey earlier in the pipeline, and my understanding is that if you use GroupByKey, all the events for a window are buffered before they are passed to the downstream ParDo.
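To make our expiry requirement concrete: for us a session ends when no further events arrive for a gap period. Here is a plain-Python sketch of that semantics (this is not Beam API; the function name and gap value are just illustrative) so we are on the same page about what "session expiry" means:

```python
from datetime import datetime, timedelta

def sessionize(timestamps, gap=timedelta(minutes=10)):
    """Split a sorted list of event timestamps into sessions.

    Consecutive events belong to the same session as long as the gap
    between them stays under `gap`; a larger gap closes the session.
    This mirrors the per-key semantics of Beam's session windows with
    a gap duration.
    """
    sessions = []
    current = []
    for ts in timestamps:
        if current and ts - current[-1] > gap:
            sessions.append(current)  # gap exceeded: previous session expired
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions
```

For example, four events at minutes 0, 1, 30, and 31 with a 10-minute gap would form two sessions.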
Another issue that we are facing: the events' timestamps are generated by many of our clients, whose clocks are not synchronised (personal devices). Is there a way to use event time on a per-key basis and use it to signal session expiry somehow? For us, a session expires when there are no additional events for a specified period.

Thanks!
Des

On Wed, Nov 10, 2021 at 7:39 PM Luke Cwik <[email protected]> wrote:
>
> To answer the follow-up, you can use a stateful DoFn, which relies on the
> runner to ensure that records are routed to the appropriate stateful DoFn
> instance, or you can control when the output is produced using the trigger
> on the GroupByKey.
>
> Take a look at:
> https://beam.apache.org/blog/timely-processing/
> https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam/45911664
>
> and some of the examples:
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
> for an example of using session windows.
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
> for an example of stateful processing.
>
> There are also more learning resources:
> https://beam.apache.org/documentation/resources/learning-resources/
>
>
> On Wed, Nov 10, 2021 at 9:30 AM Luke Cwik <[email protected]> wrote:
>>
>>
>> On Wed, Nov 10, 2021 at 6:22 AM Desmond F <[email protected]> wrote:
>>>
>>> Hi all,
>>>
>>> We have many clients connected via websockets through API Gateway on
>>> AWS. These clients submit events of various types periodically. Each
>>> event contains a sessionID (generated by the client), and the session
>>> logically ends when there is no activity for a specified duration of
>>> time. We have a sequence model (RNN) written in PyTorch that needs to
>>> send predictions back to the clients for each event processed.
>>> The input to the model contains the raw events sent in the order they
>>> were generated.
>>>
>>> I believe the pipeline looks something like this: Source -> Group By
>>> Session ID -> Accumulate Events and Make Predictions
>>>
>>> 1. In your opinion, does this use case fit naturally with the Apache
>>> Beam programming model?
>>
>> Yes.
>>
>>> 2. We'd like the session to expire after a predefined duration
>>> (processing time) of no activity. How can this be achieved?
>>
>> You can do this with a stateful DoFn storing all the accumulated data
>> and use a processing-time trigger to expire the results. Have you
>> considered using event time instead? Session windows with a gap size
>> seem to be a good fit and will handle cases where the source producing
>> events gets delayed or internal processing within the pipeline gets
>> delayed.
>>
>>> 3. Before session expiry we'd like to produce the session details to
>>> another task that batches sessions and writes them to S3.
>>
>> Yes, the processing-time trigger can emit the results when it fires. If
>> using event time, then the GroupByKey trigger can be used to control
>> when results are produced. For example, if you use the default trigger,
>> which produces results when the window is done, then the downstream
>> transforms would get to see all the results for each session as the
>> session expires.
>>
>>> 4. The "Accumulate Events and Make Predictions" step should be a
>>> stateful function that builds the session, and for each event it
>>> should call the model with the historical events. What is the best way
>>> to achieve that?
>>
>> With the stateful DoFn you can do this as you see the events come in.
>> Relying on session windows with event times may not fit this approach
>> as well.
>>
>>> 5. How do we ensure that events for each key arrive in the same order
>>> they were produced? Is this already guaranteed?
>>
>> Normally this isn't guaranteed, but some runners do provide this
>> guarantee automatically.
>> I believe there is a page on the blog that lists different runners and
>> their time-sorting characteristics. You could also take a look at
>> runners that support @RequiresTimeSortedInput.
>>
>>>
>>> 6. The model needs to react to events in real time (i.e., in less than
>>> 500 ms). Do you believe this is achievable?
>>
>> This is dependent on the source that is publishing the data and the
>> runner, as these will impact the latency. You can achieve low latencies
>> using the direct runner (a local single-process runner), assuming that
>> the source is configured to checkpoint often so that the ingested data
>> can be processed downstream. Other distributed runners are typically on
>> the order of low seconds.
>>
>>>
>>> Thanks in advance,
>>> Desmond
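On the ordering point (question 5), the effect we need can be sketched in plain Python: buffer each key's events, then sort them by event timestamp before feeding the sequence to the model. This is only an illustration of the semantics, not Beam code; runners that support @RequiresTimeSortedInput perform an equivalent per-key time sort for you, bounded by the watermark. The function name and tuple layout below are hypothetical:

```python
from collections import defaultdict

def order_events_per_key(events):
    """Group events by session and sort each group by event timestamp.

    `events` is an iterable of (session_id, event_timestamp, payload)
    tuples, possibly arriving out of order. Returns a dict mapping each
    session_id to its payloads in event-time order.
    """
    buffered = defaultdict(list)
    for session_id, ts, payload in events:
        buffered[session_id].append((ts, payload))
    return {
        session_id: [payload for ts, payload in sorted(pairs)]
        for session_id, pairs in buffered.items()
    }
```

The caveat, as noted above, is that a plain sort only works once you know no earlier event can still arrive for the key, which is exactly what the watermark (or a session-expiry timer) provides.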
