Hi Kenn, thanks for replying; it really clears things up.

I checked my code and indeed I found some issues.

JC

On Fri, Apr 19, 2019 at 8:54 PM Kenneth Knowles wrote:

> One important clarification: After GroupByKey, the type of the elements
> flowing through SortValues and ParDo(BusinessEnrichment) will be KV<UserId,
> Iterable<Datum>>. So these are not bundles, but single elements, and an
> element is atomic. The SortValues transform will operate on each element
> and make sure the Iterable is sorted. An Iterable is fundamentally
> sequential and to Beam it is indivisible.
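To illustrate the point above, here is a minimal plain-Java model of what *BusinessEnrichment* receives after GroupByKey + SortValues. It is not Beam code: the `Event` class, `groupAndSort` and `enrich` are hypothetical names. Each userId becomes exactly one element whose values are already in timestamp order, and a single call walks them sequentially.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT the Beam API) of GroupByKey + SortValues followed by a ParDo.
public class GroupSortModel {

    // Hypothetical input event: (userId, timestamp, payload).
    public static final class Event {
        public final String userId;
        public final long timestamp;
        public final String payload;

        public Event(String userId, long timestamp, String payload) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Models GroupByKey (by userId) followed by SortValues (by timestamp):
    // the result has one entry, i.e. one element, per userId.
    public static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> grouped = new TreeMap<>();
        for (Event e : events) {
            grouped.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> values : grouped.values()) {
            values.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        }
        return grouped;
    }

    // Models a single @ProcessElement call: the whole sorted Iterable for one key
    // is consumed by one sequential loop, in timestamp order.
    public static List<String> enrich(String userId, Iterable<Event> sortedValues) {
        List<String> out = new ArrayList<>();
        for (Event e : sortedValues) {
            out.add(userId + "@" + e.timestamp + ":" + e.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("u1", 30L, "c"),
            new Event("u2", 5L, "x"),
            new Event("u1", 10L, "a"),
            new Event("u1", 20L, "b"));
        for (Map.Entry<String, List<Event>> element : groupAndSort(events).entrySet()) {
            System.out.println(enrich(element.getKey(), element.getValue()));
        }
    }
}
```

In the real pipeline the loop body would live in the DoFn's @ProcessElement method. Because the Iterable is one indivisible element, there is no further split of a single key's values into separately processed bundles.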
>
> If you have default triggering, you will see one output per key & window.
> There are no ordering restrictions between these, as event time is
> independent from processing time.
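A rough model of the default-trigger case, assuming the 10-minute FixedWindows used in the pipeline quoted further down, epoch-aligned windows with no offset, and non-negative millisecond timestamps. This is plain Java with hypothetical names, not Beam code. Each event maps to exactly one window, and each distinct (key, window) pair yields exactly one grouped output.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT the Beam API) of FixedWindows of 10 minutes + GroupByKey
// with the default trigger.
public class FixedWindowModel {

    // Window size used in the pipeline: 10 minutes, in milliseconds.
    static final long WINDOW_MILLIS = 10L * 60L * 1000L;

    // Start of the epoch-aligned fixed window containing the timestamp.
    // Assumes non-negative epoch-millisecond timestamps and no window offset.
    public static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Groups events by (key, window). With the default trigger, each map entry
    // corresponds to exactly one grouped output; the value is its element count.
    public static Map<String, Integer> outputsPerKeyAndWindow(String[] keys, long[] timestamps) {
        Map<String, Integer> outputs = new TreeMap<>(); // sorted only for readable printing
        for (int i = 0; i < keys.length; i++) {
            String keyAndWindow = keys[i] + "|" + windowStart(timestamps[i]);
            outputs.merge(keyAndWindow, 1, Integer::sum);
        }
        return outputs;
    }

    public static void main(String[] args) {
        String[] keys = {"u1", "u1", "u1", "u2"};
        long[] timestamps = {0L, 599_999L, 600_000L, 1_000L};
        System.out.println(outputsPerKeyAndWindow(keys, timestamps));
    }
}
```

The TreeMap only makes the printout deterministic; Beam itself promises no ordering between the outputs of different keys and windows.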
>
> If you have configured triggers for your GroupByKey then you will see
> multiple output elements per key & window. For a given key & window the
> outputs contain a "paneIndex" in their PaneInfo metadata that tells you the
> order in which they were output. Again, for different key & window there are no
> ordering restrictions.
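When triggers fire several panes, a downstream consumer can use the pane index to keep only the most recent result per key & window. Below is a minimal plain-Java sketch of that idea, assuming accumulating panes (each firing supersedes the previous one) and a hypothetical `Output` class. In a real DoFn the index would come from `c.pane().getIndex()` on the `ProcessContext`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Beam API) of consuming triggered GroupByKey output.
public class LatestPaneModel {

    // Hypothetical triggered output: key, window start, pane index, value.
    public static final class Output {
        public final String key;
        public final long windowStart;
        public final long paneIndex;
        public final String value;

        public Output(String key, long windowStart, long paneIndex, String value) {
            this.key = key;
            this.windowStart = windowStart;
            this.paneIndex = paneIndex;
            this.value = value;
        }
    }

    // For each (key, window), keeps the output with the highest pane index, even if
    // outputs reach the consumer out of order. Indices are compared only within the
    // same key & window, because they carry no ordering information across pairs.
    public static Map<String, Output> latestPerKeyAndWindow(List<Output> outputs) {
        Map<String, Output> latest = new HashMap<>();
        for (Output o : outputs) {
            String keyAndWindow = o.key + "|" + o.windowStart;
            Output current = latest.get(keyAndWindow);
            if (current == null || o.paneIndex > current.paneIndex) {
                latest.put(keyAndWindow, o);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Output> arrived = Arrays.asList(
            new Output("u1", 0L, 2L, "third firing"),
            new Output("u2", 0L, 0L, "only firing"),
            new Output("u1", 0L, 0L, "first firing"),
            new Output("u1", 0L, 1L, "second firing"));
        for (Output o : latestPerKeyAndWindow(arrived).values()) {
            System.out.println(o.key + " -> " + o.value);
        }
    }
}
```

With discarding panes each firing carries only the data since the previous firing, so there you would combine the panes rather than keep only the latest one.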
>
> Kenn
>
>
> On Thu, Apr 18, 2019 at 11:05 PM Juan Carlos Garcia wrote:
>
>> Hi Folks,
>>
>> I have a question regarding *GroupByKey* and *SortValues* (via a secondary key).
>>
>> The pipeline looks like:
>>
>> ...source+initialTransformations
>> .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
>> // primary key: userId
>> .apply("GroupByKey", GroupByKey.create())
>> // secondary key: timestamp of the incoming event
>> .apply("SortValues", SortValues.create(BufferedExternalSorter.options()))
>> // receives a KV<userId, Iterable<KV<timestamp, String>>>
>> .apply("Enrichment", ParDo.of(new BusinessEnrichment()))
>> ...SinkToBigQuery
>>
>> 1. Is it guaranteed that my *BusinessEnrichment* will receive all the data
>> for a key, grouped and sorted, on a single *machine*, and that bundles of the
>> same key will not be parallelized during autoscaling (I am running on
>> Dataflow)?
>>
>> I expect parallel computation across different keys (users), but the
>> bundles within a grouped key (userId) to be processed sequentially inside
>> *BusinessEnrichment*. Is that correct?
>>
>> I am asking because I have observed mixed results, which could be due either
>> to a bug in my code or to parallel processing of bundles within the same key.
>> My expectation is sequential processing after *grouping + sorting*. For
>> example, for a userId with 1000 sorted events I expect processing in the
>> following sequence:
>>
>>      userId(1) with bundleA (0..250)
>>   -> userId(1) with bundleB (251..500)
>>   -> userId(1) with bundleC (501..750)
>>   -> userId(1) with bundleD (751..1000)
>>
>> Please advise if my assumption is wrong.
>>
>> Thanks and regards
>>
>> --
>>
>> JC
>>
>>

-- 

JC
