I have a similar use case when dealing with failed, cancelled, or broken 
streaming pipelines.
We have an operator that continuously monitors the min-watermark of the 
pipeline. When it detects that the watermark has not advanced for longer than 
some threshold, we start a new pipeline and initiate a "patcher" batch dataflow 
that reads the event backups over the possibly broken time range (+/- 1 hour).
It works out well, but it has the overhead of building and running an external 
operator process that can detect when the batch dataflow is needed. 
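
The detection logic of such an operator can be quite small. Below is a minimal sketch in plain Java, not the actual operator: the class name WatermarkStallDetector, its methods, and the way the watermark is obtained (simply passed in by the caller) are all assumptions made for illustration. It flags a stall once the watermark has not moved forward for longer than a threshold, and it computes a padded event-time range for the patcher job, assuming the broken range runs from the stuck watermark to the moment of detection.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: detects a stalled watermark and derives the range to patch. */
final class WatermarkStallDetector {
    private final Duration threshold;  // how long the watermark may stay unchanged
    private final Duration padding;    // safety margin around the broken range (e.g. 1 hour)
    private Instant lastWatermark;     // highest watermark observed so far
    private Instant lastAdvanceSeenAt; // wall-clock time at which it last moved forward

    WatermarkStallDetector(Duration threshold, Duration padding) {
        this.threshold = threshold;
        this.padding = padding;
    }

    /**
     * Records one observation of the pipeline's min-watermark.
     * Returns true if the watermark has been stuck for longer than the threshold.
     */
    boolean observe(Instant watermark, Instant now) {
        if (lastWatermark == null || watermark.isAfter(lastWatermark)) {
            // First observation, or the watermark advanced: reset the stall clock.
            lastWatermark = watermark;
            lastAdvanceSeenAt = now;
            return false;
        }
        return Duration.between(lastAdvanceSeenAt, now).compareTo(threshold) > 0;
    }

    /**
     * Event-time range the batch job should re-read from the backups:
     * [stuck watermark - padding, detection time + padding].
     */
    Instant[] patchRange(Instant detectedAt) {
        return new Instant[] {lastWatermark.minus(padding), detectedAt.plus(padding)};
    }
}
```

The operator would call observe(...) on a timer with whatever watermark value the runner's monitoring exposes and, once it returns true, start the new pipeline and launch the batch job over patchRange(...).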


> On May 1, 2017, at 09:37, Thomas Groh <[email protected]> wrote:
> 
> You should also be able to simply add a Bounded Read from the backup data 
> source to your pipeline and flatten it with your Pubsub topic. Because all of 
> the elements produced by both the bounded and unbounded sources will have 
> consistent timestamps, when you run the pipeline the watermark will be held 
> until all of the data is read from the bounded sources. Once this is done, 
> your pipeline can continue processing only elements from the PubSub source. 
> If you don't want the backlog and the current processing to occur in the same 
> pipeline, running the same pipeline but just reading from the archival data 
> should be sufficient (all of the processing would be identical, just the 
> source would need to change).
> 
> If you read from both the "live" and "archival" sources within the same 
> pipeline, you will need to use additional machines so the backlog can be 
> processed promptly if you use a watermark based trigger; watermarks will be 
> held until the bounded source is fully processed.
> 
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK <[email protected]> wrote:
>> I did not see Lukasz reply before I posted, and I will have to read it a bit 
>> later!
>> 
>>> On Mon, May 1, 2017 at 18:28, Lars BK <[email protected]> wrote:
>>> Yes, precisely. 
>>> 
>>> I think that could work, yes. What you are suggesting sounds like idea 2) 
>>> in my original question.
>>> 
>>> My main concern is that I would have to allow a great deal of lateness and 
>>> that old windows would consume too much memory. Whether it works in my case 
>>> or not I don't know yet as I haven't tested it. 
>>> 
>>> What if I had to process even older data? Could I handle any "oldness" of 
>>> data by increasing the allowed lateness and throwing machines at the 
>>> problem to hold all the old windows in memory while the backlog is 
>>> processed? If so, great! But I would have to dial the allowed lateness back 
>>> down when the processing has caught up with the present. 
>>> 
>>> Is there some intended way of handling reprocessing like this? Maybe not? 
>>> Perhaps it is more of a Pubsub and Dataflow question than a Beam question 
>>> when it comes down to it. 
>>> 
>>> 
>>>> On Mon, May 1, 2017 at 17:25, Jean-Baptiste Onofré <[email protected]> wrote:
>>>> OK, so the messages are "re-published" on the topic with the same timestamp 
>>>> as the original and consumed again by the pipeline.
>>>> 
>>>> Maybe you can play with the allowed lateness and late firings?
>>>> 
>>>> Something like:
>>>> 
>>>>            // xx, yy: placeholders (minutes) for window size and allowed lateness
>>>>            Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>>                .triggering(AfterWatermark.pastEndOfWindow()
>>>>                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                        .plusDelayOf(FIVE_MINUTES))
>>>>                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                        .plusDelayOf(TEN_MINUTES)))
>>>>                .withAllowedLateness(Duration.standardMinutes(yy))
>>>>                .accumulatingFiredPanes()
>>>> 
>>>> Thoughts ?
>>>> 
>>>> Regards
>>>> JB
>>>> 
>>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>>> > Hi Jean-Baptiste,
>>>> >
>>>> > I think the key point in my case is that I have to process or reprocess
>>>> > "old" messages. That is, messages that are late because they are streamed
>>>> > from an archive file and are older than the allowed lateness in the
>>>> > pipeline.
>>>> >
>>>> > In the case I described, the messages had already been processed once and
>>>> > were no longer in the topic, so they had to be sent and processed again.
>>>> > But it might as well have been that I had received a backfill of data that
>>>> > absolutely needs to be processed regardless of it being later than the
>>>> > allowed lateness with respect to the present time.
>>>> >
>>>> > So when I write this now it really sounds like I either need to allow more
>>>> > lateness or somehow rewind the watermark!
>>>> >
>>>> > Lars
>>>> >
>>>> > On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]> wrote:
>>>> >
>>>> >     Hi Lars,
>>>> >
>>>> >     interesting use case indeed ;)
>>>> >
>>>> >     Just to understand: if possible, you don't want to re-consume the
>>>> >     messages from the PubSub topic, right? So you want to "hold" the
>>>> >     PCollections for late data processing?
>>>> >
>>>> >     Regards
>>>> >     JB
>>>> >
>>>> >     On 05/01/2017 04:15 PM, Lars BK wrote:
>>>> >     > Hi,
>>>> >     >
>>>> >     > Is there a preferred way of approaching reprocessing historic data
>>>> >     > with streaming jobs?
>>>> >     >
>>>> >     > I want to pose this as a general question, but I'm working with
>>>> >     > Pubsub and Dataflow specifically. I am a fan of the idea of
>>>> >     > replaying/fast forwarding through historic data to reproduce results
>>>> >     > (as you perhaps would with Kafka), but I'm having a hard time unifying
>>>> >     > this way of thinking with the concepts of watermarks and late data in
>>>> >     > Beam. I'm not sure how to best mimic this with the tools I'm using, or
>>>> >     > if there is a better way.
>>>> >     >
>>>> >     > If there is a previous discussion about this I might have missed (and
>>>> >     > I'm guessing there is), please direct me to it!
>>>> >     >
>>>> >     >
>>>> >     > The use case:
>>>> >     >
>>>> >     > Suppose I discover a bug in a streaming job with event time windows
>>>> >     > and an allowed lateness of 7 days, and that I subsequently have to
>>>> >     > reprocess all the data for the past month. Let us also assume that I
>>>> >     > have an archive of my source data (in my case in Google Cloud Storage)
>>>> >     > and that I can republish it all to the message queue I'm using.
>>>> >     >
>>>> >     > Some ideas that may or may not work, which I would love to get your
>>>> >     > thoughts on:
>>>> >     >
>>>> >     > 1) Start a new instance of the job that reads from a separate source
>>>> >     > to which I republish all messages. This shouldn't work because 14 days
>>>> >     > of my data is later than the allowed limit, but the remaining 7 days
>>>> >     > should be reprocessed as intended.
>>>> >     >
>>>> >     > 2) The same as 1), but with allowed lateness of one month. When the
>>>> >     > job is caught up, the lateness can be adjusted back to 7 days. I am
>>>> >     > afraid this approach may consume too much memory since I'm letting a
>>>> >     > whole month of windows remain in memory. Also I wouldn't get the same
>>>> >     > triggering behaviour as in the original job since most or all of the
>>>> >     > data is late with respect to the watermark, which I assume is near
>>>> >     > real time when the historic data enters the pipeline.
>>>> >     >
>>>> >     > 3) The same as 1), but with the republishing first and only starting
>>>> >     > the new job when all messages are already waiting in the queue. The
>>>> >     > watermark should then start one month back in time and only catch up
>>>> >     > with the present once all the data is reprocessed, yielding no late
>>>> >     > data. (Experiments I've done with this approach produce somewhat
>>>> >     > unexpected results where early panes that are older than 7 days appear
>>>> >     > to be both the first and the last firing from their respective
>>>> >     > windows.) Early firings triggered by processing time would probably
>>>> >     > differ, but the results should be the same? This approach also feels a
>>>> >     > bit awkward as it requires more orchestration.
>>>> >     >
>>>> >     > 4) Batch process the archived data instead and start a streaming job
>>>> >     > in parallel. Would this in a sense be a more honest approach since I'm
>>>> >     > actually reprocessing batches of archived data? The triggering
>>>> >     > behaviour in the streaming version of the job would not apply in
>>>> >     > batch, and I would want to avoid stitching together results from two
>>>> >     > jobs if I can.
>>>> >     >
>>>> >     >
>>>> >     > These are the approaches I've thought of currently, and any input is
>>>> >     > much appreciated. Have any of you faced similar situations, and how
>>>> >     > did you solve them?
>>>> >     >
>>>> >     >
>>>> >     > Regards,
>>>> >     > Lars
>>>> >     >
>>>> >     >
>>>> >
>>>> >     --
>>>> >     Jean-Baptiste Onofré
>>>> >     [email protected]
>>>> >     http://blog.nanthrax.net
>>>> >     Talend - http://www.talend.com
>>>> >
>>>> 
> 
