Hi Robert,

*The elements of a PCollection are unordered.* >> Yes, this is known and understood, given the nature of a PCollection.
So does that mean that when we replay past data (we rewind our Kafka consumer groups), a single hour of processing time may cover multiple 1h windows for a given GBK, and hence these windows can be fired in any arbitrary order?

Thanks for the insight!

JC

On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw <[email protected]> wrote:

> The elements of a PCollection are unordered. This includes the results
> of a GBK: there is no promise that the output will be processed in any
> particular order (e.g. windows ordered by timestamp). DoFns, especially
> ones with side effects, should be written with this in mind.
>
> (There is actually ongoing discussion on the dev list about how to
> make it easier to write order-sensitive operations.)
>
> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia <[email protected]>
> wrote:
>
> > Hi Folks,
> >
> > My team and I have a situation that we cannot explain and would like
> > to hear your thoughts. We have a pipeline which enriches the incoming
> > messages and writes them to BigQuery. The pipeline looks like this:
> >
> > Apache Beam 2.12.0 / GCP Dataflow
> >
> > -----
> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
> > - ApplySomeInitialEnrichment (just adds some stuff to the incoming JSON
> >   messages)
> > - Apply a Fixed 1 hour window (with default triggering)
> > - Apply Group By Key (userId)
> > - Apply External Sorter (SortValues.create(BufferedExternalSorter.options()))
> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
> >   - Read initial state from HBase (Bigtable)
> >   - Loop through all messages, enriching them with the previous state
> >     (incremental enrichment) and session calculation
> >   - Write the final state to HBase (Bigtable)
> >   - Output each of the enriched elements to the next DoFn
> > - Apply a Transformation to prepare the data for BigQuery
> > - Apply BigQueryIO
> > ------
> >
> > Just to give some more context, we have a meta_info column in our
> > BigQuery table whose values are set at the very beginning of the
> > ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
> > hold the same information. The meta_info column contains the
> > serialized PaneInfo, WindowInfo and our SystemTimestamp =
> > currentTimeMilliseconds.
> >
> > We have 3 windows:
> > A - windowInfo: [2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), systemTimestamp: 1559396670577
> > B - windowInfo: [2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), systemTimestamp: 1559396670670
> > C - windowInfo: [2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), systemTimestamp: 1559396670533
> >
> > window A contains: 18 records
> > window B contains: 46 records
> > window C contains: 3 records
> >
> > If you look at the A, B, C windowInfo above, the `systemTimestamp`
> > field reflects an incorrect order of processing: the enrichment was
> > executed as C -> A -> B, corrupting all the messages for this given user.
> >
> > For all 3 windows the serialized PaneInfo was set by the runner to
> > ON_TIME:
> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}"
> >
> > Any idea why the windows would be triggered out of order?
> >
> > --
> >
> > JC
> >
>
--
JC
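To make the reported numbers concrete, here is a small plain-Java sketch (standard library only, not Beam code) that replays the three windows from the thread. Sorting by the logged systemTimestamp gives C, A, B. All three stamps decode to 2019-06-01 around 13:44:30 UTC and lie within 137 ms of each other, which is consistent with the replay explanation above: once the watermark passes several 1h windows at once, they can all become ready together, and, as Robert notes, nothing orders them. The sequence-number "enrichment" in firstSeqFor is a hypothetical stand-in for ComplexEnrichmentDoFn (its real logic is not shown in the thread), and isOutOfOrder is a hypothetical guard comparing an incoming window's start against the end of the last window already written to state. FiredWindow, firstSeqFor and isOutOfOrder are illustrative names, not Beam APIs.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowOrderDemo {

    /** Hypothetical record of one fired window, using the values reported in the thread. */
    static final class FiredWindow {
        final String name;
        final Instant windowStart;   // start of the [start, start + 1h) fixed window
        final long systemTimestamp;  // processing time logged at the start of the DoFn
        final int records;

        FiredWindow(String name, String windowStart, long systemTimestamp, int records) {
            this.name = name;
            this.windowStart = Instant.parse(windowStart);
            this.systemTimestamp = systemTimestamp;
            this.records = records;
        }

        Instant windowEnd() {
            return windowStart.plusSeconds(3600);
        }
    }

    /** Windows A, B, C in event-time order, as listed in the thread. */
    static List<FiredWindow> reported() {
        List<FiredWindow> ws = new ArrayList<>();
        ws.add(new FiredWindow("A", "2019-05-20T01:00:00.000Z", 1559396670577L, 18));
        ws.add(new FiredWindow("B", "2019-05-20T02:00:00.000Z", 1559396670670L, 46));
        ws.add(new FiredWindow("C", "2019-05-20T03:00:00.000Z", 1559396670533L, 3));
        return ws;
    }

    /** Window names sorted by logged processing time, i.e. the order the DoFn saw them. */
    static String processingOrder(List<FiredWindow> ws) {
        List<FiredWindow> copy = new ArrayList<>(ws);
        copy.sort(Comparator.comparingLong((FiredWindow w) -> w.systemTimestamp));
        StringBuilder order = new StringBuilder();
        for (FiredWindow w : copy) {
            order.append(w.name);
        }
        return order.toString();
    }

    /**
     * Hypothetical incremental enrichment: every record gets a running sequence number
     * carried over from the state left by the previous window (a stand-in for the
     * Bigtable read/write). Returns the first number assigned to window {@code target}.
     */
    static int firstSeqFor(List<FiredWindow> arrivalOrder, String target) {
        int state = 0;
        for (FiredWindow w : arrivalOrder) {
            if (w.name.equals(target)) {
                return state + 1;
            }
            state += w.records; // "write back" the state after this window
        }
        throw new IllegalArgumentException("unknown window " + target);
    }

    /** Hypothetical guard: true if the incoming window starts before the last applied window ended. */
    static boolean isOutOfOrder(Instant lastAppliedWindowEnd, FiredWindow incoming) {
        return lastAppliedWindowEnd != null && incoming.windowStart.isBefore(lastAppliedWindowEnd);
    }

    public static void main(String[] args) {
        List<FiredWindow> ws = reported();
        FiredWindow a = ws.get(0), b = ws.get(1), c = ws.get(2);

        System.out.println("processing order: " + processingOrder(ws)); // CAB
        System.out.println("C processed at " + Instant.ofEpochMilli(c.systemTimestamp));
        System.out.println("spread: " + (b.systemTimestamp - c.systemTimestamp) + " ms"); // 137 ms

        // Same windows, event-time order vs. the order actually observed.
        System.out.println("A first seq, order ABC: " + firstSeqFor(List.of(a, b, c), "A")); // 1
        System.out.println("A first seq, order CAB: " + firstSeqFor(List.of(c, a, b), "A")); // 4

        // After C has been applied, A arrives and the guard flags it.
        System.out.println("A stale after C? " + isOutOfOrder(c.windowEnd(), a)); // true
    }
}
```

In this sketch, applying C before A shifts A's first sequence number from 1 to 4, the same kind of silent state corruption described in the thread. A guard like isOutOfOrder only detects the problem; the DoFn still has to decide what to do with a stale window (for example, flag it or rebuild that user's state), which is the order-sensitivity Robert says DoFns with side effects must account for.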
