Hi Ryan, perhaps this is https://issues.apache.org/jira/browse/BEAM-197 ?



On Mon, May 23, 2016 at 6:47 PM, Ryan Madsen <ryan.mad...@gmail.com> wrote:

> Hi all,
>
> I'm looking to solve a problem related to performing a join on two
> streaming datasets, and am having a hard time figuring out if Beam provides
> a model that can help here. I'm curious if I can create a system that can
> take the outputs of two streaming data sources, and run a join on them,
> outputting the "joined" values to a new collection. This first part looks
> quite easy to do with CoGroupByKey, but there's a twist: if updates are
> received out-of-order, I'd like to emit proper updates so the output of the
> join will be consistent no matter which point in time I look at. That is, the
> database doesn't just store the latest view of a collection, but also
> stores what that collection looked like for all past values of time.
>
> This is probably unclear, so let me draw out a diagram. Imagine we have 2
> input collections: aToB and bToC. If aToB has key aX -> bY @ event time 1,
> and bToC has bY -> cZ @ event time 2, my output collection should contain
> aX -> cZ @ event time 2. It shows up at event time 2 because that's when all
> the inputs aligned in time to create an actual join. If we later (in
> processing time) receive an update that bY -> cW @ event time 0, then we
> should update our output collection to contain both [aX -> cW @ event time
> 1, and aX -> cZ @ event time 2].  I've included a diagram that runs through
> a few update notifications and what the expected outputs should be.
>
> Is this a problem that falls under Beam's problem domain? Are there any
> examples or previous instances of people performing these operations?
>
> Thanks in advance,
> Ryan
>
>
> The expected outputs at each processing time can be seen underneath the
> pink "Outputs" bar. Traverse the diagram column-by-column, starting at the
> left (processing time 0).
>
> Notification received at each processing time (pt), with its event time:
>
>   pt0: aToB/a1 -> b1 @et1
>   pt1: bToC/b1 -> c1 @et2
>   pt2: bToC/b1 -> c0 @et0
>   pt3: bToC/b2 -> c3 @et0
>   pt4: bToC/b3 -> c4 @et2
>   pt5: aToB/a1 -> b3 @et1
>
> Input map values and outputs at each event time, per processing time:
>
>                  | pt0    | pt1    | pt2    | pt3    | pt4    | pt5
> -----------------+--------+--------+--------+--------+--------+--------
> @et0 aToB input  |        |        |        |        |        |
> @et0 bToC input  |        |        | b1->c0 | b1->c0 | b1->c0 | b1->c0
>                  |        |        |        | b2->c3 | b2->c3 | b2->c3
> @et0 aToC output |        |        |        |        |        |
> -----------------+--------+--------+--------+--------+--------+--------
> @et1 aToB input  | a1->b1 | a1->b1 | a1->b1 | a1->b1 | a1->b1 | a1->b3
> @et1 bToC input  |        |        | b1->c0 | b1->c0 | b1->c0 | b1->c0
>                  |        |        |        | b2->c3 | b2->c3 | b2->c3
> @et1 aToC output |        |        | a1->c0 | a1->c0 | a1->c0 |
> -----------------+--------+--------+--------+--------+--------+--------
> @et2 aToB input  | a1->b1 | a1->b1 | a1->b1 | a1->b1 | a1->b1 | a1->b3
> @et2 bToC input  |        | b1->c1 | b1->c1 | b1->c1 | b1->c1 | b1->c1
>                  |        |        |        | b2->c3 | b2->c3 | b2->c3
>                  |        |        |        |        | b3->c4 | b3->c4
> @et2 aToC output |        | a1->c1 | a1->c1 | a1->c1 | a1->c1 | a1->c4
>
> Key: pt = processing time, et = event time. Cell colors in the original
> diagram marked: new input state, new input state @ later time, new derived
> state, and retraction.
>
>
>
>
>
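The join semantics described in the quoted message can be sketched in plain Python. This is not Beam code and does not use any Beam API; it is only a minimal model, assuming that each input is a map that remembers every value by event time, that a lookup "as of" event time t returns the latest value with event time <= t, and that a later notification with the same key and event time overwrites the earlier one. The names `TemporalMap`, `join_as_of`, and `replay` are made up for illustration.

```python
from bisect import bisect_right


class TemporalMap:
    """Hypothetical helper: keeps every value per key, indexed by event time."""

    def __init__(self):
        self._history = {}  # key -> list of (event_time, value), sorted by time

    def put(self, key, value, event_time):
        versions = self._history.setdefault(key, [])
        times = [t for t, _ in versions]
        i = bisect_right(times, event_time)
        if i > 0 and times[i - 1] == event_time:
            # Assumption: same key and event time means "replace".
            versions[i - 1] = (event_time, value)
        else:
            versions.insert(i, (event_time, value))

    def as_of(self, key, event_time):
        """Latest value for key with event time <= event_time, or None."""
        versions = self._history.get(key, [])
        times = [t for t, _ in versions]
        i = bisect_right(times, event_time)
        return versions[i - 1][1] if i > 0 else None

    def keys(self):
        return list(self._history)


def join_as_of(a_to_b, b_to_c, event_time):
    """Compute the aToC view as it looks at one event time."""
    out = {}
    for a in a_to_b.keys():
        b = a_to_b.as_of(a, event_time)
        if b is None:
            continue
        c = b_to_c.as_of(b, event_time)
        if c is not None:
            out[a] = c
    return out


def replay(notifications, event_times=(0, 1, 2)):
    """Apply notifications in processing-time order; snapshot after each."""
    maps = {"aToB": TemporalMap(), "bToC": TemporalMap()}
    snapshots = []
    for source, key, value, event_time in notifications:
        maps[source].put(key, value, event_time)
        snapshots.append(
            {t: join_as_of(maps["aToB"], maps["bToC"], t) for t in event_times}
        )
    return snapshots


# The six notifications from the diagram, pt0 through pt5.
NOTIFICATIONS = [
    ("aToB", "a1", "b1", 1),
    ("bToC", "b1", "c1", 2),
    ("bToC", "b1", "c0", 0),
    ("bToC", "b2", "c3", 0),
    ("bToC", "b3", "c4", 2),
    ("aToB", "a1", "b3", 1),
]

if __name__ == "__main__":
    for pt, snapshot in enumerate(replay(NOTIFICATIONS)):
        print(f"pt{pt}: {snapshot}")
```

The sketch recomputes the whole join after every notification, which is only reasonable for a toy example. A streaming pipeline would instead have to emit the difference between consecutive snapshots, including retractions such as dropping a1->c0 @et1 at pt5.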
