Hi Kenn, thanks for replying; it really clears things up. I checked my code and did indeed find some issues.
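
For the archive, here is how I now picture the model, as a minimal plain-Java sketch rather than real Beam code: grouping yields one element per user, the values inside that element are sorted by timestamp, and the enrichment step walks that single Iterable sequentially. The names Event, groupAndSort and enrich are made up for illustration, and windowing and triggers are left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names are Beam APIs.
class GroupedSortSketch {

    // Hypothetical stand-in for an incoming event.
    static final class Event {
        final String userId;
        final long timestamp;
        final String payload;

        Event(String userId, long timestamp, String payload) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Mimics GroupByKey followed by SortValues:
    // one entry per user, with that user's events sorted by timestamp.
    static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> grouped = new TreeMap<>();
        for (Event e : events) {
            grouped.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> values : grouped.values()) {
            values.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        }
        return grouped;
    }

    // Mimics the enrichment step: invoked once per user, it iterates the
    // whole sorted list in order; the list is never split into chunks.
    static String enrich(String userId, List<Event> sortedEvents) {
        StringBuilder sb = new StringBuilder(userId).append(':');
        for (Event e : sortedEvents) {
            sb.append(e.payload);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("user1", 30L, "c"),
            new Event("user2", 5L, "x"),
            new Event("user1", 10L, "a"),
            new Event("user1", 20L, "b"));
        // Different users could be handled in parallel; each user's list is handled as a unit.
        groupAndSort(events).forEach((user, sorted) -> System.out.println(enrich(user, sorted)));
    }
}
```

In the real pipeline the per-user list corresponds to the Iterable inside the KV that BusinessEnrichment receives, so the chunked "bundleA, bundleB, ..." sequence from my original question does not apply within a key.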

JC

On Fri, Apr 19, 2019 at 8:54 PM Kenneth Knowles <[email protected]> wrote:

> One important clarification: After GroupByKey, the type of the elements
> flowing through SortValues and ParDo(BusinessEnrichment) will be KV<UserId,
> Iterable<Datum>>. So these are not bundles, but single elements, and an
> element is atomic. The SortValues transform will operate on each element
> and make sure the Iterable is sorted. An Iterable is fundamentally
> sequential and to Beam it is indivisible.
>
> If you have default triggering, you will see one output per key & window.
> There are no ordering restrictions between these, as event time is
> independent from processing time.
>
> If you have configured triggers for your GroupByKey then you will see
> multiple output elements per key & window. For a given key & window the
> outputs contain a "paneIndex" in their PaneInfo metadata that tells you the
> order in which they were output. Again, for different key & window there
> are no ordering restrictions.
>
> Kenn
>
>
> On Thu, Apr 18, 2019 at 11:05 PM Juan Carlos Garcia <[email protected]>
> wrote:
>
>> Hi Folks,
>>
>> I have a question regarding *GroupByKey* and *SortValues* (via SecondaryKey).
>>
>> The pipeline looks like:
>>
>> ...source+initialTransformations
>> .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
>> .apply("GroupByKey", GroupByKey.create()) *// using my primaryKey (userId)*
>> .apply("SortValues", SortValues.create(BufferedExternalSorter.options())) *// My Secondary Key is the timestamp of the incoming event*
>> .apply("Enrichment", ParDo.of(new *BusinessEnrichment*())) *// it receives a KV<userId, Iterable<KV<timestamp, String>>>*
>> ..SinkToBigQuery:
>>
>> 1. Is it guaranteed that my *BusinessEnrichment* will hold all the data
>> grouped and sorted on a single *machine*, and that bundles of the same key
>> will not be parallelized during auto scaling (I am running on top of
>> Dataflow)?
>>
>> I expect a parallel computation on different keys (users), but the
>> bundles within the grouped key (userId) to be treated sequentially inside
>> *BusinessEnrichment*, is that correct?
>>
>> I am asking due to an observation of mixed results, which could be due to
>> a bug in my code or to parallel bundle computations within the same key. My
>> expectation is to have sequential processing after *grouping + sorting*;
>> for example, for a userId with 1000 sorted events I expect the processing
>> to follow this sequence:
>>
>> userId(1) with bundleA (0..250) -> userId(1) with bundleB (251..500)
>> -> userId(1) with bundleC (501..750) -> userId(1) with bundleD (751..1000)
>>
>> Please advise if my assumption is wrong.
>>
>> Thanks and regards
>>
>> --
>>
>> JC
>>
>>
--
JC
