Hi Ryan, perhaps this is https://issues.apache.org/jira/browse/BEAM-197 ?
On Mon, May 23, 2016 at 6:47 PM, Ryan Madsen <ryan.mad...@gmail.com> wrote:
> Hi all,
>
> I'm looking to solve a problem related to performing a join on two
> streaming datasets, and am having a hard time figuring out if Beam provides
> a model that can help here. I'm curious if I can create a system that can
> take the outputs of two streaming data sources, and run a join on them,
> outputting the "joined" values to a new collection. This first part looks
> quite easy to do with CoGroupByKey, but there's a twist: if updates are
> received out-of-order, I'd like to emit proper updates so the output of the
> join will be consistent no matter where I look at in time. That is, the
> database doesn't just store the latest view of a collection, but also
> stores what that collection looked like for all past values of time.
>
> This is probably unclear, so let me draw out a diagram. Imagine we have 2
> input collections: aToB and bToC. If aToB has key aX -> bY @ event time 1,
> and bToC has bY -> cZ @ event time 2, my output collection should contain
> aX -> cZ @ event time 2. It shows up at eventTime 2 because that's when all
> the inputs aligned in time to create an actual join. If we later (in
> processing time) receive an update that bY -> cW @ event time 0, then we
> should update our output collection to contain both [aX -> cW @ event time
> 1, and aX -> cZ @ event time 2]. I've included a diagram that runs through
> a few update notifications and what the expected outputs should be.
>
> Is this a problem that falls under Beam's problem-domain? Are there any
> examples or previous instances of people performing these operations?
>
> Thanks in advance,
> Ryan
>
>
> The expected outputs at each processing time can be seen underneath the
> pink "Outputs" bar. Traverse the diagram column-by-column, starting at the
> left (processing time 0).
>
> Notification (@eventTime) received at each processing time (pt):
>
>   pt0: aToB/a1 -> b1 @et1
>   pt1: bToC/b1 -> c1 @et2
>   pt2: bToC/b1 -> c0 @et0
>   pt3: bToC/b2 -> c3 @et0
>   pt4: bToC/b3 -> c4 @et2
>   pt5: aToB/a1 -> b3 @et1
>
> Processing time (pt)    pt0     pt1     pt2     pt3     pt4     pt5
>
> @et0:
> aToB Input Map values   -       -       -       -       -       -
> bToC Input Map values   -       -       b1->c0  b1->c0  b1->c0  b1->c0
>                                                 b2->c3  b2->c3  b2->c3
> Outputs aToC            -       -       -       -       -       -
>
> @et1:
> aToB Input Map values   a1->b1  a1->b1  a1->b1  a1->b1  a1->b1  a1->b3
> bToC Input Map values   -       -       b1->c0  b1->c0  b1->c0  b1->c0
>                                                 b2->c3  b2->c3  b2->c3
> Outputs aToC            -       -       a1->c0  a1->c0  a1->c0  -
>
> @et2:
> aToB Input Map values   a1->b1  a1->b1  a1->b1  a1->b1  a1->b1  a1->b3
> bToC Input Map values   -       b1->c1  b1->c1  b1->c1  b1->c1  b1->c1
>                                                 b2->c3  b2->c3  b2->c3
>                                                         b3->c4  b3->c4
> Outputs aToC            -       a1->c1  a1->c1  a1->c1  a1->c1  a1->c4
>
> Key: New input state | New input state @ later time | New derived state |
> Retraction
> pt = Processing Time, et = EventTime, - = no entry
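
To make the semantics in the quoted example concrete, here is a minimal
plain-Python sketch of the event-time ("as of") join it describes. It does
not use the Beam API: VersionedMap and temporal_join are hypothetical names
made up for illustration, and the sketch simply recomputes the whole output
after each notification rather than emitting incremental updates or
retractions the way a streaming pipeline would need to.

```python
from bisect import bisect_right
from collections import defaultdict


class VersionedMap:
    """Hypothetical helper: key -> value, remembered per event time."""

    def __init__(self):
        # key -> {event_time: value}; a later write at the same event time wins.
        self._versions = defaultdict(dict)

    def put(self, key, value, event_time):
        self._versions[key][event_time] = value

    def get_as_of(self, key, event_time):
        """Value with the greatest event time <= event_time, or None."""
        versions = self._versions.get(key)
        if not versions:
            return None
        times = sorted(versions)
        i = bisect_right(times, event_time)
        return versions[times[i - 1]] if i else None

    def keys(self):
        return list(self._versions)

    def event_times(self):
        return {t for versions in self._versions.values() for t in versions}


def temporal_join(a_to_b, b_to_c):
    """Return {a: [(event_time, c), ...]}, one entry per change of the join."""
    # The joined value can only change at an event time where an input changed.
    change_points = sorted(a_to_b.event_times() | b_to_c.event_times())
    joined = {}
    for a in a_to_b.keys():
        history, previous = [], None
        for t in change_points:
            b = a_to_b.get_as_of(a, t)
            c = b_to_c.get_as_of(b, t) if b is not None else None
            if c != previous:
                history.append((t, c))  # c is None if the join stops matching
                previous = c
        if history:
            joined[a] = history
    return joined


a_to_b, b_to_c = VersionedMap(), VersionedMap()
a_to_b.put("a1", "b1", event_time=1)  # pt0
b_to_c.put("b1", "c1", event_time=2)  # pt1
b_to_c.put("b1", "c0", event_time=0)  # pt2, arrives out of order
print(temporal_join(a_to_b, b_to_c))  # {'a1': [(1, 'c0'), (2, 'c1')]}
```

The printed result corresponds to the pt2 column of the diagram: a1->c0 from
event time 1 and a1->c1 from event time 2. Diffing the result before and
after each notification would give the updates and retractions to emit.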