Hi Robert,

*The elements of a PCollection are unordered.* >> Yes, this is something
known and understood, given the nature of a PCollection.

So that means that when we replay past data (we rewind our Kafka consumer
groups), within 1h of processing time there might be multiple 1h windows
for a given key of the GBK, and hence these windows can fire in any
arbitrary order?

Thanks for the insight!
JC

On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw wrote:

> The elements of a PCollection are unordered. This includes the results
> of a GBK: there is no promise that the output will be processed in any
> particular order (and in particular, no promise that windows are
> processed in timestamp order). DoFns, especially ones with side effects,
> should be written with this in mind.
>
> (There is actually ongoing discussion on the dev list about how to
> make it easier to write order-sensitive operations.)
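
One common way to make an order-sensitive step safe is to buffer per key and release data only in event-time order, once the watermark guarantees nothing earlier can still arrive. In Beam this is usually built with a stateful DoFn (state to hold the buffer, an event-time timer to trigger the flush); the plain-Java sketch below models only the buffering and sorting, and `OrderedWindowBuffer`, `add` and `flush` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java model of "buffer per key, release in event-time order".
// In Beam this would typically live in a stateful DoFn driven by an event-time
// timer; nothing here is a Beam API.
final class OrderedWindowBuffer {
    // Buffered elements for one key, keyed by window end (epoch millis) so the
    // TreeMap keeps them sorted by event time regardless of arrival order.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Called whenever a window's output arrives, in whatever order it arrives.
    void add(long windowEndMillis, List<String> elements) {
        pending.computeIfAbsent(windowEndMillis, k -> new ArrayList<>()).addAll(elements);
    }

    // Called when the watermark advances: removes and returns every buffered
    // element whose window ended at or before the watermark, oldest window first.
    List<String> flush(long watermarkMillis) {
        List<String> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= watermarkMillis) {
            released.addAll(pending.pollFirstEntry().getValue());
        }
        return released;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        OrderedWindowBuffer buffer = new OrderedWindowBuffer();
        // Windows arrive as C, A, B (the order seen in the report).
        buffer.add(4 * hour, List.of("c1"));
        buffer.add(2 * hour, List.of("a1", "a2"));
        buffer.add(3 * hour, List.of("b1"));
        System.out.println(buffer.flush(3 * hour)); // [a1, a2, b1]
        System.out.println(buffer.flush(4 * hour)); // [c1]
    }
}
```

Sorting on flush, rather than trusting arrival order, is what makes the result independent of how the runner orders GBK output; the trade-off is extra state per key and added latency until the watermark passes.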
>
> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia wrote:
> >
> > Hi Folks,
> >
> > My team and I have run into a situation we cannot explain and
> > would like to hear your thoughts. We have a pipeline that
> > enriches incoming messages and writes them to BigQuery; the pipeline
> > looks like this:
> >
> > Apache Beam 2.12.0 / GCP Dataflow
> >
> > -----
> > - ReadFromKafka (using withCreateTime with a 10 min maxDelay)
> > - ApplySomeInitialEnrichment (just adds some data to the incoming JSON
> >   messages)
> > - Apply a fixed 1 hour window (with default triggering)
> > - Apply GroupByKey (userId)
> > - Apply an external sorter
> >   (SortValues.create(BufferedExternalSorter.options()))
> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
> >   - read the initial state from HBase (Bigtable)
> >   - loop through all messages, enriching each one with the previous
> >     state (incremental enrichment) and calculating sessions
> >   - write the final state to HBase (Bigtable)
> >   - output each enriched element to the next DoFn
> > - Apply a transformation to prepare the data for BigQuery
> > - Apply BigQueryIO
> > ------
> >
> >
> > To give some more context: we have a meta_info column in our
> > BigQuery table whose values are set at the very beginning of
> > ComplexEnrichmentDoFn, so all records within the same Iterable<>
> > hold the same information. The meta_info column contains the
> > serialized PaneInfo, the window info and our systemTimestamp
> > (System.currentTimeMillis()).
> >
> > We have 3 windows:
> >   A - windowInfo: [2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
> >       systemTimestamp: 1559396670577
> >   B - windowInfo: [2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
> >       systemTimestamp: 1559396670670
> >   C - windowInfo: [2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
> >       systemTimestamp: 1559396670533
> >
> > window A contains: 18 records
> > window B contains: 46 records
> > window C contains: 3 records
> >
> > As the A, B and C windowInfo entries above show, the
> > `systemTimestamp` field reflects an incorrect processing order: the
> > enrichment ran as C -> A -> B, corrupting all the messages for this
> > user.
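
The out-of-order run can be confirmed mechanically from the meta_info values: sort the windows by systemTimestamp and check that their window starts never go backwards. A small sketch using the three values reported above; `ProcessingOrderCheck` and `WindowRun` are hypothetical names, and only the window start and systemTimestamp fields are modelled.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class ProcessingOrderCheck {
    // Hypothetical reduction of one meta_info row to the fields used here.
    record WindowRun(String label, Instant windowStart, long systemTimestamp) {}

    // Labels sorted by when the DoFn actually ran (systemTimestamp).
    static String processingOrder(List<WindowRun> runs) {
        return runs.stream()
                .sorted(Comparator.comparingLong(WindowRun::systemTimestamp))
                .map(WindowRun::label)
                .collect(Collectors.joining(" -> "));
    }

    // True if processing order matches event-time (window start) order.
    static boolean inEventTimeOrder(List<WindowRun> runs) {
        List<WindowRun> byProcessing = new ArrayList<>(runs);
        byProcessing.sort(Comparator.comparingLong(WindowRun::systemTimestamp));
        for (int i = 1; i < byProcessing.size(); i++) {
            if (byProcessing.get(i).windowStart()
                    .isBefore(byProcessing.get(i - 1).windowStart())) {
                return false; // a later-processed window starts earlier in event time
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<WindowRun> runs = List.of(
                new WindowRun("A", Instant.parse("2019-05-20T01:00:00.000Z"), 1559396670577L),
                new WindowRun("B", Instant.parse("2019-05-20T02:00:00.000Z"), 1559396670670L),
                new WindowRun("C", Instant.parse("2019-05-20T03:00:00.000Z"), 1559396670533L));
        System.out.println(processingOrder(runs));  // C -> A -> B
        System.out.println(inEventTimeOrder(runs)); // false
    }
}
```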
> >
> > For all 3 windows, the runner set the serialized PaneInfo to ON_TIME:
> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
> > onTimeIndex=0}"
> >
> > Any idea why the windows would be triggered out of order?
> >
> > --
> >
> > JC
> >
>


-- 

JC
