Yep, that's correct.
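
Since windows for the same key can reach a DoFn in any order, one defensive option is to record, per user, the end of the last window already applied to the external state, and to refuse (or route to a repair path) any window that shows up after a later one has been written. What follows is a minimal sketch under assumptions that are not from this thread: `WindowOrderGuard`, `tryAdvance` and `rejected` are hypothetical names, the in-memory map stands in for the HBase/BigTable state, and it is plain Java rather than Beam API. It only detects out-of-order arrival; it does not restore the order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical guard: remembers, per user, the end of the latest window already applied. */
final class WindowOrderGuard {
    // Assumption: stands in for the external state store; kept in memory for the sketch.
    private final Map<String, Long> lastWindowEnd = new HashMap<>();

    /**
     * Returns true if this window may be applied on top of the stored state,
     * false if the same or a later window was already applied for this user.
     */
    boolean tryAdvance(String userId, long windowEnd) {
        Long previous = lastWindowEnd.get(userId);
        if (previous != null && windowEnd <= previous) {
            return false; // stale window: the state already reflects later data
        }
        lastWindowEnd.put(userId, windowEnd);
        return true;
    }

    /** Feeds window ends in arrival order and returns the ones rejected as out of order. */
    static List<Long> rejected(String userId, long... windowEndsInArrivalOrder) {
        WindowOrderGuard guard = new WindowOrderGuard();
        List<Long> stale = new ArrayList<>();
        for (long end : windowEndsInArrivalOrder) {
            if (!guard.tryAdvance(userId, end)) {
                stale.add(end);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        // Window ends of A, B and C from the report, written as hours for brevity.
        long a = 2, b = 3, c = 4;
        System.out.println(rejected("user-1", c, a, b)); // [2, 3]
        System.out.println(rejected("user-1", a, b, c)); // []
    }
}
```

With the C -> A -> B arrival order from the report, A and B are flagged instead of being silently enriched on top of C's state. What to do with a flagged window (drop it, recompute from earlier state, raise an alert) is a separate design decision that this sketch leaves open.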

On Mon, Jun 3, 2019 at 2:06 PM Juan Carlos Garcia wrote:
>
> Hi Robert,
>
> The elements of a PCollection are unordered. >> Yes, this is something known
> and understood given the nature of a PCollection.
>
> So that means that when we are doing a replay of past data (we rewind our
> Kafka consumer groups), in 1h of processing time there might be multiple 1h
> windows for a given GBK, hence these windows are fired in an arbitrary order?
>
> Thanks for the insight!
> JC
>
> On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw wrote:
>>
>> The elements of a PCollection are unordered. This includes the results
>> of a GBK--there is no promise that the output be processed in any (in
>> particular, windows ordered by timestamp) order. DoFns, especially ones
>> with side effects, should be written with this in mind.
>>
>> (There is actually ongoing discussion on the dev list about how to
>> make it easier to write order-sensitive operations.)
>>
>> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia wrote:
>> >
>> > Hi Folks,
>> >
>> > My team and I have a situation that we cannot explain and
>> > would like to hear your thoughts. We have a pipeline which
>> > enriches the incoming messages and writes them to BigQuery. The pipeline
>> > looks like this:
>> >
>> > Apache Beam 2.12.0 / GCP Dataflow
>> >
>> > -----
>> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
>> > - ApplySomeInitialEnrichment (just add some stuff to the incoming json
>> >   messages)
>> > - Apply a Fixed 1 hour window (with default triggering)
>> > - Apply Group By Key (userId)
>> > - Apply External Sorter
>> >   (SortValues.create(BufferedExternalSorter.options()))
>> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
>> >     - Read initial state from HBase (BigTable)
>> >     - loop through all messages, enriching them with the previous state
>> >       (incremental enrichment) and session calculation
>> >     - write the final state to HBase (BigTable)
>> >     - output each of the enriched elements to the next DoFn
>> > - Apply a Transformation to prepare the data for BigQuery
>> > - Apply BigQueryIO
>> > ------
>> >
>> >
>> > Just to give some more context, we have a meta_info column in our BigQuery
>> > table whose values are set at the very beginning of the
>> > ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
>> > will hold the same information. The meta_info column contains the
>> > serialized PaneInfo, WindowInfo and our SystemTimestamp =
>> > currentTimeMilliseconds.
>> >
>> > We have 3 windows:
>> > A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
>> > systemTimestamp: 1559396670577
>> > B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
>> > systemTimestamp: 1559396670670
>> > C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
>> > systemTimestamp: 1559396670533
>> >
>> >
>> > window A contains: 18 records
>> > window B contains: 46 records
>> > window C contains: 3 records
>> >
>> > If you pay attention to the A, B, C windowInfo from above, the
>> > `systemTimestamp` field reflects an incorrect order of processing: the
>> > enrichment was executed as C -> A -> B, corrupting all the messages for
>> > this given user.
>> >
>> > For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
>> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
>> > onTimeIndex=0}"
>> >
>> > Any idea why the windows would be triggered out of order?
>> >
>> > --
>> >
>> > JC
>> >
>
>
> --
>
> JC
>
