An extremely convenient point is that you should be able to use the same pipeline (albeit with a different data source) to process both bounded (backfill/reprocessing) and unbounded (live) datasets.
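This is not Beam code, but the reuse point above can be sketched with a tiny Python model (all names here are mine, invented for illustration): the transform logic is a single function, and only the source — a bounded archive list versus an unbounded live generator — changes between the backfill run and the live run.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # hypothetical fixed-window size

def run_pipeline(events):
    """Count events per fixed event-time window.

    `events` is any iterable of (event_time_seconds, payload) pairs: a list
    read from an archive (bounded) or a generator draining a topic
    (unbounded). The logic is identical either way; only the source differs.
    """
    counts = defaultdict(int)
    for event_time, _payload in events:
        window_start = event_time - (event_time % WINDOW_SECONDS)
        counts[window_start] += 1
    return dict(counts)

# Bounded "backfill" source: a list replayed from an archive file.
archive = [(5, "a"), (61, "b"), (62, "c")]
print(run_pipeline(archive))  # {0: 1, 60: 2}

# Unbounded "live" source would be a generator; the same function applies.
live = ((t, "x") for t in (121, 125))
print(run_pipeline(live))  # {120: 2}
```

The same separation is what Beam's source abstraction gives you for real: swap `PubsubIO` for a bounded read and keep the rest of the pipeline untouched.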
On Wed, May 3, 2017 at 12:55 AM, Lars BK <[email protected]> wrote:

> Thanks for your input, and sorry for the late reply.
>
> Lukasz, you may be right that running the reprocessing as a batch job will
> be better and faster. I'm still experimenting with approach 3, where I
> publish all messages and then start the job to let the watermark progress
> through the data. It seems to be working fairly well right now, but I'm
> not sure that the "somewhat ordered" data I send is "ordered enough". (I
> can send data ordered by date, but within each date I can give no
> guarantees.)
>
> Thomas, I had not thought of that, thanks. I like the idea; it sounds like
> it will handle the merge between archive and live data automatically,
> which would be very nice.
>
> And Ankur, your case sounds similar. I'm starting to lean towards doing
> batch jobs for reprocessing too.
>
> I am going to keep experimenting with different approaches (until I have
> to move on), and I'll do my best to update here with my findings later.
>
> Lars
>
> On Mon, May 1, 2017 at 6:51 PM Ankur Chauhan <[email protected]> wrote:
>
>> I have sort of a similar use case when dealing with failed / cancelled /
>> broken streaming pipelines.
>> We have an operator that continuously monitors the min-watermark of the
>> pipeline and detects when the watermark has not advanced for more than
>> some threshold. We then start a new pipeline and initiate a "patcher"
>> batch Dataflow job that reads the event backups over the possibly broken
>> time range (+/- 1 hour).
>> It works out well, but has the overhead of having to build an external
>> operator process that can detect when to run the batch Dataflow job.
>>
>> Sent from my iPhone
>>
>> On May 1, 2017, at 09:37, Thomas Groh <[email protected]> wrote:
>>
>> You should also be able to simply add a Bounded Read from the backup
>> data source to your pipeline and flatten it with your Pubsub topic.
>> Because all of the elements produced by both the bounded and unbounded
>> sources will have consistent timestamps, when you run the pipeline the
>> watermark will be held until all of the data is read from the bounded
>> sources. Once this is done, your pipeline can continue processing only
>> elements from the PubSub source. If you don't want the backlog and the
>> current processing to occur in the same pipeline, running the same
>> pipeline but reading only from the archival data should be sufficient
>> (all of the processing would be identical; just the source would need to
>> change).
>>
>> If you read from both the "live" and "archival" sources within the same
>> pipeline and you use a watermark-based trigger, you will need additional
>> machines so the backlog can be processed promptly; watermarks will be
>> held until the bounded source is fully processed.
>>
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK <[email protected]> wrote:
>>
>>> I did not see Lukasz's reply before I posted, and I will have to read
>>> it a bit later!
>>>
>>> On Mon, May 1, 2017 at 18:28, Lars BK <[email protected]> wrote:
>>>
>>>> Yes, precisely.
>>>>
>>>> I think that could work, yes. What you are suggesting sounds like idea
>>>> 2) in my original question.
>>>>
>>>> My main concern is that I would have to allow a great deal of lateness
>>>> and that old windows would consume too much memory. Whether it works
>>>> in my case or not I don't know yet, as I haven't tested it.
>>>>
>>>> What if I had to process even older data? Could I handle any "oldness"
>>>> of data by increasing the allowed lateness and throwing machines at
>>>> the problem to hold all the old windows in memory while the backlog is
>>>> processed? If so, great! But I would have to dial the allowed lateness
>>>> back down once the processing has caught up with the present.
>>>>
>>>> Is there some intended way of handling reprocessing like this? Maybe
>>>> not?
>>>> Perhaps it is more of a Pubsub and Dataflow question than a Beam
>>>> question when it comes down to it.
>>>>
>>>> On Mon, May 1, 2017 at 17:25, Jean-Baptiste Onofré <[email protected]>
>>>> wrote:
>>>>
>>>>> OK, so the messages are re-published on the topic, with the same
>>>>> timestamp as the original, and consumed again by the pipeline.
>>>>>
>>>>> Maybe you can play with the allowed lateness and late firings?
>>>>>
>>>>> Something like:
>>>>>
>>>>> Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>             .plusDelayOf(FIVE_MINUTES))
>>>>>         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>             .plusDelayOf(TEN_MINUTES)))
>>>>>     .withAllowedLateness(Duration.standardMinutes(yy))
>>>>>     .accumulatingFiredPanes()
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>>>>> Hi Jean-Baptiste,
>>>>>>
>>>>>> I think the key point in my case is that I have to process or
>>>>>> reprocess "old" messages. That is, messages that are late because
>>>>>> they are streamed from an archive file and are older than the
>>>>>> allowed lateness in the pipeline.
>>>>>>
>>>>>> In the case I described, the messages had already been processed
>>>>>> once and were no longer in the topic, so they had to be sent and
>>>>>> processed again. But it might as well have been that I had received
>>>>>> a backfill of data that absolutely needs to be processed regardless
>>>>>> of it being later than the allowed lateness with respect to present
>>>>>> time.
>>>>>>
>>>>>> So when I write this now, it really sounds like I either need to
>>>>>> allow more lateness or somehow rewind the watermark!
>>>>>>
>>>>>> Lars
>>>>>>
>>>>>> On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lars,
>>>>>>>
>>>>>>> Interesting use case indeed ;)
>>>>>>>
>>>>>>> Just to understand: if possible, you don't want to re-consume the
>>>>>>> messages from the PubSub topic, right? So you want to "hold" the
>>>>>>> PCollections for late data processing?
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 05/01/2017 04:15 PM, Lars BK wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is there a preferred way of approaching reprocessing historic data
>>>>>>>> with streaming jobs?
>>>>>>>>
>>>>>>>> I want to pose this as a general question, but I'm working with
>>>>>>>> Pubsub and Dataflow specifically. I am a fan of the idea of
>>>>>>>> replaying/fast-forwarding through historic data to reproduce
>>>>>>>> results (as you perhaps would with Kafka), but I'm having a hard
>>>>>>>> time unifying this way of thinking with the concepts of watermarks
>>>>>>>> and late data in Beam. I'm not sure how best to mimic this with
>>>>>>>> the tools I'm using, or if there is a better way.
>>>>>>>>
>>>>>>>> If there is a previous discussion about this that I might have
>>>>>>>> missed (and I'm guessing there is), please direct me to it!
>>>>>>>>
>>>>>>>> The use case:
>>>>>>>>
>>>>>>>> Suppose I discover a bug in a streaming job with event time
>>>>>>>> windows and an allowed lateness of 7 days, and that I subsequently
>>>>>>>> have to reprocess all the data for the past month. Let us also
>>>>>>>> assume that I have an archive of my source data (in my case in
>>>>>>>> Google Cloud Storage) and that I can republish it all to the
>>>>>>>> message queue I'm using.
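To make the lateness arithmetic in this use case concrete, here is a toy Python check (my own simplification, not Beam's actual implementation — it treats the element's own timestamp as the end of its window): an element whose event time is more than the allowed lateness behind the watermark is droppably late.

```python
def is_droppable(event_time, watermark, allowed_lateness):
    """An element is droppably late if its event time is more than
    `allowed_lateness` behind the watermark (simplified: real runners
    compare the watermark against the end of the element's window)."""
    return event_time < watermark - allowed_lateness

DAY = 24 * 3600
watermark = 30 * DAY        # watermark near "now", 30 days into the month
allowed_lateness = 7 * DAY  # the 7-day allowed lateness from the use case

# An event from 14 days ago is beyond the allowed lateness and is dropped;
# an event from 3 days ago is late but still within bounds.
print(is_droppable(16 * DAY, watermark, allowed_lateness))  # True
print(is_droppable(27 * DAY, watermark, allowed_lateness))  # False
```

This is the core tension behind the ideas below: either the watermark must start far enough in the past, or the allowed lateness must be widened, for month-old data to survive.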
>>>>>>>>
>>>>>>>> Some ideas that may or may not work, which I would love to get
>>>>>>>> your thoughts on:
>>>>>>>>
>>>>>>>> 1) Start a new instance of the job that reads from a separate
>>>>>>>> source to which I republish all messages. This shouldn't work,
>>>>>>>> because 14 days of my data is later than the allowed limit, but
>>>>>>>> the remaining 7 days should be reprocessed as intended.
>>>>>>>>
>>>>>>>> 2) The same as 1), but with an allowed lateness of one month. When
>>>>>>>> the job has caught up, the lateness can be adjusted back to 7
>>>>>>>> days. I am afraid this approach may consume too much memory, since
>>>>>>>> I'm letting a whole month of windows remain in memory. Also, I
>>>>>>>> wouldn't get the same triggering behaviour as in the original job,
>>>>>>>> since most or all of the data is late with respect to the
>>>>>>>> watermark, which I assume is near real time when the historic data
>>>>>>>> enters the pipeline.
>>>>>>>>
>>>>>>>> 3) The same as 1), but with the republishing done first, only
>>>>>>>> starting the new job once all messages are already waiting in the
>>>>>>>> queue. The watermark should then start one month back in time and
>>>>>>>> only catch up with the present once all the data is reprocessed,
>>>>>>>> yielding no late data. (Experiments I've done with this approach
>>>>>>>> produce somewhat unexpected results, where early panes that are
>>>>>>>> older than 7 days appear to be both the first and the last firing
>>>>>>>> from their respective windows.) Early firings triggered by
>>>>>>>> processing time would probably differ, but the results should be
>>>>>>>> the same? This approach also feels a bit awkward, as it requires
>>>>>>>> more orchestration.
>>>>>>>>
>>>>>>>> 4) Batch process the archived data instead and start a streaming
>>>>>>>> job in parallel.
>>>>>>>> Would this in a sense be a more honest approach, since I'm
>>>>>>>> actually reprocessing batches of archived data? The triggering
>>>>>>>> behaviour in the streaming version of the job would not apply in
>>>>>>>> batch, and I would want to avoid stitching together results from
>>>>>>>> two jobs if I can.
>>>>>>>>
>>>>>>>> These are the approaches I've thought of currently, and any input
>>>>>>>> is much appreciated. Have any of you faced similar situations, and
>>>>>>>> how did you solve them?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Lars
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> [email protected]
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
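Idea 4) above can be illustrated with a minimal Python sketch (hypothetical names, not an actual Dataflow job): in batch, the bounded archive is simply grouped into event-time windows, with no watermark, triggers, or allowed lateness involved — which is exactly why a batch job sidesteps the "data older than the allowed lateness" problem.

```python
from collections import defaultdict

def batch_reprocess(archive_events, window_seconds):
    """Group an archived, bounded dataset into fixed event-time windows.

    There is no watermark and no lateness in batch: every element lands in
    its window regardless of age, so arbitrarily old data is unproblematic.
    """
    windows = defaultdict(list)
    for event_time, payload in archive_events:
        window_start = event_time - (event_time % window_seconds)
        windows[window_start].append(payload)
    return dict(windows)

# A month-old event is handled exactly like a fresh one:
archive = [(10, "old"), (15, "old2"), (2592000, "new")]
print(batch_reprocess(archive, 60))  # {0: ['old', 'old2'], 2592000: ['new']}
```

The trade-off the thread discusses remains, of course: processing-time triggers from the streaming job have no batch equivalent, and the batch and streaming outputs may need to be stitched together.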
