One important clarification: After GroupByKey, the type of the elements flowing through SortValues and ParDo(BusinessEnrichment) will be KV<UserId, Iterable<Datum>>. So these are not bundles, but single elements, and an element is atomic. The SortValues transform will operate on each element and make sure the Iterable is sorted. An Iterable is fundamentally sequential and to Beam it is indivisible.
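To make the "one element per key" point concrete, here is a minimal plain-Java sketch of the model. It is not Beam code and uses no Beam API; the class `GroupSortSketch`, the `Event` type, and the `enrich` method are hypothetical stand-ins for GroupByKey + SortValues and for BusinessEnrichment. It only illustrates that the processing step is called once per key with the whole sorted Iterable, which it then walks sequentially.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not Beam code and uses no Beam API.
final class GroupSortSketch {

    // Hypothetical event type: a user id, an event timestamp, and a payload.
    static final class Event {
        final String userId;
        final long timestamp;
        final String payload;

        Event(String userId, long timestamp, String payload) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Mimics GroupByKey followed by SortValues: one map entry per key,
    // holding that key's complete list of events sorted by timestamp.
    static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> grouped = new TreeMap<>();
        for (Event e : events) {
            grouped.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> values : grouped.values()) {
            values.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        }
        return grouped;
    }

    // Stand-in for the per-element processing step: it receives the whole
    // group in a single call and iterates it front to back.
    static List<String> enrich(String userId, Iterable<Event> sortedEvents) {
        List<String> out = new ArrayList<>();
        for (Event e : sortedEvents) {
            out.add(userId + "@" + e.timestamp + ":" + e.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
            new Event("u1", 30L, "c"),
            new Event("u2", 5L, "x"),
            new Event("u1", 10L, "a"),
            new Event("u1", 20L, "b"));
        // Different keys could be handled in parallel; each key is one call.
        groupAndSort(input).forEach((user, events) ->
            System.out.println(enrich(user, events)));
    }
}
```

In this sketch there is no way for the events of one user to be split across several calls to `enrich`, which mirrors the point that the grouped element is atomic.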
If you have default triggering, you will see one output per key & window. There are no ordering restrictions between these, as event time is independent from processing time.

If you have configured triggers for your GroupByKey, then you will see multiple output elements per key & window. For a given key & window, the outputs contain a "paneIndex" in their PaneInfo metadata that tells you the order in which they were output. Again, across different keys & windows there are no ordering restrictions.

Kenn

On Thu, Apr 18, 2019 at 11:05 PM Juan Carlos Garcia <[email protected]> wrote:

> Hi Folks,
>
> I have question regarding *GroupBy* and *SortValue* (via SecondaryKey),
>
> The pipeline looks like:
>
> ...source+initialTransformations
> .apply(Window.Into(FixedWindows.of(Duration.standardMinutes(10))))
> .apply("GroupByKey", GroupByKey.create()) *// using my primaryKey (userId)*
> .apply("SortValues", SortValues.create(BufferedExternalSorter.options())) *// My Secondary Key is the timestamp of the incoming event*
> .apply("Enrichment", ParDo.of(new *BusinessEnrichment*())) *// i receives a KV<userId, Iterable<KV<timestamp, String>>>*
> ..SinkToBigQuery:
>
> 1. Is it guarantee that my *BusinessEnrichment* will hold all the data
> grouped, sorted in on single *machine* and that bundles of the same keys
> will not be parallelize during auto scaling (i am running on top of
> Dataflow) ?
>
> I expect a parallel computation on different keys (users), but the bundles
> within the grouped key (userId) to be treated sequentially inside
> *BusinessEnrichment*, is that correct?
>
> I am asking due to an observation of mixed results which could be due to a
> bug on my code or parallel bundles computations within the same key, my
> expectation is to have a sequential processing after *grouping + sorting*,
> for example having a userId with 1000 sorted events i expect the processing
> as the following sequence:
>
> userId(1) with bundleA (0..250) -> userId(1) with bundleB (251..500)
> -> userId(1) with bundleC (501..750) -> userId(1) with bundleD (751..1000)
>
> Please advise if my assumption is wrong.
>
> Thanks and regards
>
> --
>
> JC
