Yep, that's correct.
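
To illustrate with the numbers from the original report: below is a minimal plain-Java sketch (not Beam API) of why the observed C -> A -> B arrival order breaks an incremental enrichment, and how ordering by window start recovers A -> B -> C. The `FiredWindow` class and the `eventTimeOrder` helper are hypothetical names made up for this example. It also assumes all of a user's fired windows are available together, which a real pipeline would have to arrange itself (for example with a wider grouping or buffered state).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class WindowOrderSketch {

    /** Hypothetical stand-in for one fired window of a single user. */
    static final class FiredWindow {
        final String label;
        final long windowStartMillis;   // event-time start of the window
        final long systemTimestamp;     // processing time when the DoFn saw it

        FiredWindow(String label, String windowStart, long systemTimestamp) {
            this.label = label;
            this.windowStartMillis = Instant.parse(windowStart).toEpochMilli();
            this.systemTimestamp = systemTimestamp;
        }
    }

    /** Returns window labels ordered by event time, ignoring arrival order. */
    static List<String> eventTimeOrder(List<FiredWindow> arrived) {
        List<FiredWindow> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingLong(w -> w.windowStartMillis));
        List<String> labels = new ArrayList<>();
        for (FiredWindow w : sorted) {
            labels.add(w.label);
        }
        return labels;
    }

    public static void main(String[] args) {
        // Arrival order as reported in the thread (by systemTimestamp): C, A, B.
        List<FiredWindow> arrived = Arrays.asList(
            new FiredWindow("C", "2019-05-20T03:00:00Z", 1559396670533L),
            new FiredWindow("A", "2019-05-20T01:00:00Z", 1559396670577L),
            new FiredWindow("B", "2019-05-20T02:00:00Z", 1559396670670L));

        // Prints [A, B, C]
        System.out.println(eventTimeOrder(arrived));
    }
}
```

The point of the sketch is only that the ordering has to be imposed explicitly by the pipeline author; the runner does not promise it.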

On Mon, Jun 3, 2019 at 2:06 PM Juan Carlos Garcia <[email protected]> wrote:
>
> Hi Robert,
>
> The elements of a PCollection are unordered. >> Yes this is something known 
> and understood given the nature of a PCollection.
>
> So that means that when we replay past data (we rewind our 
> Kafka consumer groups), within 1h of processing time there might be multiple 1h 
> windows for a given GBK, and hence these windows are fired in arbitrary order?
>
> Thanks for the insight!
> JC
>
> On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw <[email protected]> wrote:
>>
>> The elements of a PCollection are unordered. This includes the results
>> of a GBK--there is no promise that the output be processed in any (in
>> particular, windows ordered by timestamp) order. DoFns, especially ones
>> with side effects, should be written with this in mind.
>>
>> (There is actually ongoing discussion on the dev list about how to
>> make it easier to write order-sensitive operations.)
>>
>> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia <[email protected]> 
>> wrote:
>> >
>> > Hi Folks,
>> >
>> > My team and I have a situation that we cannot explain and
>> > would like to hear your thoughts. We have a pipeline which
>> > enriches the incoming messages and writes them to BigQuery. The pipeline 
>> > looks like this:
>> >
>> > Apache Beam 2.12.0 / GCP Dataflow
>> >
>> > -----
>> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
>> > - ApplySomeInitialEnrichment (just add some stuff to the incoming json 
>> > messages)
>> > - Apply a Fixed 1 hour window (with default triggering)
>> > - Apply Group By Key (userId)
>> > - Apply External Sorter 
>> > (SortValues.create(BufferedExternalSorter.options())))
>> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
>> >   - Read initial state from Hbase (BigTable)
>> >   - loop through all messages, enriching them with the previous state 
>> > (incremental enrichment) and session calculation
>> >   - write the final state to Hbase (BigTable)
>> >   - output each of the enriched elements to the next DoFn
>> > - Apply a Transformation to prepare the data to BigQuery
>> > - Apply BigQueryIO
>> > ------
>> >
>> >
>> > Just to give some more context, we have a meta_info column in our BigQuery 
>> > table whose values are set at the very beginning of the 
>> > ComplexEnrichmentDoFn, meaning all the records within the same Iterable<> 
>> > will hold the same information. The meta_info column contains the 
>> > serialized PaneInfo, WindowInfo and our SystemTimestamp = 
>> > currentTimeMilliseconds.
>> >
>> > We have 3 windows:
>> >   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), 
>> > systemTimestamp: 1559396670577
>> >   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), 
>> > systemTimestamp: 1559396670670
>> >   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), 
>> > systemTimestamp: 1559396670533
>> >
>> >
>> > window A contains: 18 records
>> > window B contains: 46 records
>> > window C contains: 3  records
>> >
>> > If you pay attention to the A, B, C windowInfo from above, the 
>> > `systemTimestamp` field reflects an incorrect order of processing: the 
>> > enrichment was executed as C -> A -> B, corrupting all the messages for 
>> > this given user.
>> >
>> > For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
>> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, 
>> > onTimeIndex=0}"
>> >
>> > Any idea why the windows would be triggered out of order?
>> >
>> > --
>> >
>> > JC
>> >
>
>
>
> --
>
> JC
>
