I have a similar use case when dealing with failed / cancelled / broken streaming pipelines. We have an operator that continuously monitors the min-watermark of the pipeline. When it detects that the watermark has not advanced for more than some threshold, we start a new pipeline and initiate a "patcher" batch dataflow that reads the event backups over the possibly broken time range (+/- 1 hour). It works out well, but it has the overhead of having to build an external operator process that can detect when to run the batch dataflow.
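For illustration, a minimal sketch of what that external watchdog process looks like, assuming a hypothetical fetchMinWatermark() that reads the pipeline's min watermark from your runner's monitoring API and a hypothetical launchPatcherBatch() that submits the bounded backfill job; both placeholders (and the thresholds) would need to be wired up to whatever monitoring and job-submission tooling you actually use:

    import java.time.Duration;
    import java.time.Instant;

    // Sketch of the external "operator" process described above: poll the pipeline's
    // min watermark and, if it has not advanced for longer than STALL_THRESHOLD,
    // launch a batch "patcher" job over the suspect range padded by one hour each way.
    public class WatermarkWatchdog {

      private static final Duration POLL_INTERVAL = Duration.ofMinutes(1);
      private static final Duration STALL_THRESHOLD = Duration.ofMinutes(30);
      private static final Duration PADDING = Duration.ofHours(1);

      public static void main(String[] args) throws InterruptedException {
        Instant lastWatermark = fetchMinWatermark();
        Instant lastAdvance = Instant.now();

        while (true) {
          Thread.sleep(POLL_INTERVAL.toMillis());
          Instant current = fetchMinWatermark();

          if (current.isAfter(lastWatermark)) {
            // The watermark is still advancing; remember the latest value and when we saw it.
            lastWatermark = current;
            lastAdvance = Instant.now();
          } else if (Duration.between(lastAdvance, Instant.now()).compareTo(STALL_THRESHOLD) > 0) {
            // Watermark stuck: start a replacement streaming pipeline (not shown) and patch
            // the possibly broken range from the event backups, +/- one hour of padding.
            launchPatcherBatch(lastWatermark.minus(PADDING), Instant.now().plus(PADDING));
            return;
          }
        }
      }

      // Hypothetical placeholder: read the pipeline's min watermark from your monitoring API.
      private static Instant fetchMinWatermark() {
        throw new UnsupportedOperationException("wire this up to your monitoring API");
      }

      // Hypothetical placeholder: submit a bounded backfill job over [start, end) of the backups.
      private static void launchPatcherBatch(Instant start, Instant end) {
        throw new UnsupportedOperationException("wire this up to your job launcher");
      }
    }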
Sent from my iPhone

> On May 1, 2017, at 09:37, Thomas Groh <[email protected]> wrote:
>
> You should also be able to simply add a Bounded Read from the backup data source to your pipeline and flatten it with your Pubsub topic. Because all of the elements produced by both the bounded and unbounded sources will have consistent timestamps, when you run the pipeline the watermark will be held until all of the data is read from the bounded sources. Once this is done, your pipeline can continue processing only elements from the PubSub source. If you don't want the backlog and the current processing to occur in the same pipeline, running the same pipeline but just reading from the archival data should be sufficient (all of the processing would be identical, just the source would need to change).
>
> If you read from both the "live" and "archival" sources within the same pipeline, you will need to use additional machines so the backlog can be processed promptly if you use a watermark-based trigger; watermarks will be held until the bounded source is fully processed.
>
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK <[email protected]> wrote:
>> I did not see Lukasz's reply before I posted, and I will have to read it a bit later!
>>
>>> On Mon, May 1, 2017 at 18:28, Lars BK <[email protected]> wrote:
>>> Yes, precisely.
>>>
>>> I think that could work, yes. What you are suggesting sounds like idea 2) in my original question.
>>>
>>> My main concern is that I would have to allow a great deal of lateness and that old windows would consume too much memory. Whether it works in my case or not I don't know yet, as I haven't tested it.
>>>
>>> What if I had to process even older data? Could I handle any "oldness" of data by increasing the allowed lateness and throwing machines at the problem to hold all the old windows in memory while the backlog is processed? If so, great! But I would have to dial the allowed lateness back down once the processing has caught up with the present.
>>>
>>> Is there some intended way of handling reprocessing like this? Maybe not? Perhaps it is more of a Pubsub and Dataflow question than a Beam question when it comes down to it.
>>>
>>>> On Mon, May 1, 2017 at 17:25, Jean-Baptiste Onofré <[email protected]> wrote:
>>>> OK, so the messages are re-published on the topic, with the same timestamp as the original, and consumed again by the pipeline.
>>>>
>>>> Maybe you can play with the allowed lateness and late firings?
>>>>
>>>> Something like:
>>>>
>>>> Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .plusDelayOf(FIVE_MINUTES))
>>>>         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .plusDelayOf(TEN_MINUTES)))
>>>>     .withAllowedLateness(Duration.standardMinutes(xx))
>>>>     .accumulatingFiredPanes()
>>>>
>>>> Thoughts?
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>>> > Hi Jean-Baptiste,
>>>> >
>>>> > I think the key point in my case is that I have to process or reprocess "old" messages. That is, messages that are late because they are streamed from an archive file and are older than the allowed lateness in the pipeline.
>>>> >
>>>> > In the case I described, the messages had already been processed once and were no longer in the topic, so they had to be sent and processed again. But it might as well have been that I had received a backfill of data that absolutely needs to be processed, regardless of it being later than the allowed lateness with respect to present time.
>>>> >
>>>> > So when I write this now, it really sounds like I either need to allow more lateness or somehow rewind the watermark!
>>>> >
>>>> > Lars
>>>> >
>>>> > On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]> wrote:
>>>> >
>>>> > Hi Lars,
>>>> >
>>>> > Interesting use case indeed ;)
>>>> >
>>>> > Just to understand: if possible, you don't want to re-consume the messages from the PubSub topic, right? So you want to "hold" the PCollections for late data processing?
>>>> >
>>>> > Regards
>>>> > JB
>>>> >
>>>> > On 05/01/2017 04:15 PM, Lars BK wrote:
>>>> > > Hi,
>>>> > >
>>>> > > Is there a preferred way of approaching reprocessing historic data with streaming jobs?
>>>> > >
>>>> > > I want to pose this as a general question, but I'm working with Pubsub and Dataflow specifically. I am a fan of the idea of replaying/fast-forwarding through historic data to reproduce results (as you perhaps would with Kafka), but I'm having a hard time unifying this way of thinking with the concepts of watermarks and late data in Beam. I'm not sure how best to mimic this with the tools I'm using, or whether there is a better way.
>>>> > >
>>>> > > If there is a previous discussion about this I might have missed (and I'm guessing there is), please direct me to it!
>>>> > >
>>>> > > The use case:
>>>> > >
>>>> > > Suppose I discover a bug in a streaming job with event-time windows and an allowed lateness of 7 days, and that I subsequently have to reprocess all the data for the past month. Let us also assume that I have an archive of my source data (in my case in Google Cloud Storage) and that I can republish it all to the message queue I'm using.
>>>> > >
>>>> > > Some ideas that may or may not work, which I would love to get your thoughts on:
>>>> > >
>>>> > > 1) Start a new instance of the job that reads from a separate source to which I republish all messages. This shouldn't work because 14 days of my data is later than the allowed limit, but the remaining 7 days should be reprocessed as intended.
>>>> > >
>>>> > > 2) The same as 1), but with an allowed lateness of one month. When the job has caught up, the lateness can be adjusted back to 7 days. I am afraid this approach may consume too much memory, since I'm letting a whole month of windows remain in memory. Also, I wouldn't get the same triggering behaviour as in the original job, since most or all of the data is late with respect to the watermark, which I assume is near real time when the historic data enters the pipeline.
>>>> > >
>>>> > > 3) The same as 1), but with the republishing done first, only starting the new job when all messages are already waiting in the queue. The watermark should then start one month back in time and only catch up with the present once all the data is reprocessed, yielding no late data. (Experiments I've done with this approach produce somewhat unexpected results, where early panes that are older than 7 days appear to be both the first and the last firing from their respective windows.) Early firings triggered by processing time would probably differ, but the results should be the same? This approach also feels a bit awkward as it requires more orchestration.
>>>> > >
>>>> > > 4) Batch process the archived data instead and start a streaming job in parallel. Would this in a sense be a more honest approach, since I'm actually reprocessing batches of archived data? The triggering behaviour in the streaming version of the job would not apply in batch, and I would want to avoid stitching together results from two jobs if I can.
>>>> > >
>>>> > > These are the approaches I've thought of currently, and any input is much appreciated. Have any of you faced similar situations, and how did you solve them?
>>>> > >
>>>> > > Regards,
>>>> > > Lars
>>>> >
>>>> > --
>>>> > Jean-Baptiste Onofré
>>>> > [email protected]
>>>> > http://blog.nanthrax.net
>>>> > Talend - http://www.talend.com
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> [email protected]
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
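For anyone wanting to try the flatten approach Thomas describes above, a minimal sketch of that pipeline shape might look like the following. The bucket path and topic name are made up, and you would still need to parse each record and assign its event-time timestamp before windowing so that the archived and live elements carry consistent timestamps:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class BackfillPlusLive {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Bounded read over the archived events (path is illustrative).
        PCollection<String> archived =
            p.apply("ReadArchive", TextIO.read().from("gs://my-bucket/backups/*.json"));

        // Unbounded read from the live topic (topic name is illustrative).
        PCollection<String> live =
            p.apply("ReadLive", PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"));

        // Flatten both sources into one PCollection. The watermark is held back until
        // the bounded side is exhausted, after which processing continues on Pubsub alone.
        PCollection<String> all =
            PCollectionList.of(archived).and(live).apply("MergeSources", Flatten.pCollections());

        // ... parse records, assign event-time timestamps, window, and apply the same
        // transforms as the original job to `all` ...

        p.run();
      }
    }

If you would rather keep the backlog out of the live job, the same transforms can be reused with only the source swapped, i.e. run the pipeline once as a bounded (batch) job reading only the archive, as Thomas also suggests.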
