The elements of a PCollection are unordered. This includes the results
of a GroupByKey (GBK): there is no promise that the outputs will be
processed in any particular order. In particular, windows are not
guaranteed to be processed in timestamp order. DoFns, especially ones
with side effects, should be written with this in mind.
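
To make that concrete, here is a toy sketch in plain Java (no Beam APIs;
the class and method names are made up for illustration, and the window
start is simplified to an hour number). It uses the three window sizes
from the message below and shows that a step which carries state from one
window to the next produces different output depending on arrival order,
unless the step imposes the order itself:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class WindowOrderSketch {

    /** One fired window for a single user (hypothetical type, not a Beam class). */
    static final class WindowBatch {
        final String name;
        final long windowStartHour; // simplified stand-in for the window start time
        final int recordCount;

        WindowBatch(String name, long windowStartHour, int recordCount) {
            this.name = name;
            this.windowStartHour = windowStartHour;
            this.recordCount = recordCount;
        }
    }

    /**
     * Order-sensitive step: each batch is tagged with the running record total
     * seen so far. The running total stands in for state kept in an external store.
     */
    static List<String> enrichInArrivalOrder(List<WindowBatch> arrivals) {
        List<String> out = new ArrayList<>();
        int runningTotal = 0;
        for (WindowBatch batch : arrivals) {
            runningTotal += batch.recordCount;
            out.add(batch.name + "=" + runningTotal);
        }
        return out;
    }

    /** Same step, but the batches are first sorted by window start. */
    static List<String> enrichInWindowOrder(List<WindowBatch> arrivals) {
        List<WindowBatch> sorted = new ArrayList<>(arrivals);
        sorted.sort(Comparator.comparingLong((WindowBatch b) -> b.windowStartHour));
        return enrichInArrivalOrder(sorted);
    }

    public static void main(String[] args) {
        // Arrival order C -> A -> B, as reported in the message below.
        List<WindowBatch> arrivals = Arrays.asList(
                new WindowBatch("C", 3, 3),
                new WindowBatch("A", 1, 18),
                new WindowBatch("B", 2, 46));

        System.out.println(enrichInArrivalOrder(arrivals)); // [C=3, A=21, B=67]
        System.out.println(enrichInWindowOrder(arrivals));  // [A=18, B=64, C=67]
    }
}
```

Note that the explicit sort is only possible where all the batches are
available together, which this sketch simply assumes.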

(There is actually ongoing discussion on the dev list about how to
make it easier to write order-sensitive operations.)

On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia <[email protected]> wrote:
>
> Hi Folks,
>
> My team and I have a situation that we cannot explain and
> would like to hear your thoughts. We have a pipeline that
> enriches the incoming messages and writes them to BigQuery. The pipeline looks 
> like this:
>
> Apache Beam 2.12.0 / GCP Dataflow
>
> -----
> - ReadFromKafka (with withCreateTime and 10min MaxDelay)
> - ApplySomeInitialEnrichment (just add some stuff to the incoming json 
> messages)
> - Apply a Fixed 1 hour window (with default triggering)
> - Apply Group By Key (userId)
> - Apply External Sorter (SortValues.create(BufferedExternalSorter.options())))
> - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
>   - Read initial state from Hbase (BigTable)
>   - loop thru all messages, enriching them with the previous state 
> (incremental enrichment) and session calculation
>   - write the final state to Hbase (BigTable)
>   - output each of the enriched elements to the next DoFn
> - Apply a Transformation to prepare the data for BigQuery
> - Apply BigQueryIO
> ------
>
>
> Just to give some more context, we have a meta_info column in our BigQuery 
> table whose values are set at the very beginning of the 
> ComplexEnrichmentDoFn, meaning all the records within the same Iterable<> 
> will hold the same information. The meta_info column contains the serialized 
> PaneInfo, WindowInfo and our SystemTimestamp = currentTimeMilliseconds.
>
> We have 3 windows:
>   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), 
> systemTimestamp: 1559396670577
>   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), 
> systemTimestamp: 1559396670670
>   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), 
> systemTimestamp: 1559396670533
>
>
> window A contains: 18 records
> window B contains: 46 records
> window C contains: 3  records
>
> If you pay attention to the A, B, C windowInfo from above, the 
> `systemTimestamp` field reflects an incorrect order of processing: the 
> enrichment was executed as C -> A -> B, corrupting all the messages for this 
> given user.
>
> For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
> A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, 
> onTimeIndex=0}"
>
> Any idea why the windows would be triggered out of order?
>
> --
>
> JC
>
