A very convenient property is that you should be able to use the same
pipeline (albeit with a different data source) to process both bounded
(backfill/reprocessing) and unbounded (live) datasets.
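
To make that concrete, here is a minimal plain-Java sketch (no Beam
dependency, so it is only an analogy for a real pipeline). The class name
SourceAgnosticCount and the method countPerWindow are hypothetical; the
point is only that the windowing logic takes "some source of timestamps"
and does not care whether it is a finite archive or a live feed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

/** Toy stand-in for a pipeline: counts events per fixed event-time window. */
final class SourceAgnosticCount {

    /** The "pipeline": identical logic regardless of where the events come from. */
    static Map<Instant, Long> countPerWindow(Iterator<Instant> source, Duration windowSize) {
        Map<Instant, Long> counts = new TreeMap<>();
        long size = windowSize.toMillis();
        while (source.hasNext()) {
            long ts = source.next().toEpochMilli();
            // Align the event timestamp to the start of its fixed window.
            Instant windowStart = Instant.ofEpochMilli(ts - Math.floorMod(ts, size));
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2017-05-01T00:00:00Z");
        Duration oneMinute = Duration.ofMinutes(1);

        // Bounded stand-in: an archive that is fully known up front.
        List<Instant> archive = Arrays.asList(t0, t0.plusSeconds(30), t0.plusSeconds(90));

        // "Live" stand-in: lazily generated, truncated so the demo terminates.
        Iterator<Instant> live = Stream.iterate(t0, t -> t.plusSeconds(45)).limit(3).iterator();

        System.out.println(countPerWindow(archive.iterator(), oneMinute));
        System.out.println(countPerWindow(live, oneMinute));
    }
}
```

In a real Beam job the same idea would mean keeping the transforms
unchanged and swapping only the read step.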

On Wed, May 3, 2017 at 12:55 AM, Lars BK <[email protected]> wrote:

> Thanks for your input and sorry for the late reply.
>
> Lukasz, you may be right that running the reprocessing as a batch job will
> be better and faster. I'm still experimenting with approach 3 where I
> publish all messages and then start the job to let the watermark progress
> through the data. It seems to be working fairly well right now, but I'm not
> sure that the "somewhat ordered" data I send is "ordered enough". (I can
> send data ordered by date, but within each date I can give no guarantees.)
>
> Thomas, I had not thought of that, thanks. I like the idea, sounds like it
> will handle the merge between archive and live data automatically which
> would be very nice.
>
> And Ankur, your case sounds similar. I'm starting to lean towards doing
> batch jobs for reprocessing too.
>
> I am going to keep experimenting with different approaches (until I have
> to move on), and I'll do my best to update here with my findings later.
>
>
> Lars
>
> On Mon, May 1, 2017 at 6:51 PM Ankur Chauhan <[email protected]> wrote:
>
>> I have a somewhat similar use case when dealing with failed / cancelled /
>> broken streaming pipelines.
>> We have an operator that continuously monitors the min-watermark of the
>> pipeline. When it detects that the watermark has not advanced for more
>> than some threshold, we start a new pipeline and initiate a "patcher" batch
>> Dataflow job that reads the event backups over the possibly broken time
>> range (+/- 1 hour).
>> It works out well but has the overhead of having to build out an external
>> operator process that can detect when to do the batch dataflow process.
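
A rough plain-Java sketch of the detection step described above. All names
(WatermarkStallDetector, observe, patchRange) are hypothetical, and how the
watermark is actually fetched from the runner is left out. The patch range
(stuck watermark minus a margin, up to detection time plus a margin) is one
possible reading of "+/- 1 hour", not necessarily what the operator above
does.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical watchdog: flags a pipeline whose watermark has stopped advancing. */
final class WatermarkStallDetector {
    private final Duration threshold;
    private Instant lastWatermark;   // highest watermark observed so far
    private Instant lastAdvanceTime; // wall-clock time at which it last advanced

    WatermarkStallDetector(Duration threshold) {
        this.threshold = threshold;
    }

    /** Records one observation; returns true if the watermark has been stuck longer than the threshold. */
    boolean observe(Instant watermark, Instant now) {
        if (lastWatermark == null || watermark.isAfter(lastWatermark)) {
            lastWatermark = watermark;
            lastAdvanceTime = now;
            return false;
        }
        return Duration.between(lastAdvanceTime, now).compareTo(threshold) > 0;
    }

    /** Assumed range for a "patcher" batch job: {stuck watermark - margin, now + margin}. */
    Instant[] patchRange(Instant now, Duration margin) {
        return new Instant[] {lastWatermark.minus(margin), now.plus(margin)};
    }

    public static void main(String[] args) {
        WatermarkStallDetector detector = new WatermarkStallDetector(Duration.ofMinutes(10));
        Instant t = Instant.parse("2017-05-01T12:00:00Z");
        detector.observe(t, t);
        // Same watermark eleven minutes later: reported as stalled.
        System.out.println(detector.observe(t, t.plusSeconds(660)));
    }
}
```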
>>
>> On May 1, 2017, at 09:37, Thomas Groh <[email protected]> wrote:
>>
>> You should also be able to simply add a Bounded Read from the backup data
>> source to your pipeline and flatten it with your Pubsub topic. Because all
>> of the elements produced by both the bounded and unbounded sources will
>> have consistent timestamps, when you run the pipeline the watermark will be
>> held until all of the data is read from the bounded sources. Once this is
>> done, your pipeline can continue processing only elements from the PubSub
>> source. If you don't want the backlog and the current processing to occur
>> in the same pipeline, running the same pipeline but just reading from the
>> archival data should be sufficient (all of the processing would be
>> identical, just the source would need to change).
>>
>> If you read from both the "live" and "archival" sources within the same
>> pipeline, you will need to use additional machines so the backlog can be
>> processed promptly if you use a watermark based trigger; watermarks will be
>> held until the bounded source is fully processed.
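
The watermark behaviour described above can be illustrated with a toy
model in plain Java. This is not Beam API: FlattenWatermarkModel and
END_OF_INPUT are made-up names, and the sketch assumes that the watermark
after a flatten is the minimum of its inputs' watermarks and that a bounded
source reports a "maximum" watermark once it has been read completely.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;

/** Toy model (not Beam API) of the combined watermark after flattening several sources. */
final class FlattenWatermarkModel {

    /** Assumed watermark of a bounded source that has been read to the end. */
    static final Instant END_OF_INPUT = Instant.MAX;

    /** The combined watermark is held back by the slowest input. */
    static Instant flattenWatermark(Collection<Instant> inputWatermarks) {
        Instant min = END_OF_INPUT;
        for (Instant watermark : inputWatermarks) {
            if (watermark.isBefore(min)) {
                min = watermark;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        Instant live = Instant.parse("2017-05-01T16:00:00Z");
        Instant archiveProgress = Instant.parse("2017-04-03T00:00:00Z");

        // While the archive is still being read, it holds the watermark back.
        System.out.println(flattenWatermark(Arrays.asList(archiveProgress, live)));

        // Once the archive is exhausted, only the live source determines the watermark.
        System.out.println(flattenWatermark(Arrays.asList(END_OF_INPUT, live)));
    }
}
```

Under this model, watermark-based triggers for recent windows cannot fire
until the archive has been read, which is why extra machines help.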
>>
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK <[email protected]> wrote:
>>
>>> I did not see Lukasz's reply before I posted, and I will have to read it a
>>> bit later!
>>>
>>> On Mon, May 1, 2017 at 18:28, Lars BK <[email protected]> wrote:
>>>
>>>> Yes, precisely.
>>>>
>>>> I think that could work, yes. What you are suggesting sounds like idea
>>>> 2) in my original question.
>>>>
>>>> My main concern is that I would have to allow a great deal of lateness
>>>> and that old windows would consume too much memory. Whether it works in my
>>>> case or not I don't know yet as I haven't tested it.
>>>>
>>>> What if I had to process even older data? Could I handle any "oldness"
>>>> of data by increasing the allowed lateness and throwing machines at the
>>>> problem to hold all the old windows in memory while the backlog is
>>>> processed? If so, great! But I would have to dial the allowed lateness back
>>>> down when the processing has caught up with the present.
>>>>
>>>> Is there some intended way of handling reprocessing like this? Maybe
>>>> not? Perhaps it is more of a Pubsub and Dataflow question than a Beam
>>>> question when it comes down to it.
>>>>
>>>>
>>>> On Mon, May 1, 2017 at 17:25, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>
>>>>> OK, so the messages are re-published on the topic with the same
>>>>> timestamps as the originals and consumed again by the pipeline.
>>>>>
>>>>> Maybe you can play with the allowed lateness and late firings?
>>>>>
>>>>> Something like:
>>>>>
>>>>>            // xx and yy are placeholders; Duration is org.joda.time.Duration
>>>>>            Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>>>                .triggering(AfterWatermark.pastEndOfWindow()
>>>>>                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                        .plusDelayOf(FIVE_MINUTES))
>>>>>                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                        .plusDelayOf(TEN_MINUTES)))
>>>>>                .withAllowedLateness(Duration.standardMinutes(yy))
>>>>>                .accumulatingFiredPanes()
>>>>>
>>>>> Thoughts ?
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>>>> > Hi Jean-Baptiste,
>>>>> >
>>>>> > I think the key point in my case is that I have to process or
>>>>> > reprocess "old" messages. That is, messages that are late because
>>>>> > they are streamed from an archive file and are older than the
>>>>> > allowed lateness in the pipeline.
>>>>> >
>>>>> > In the case I described, the messages had already been processed once
>>>>> > and were no longer in the topic, so they had to be sent and processed
>>>>> > again. But it might as well have been that I had received a backfill
>>>>> > of data that absolutely needs to be processed regardless of it being
>>>>> > later than the allowed lateness with respect to the present time.
>>>>> >
>>>>> > So when I write this now it really sounds like I either need to
>>>>> > allow more lateness or somehow rewind the watermark!
>>>>> >
>>>>> > Lars
>>>>> >
>>>>> > On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>> >
>>>>> >     Hi Lars,
>>>>> >
>>>>> >     interesting use case indeed ;)
>>>>> >
>>>>> >     Just to understand: if possible, you don't want to re-consume
>>>>> >     the messages from the PubSub topic, right? So, you want to "hold"
>>>>> >     the PCollections for late data processing?
>>>>> >
>>>>> >     Regards
>>>>> >     JB
>>>>> >
>>>>> >     On 05/01/2017 04:15 PM, Lars BK wrote:
>>>>> >     > Hi,
>>>>> >     >
>>>>> >     > Is there a preferred way of approaching reprocessing historic
>>>>> >     > data with streaming jobs?
>>>>> >     >
>>>>> >     > I want to pose this as a general question, but I'm working
>>>>> >     > with Pubsub and Dataflow specifically. I am a fan of the idea of
>>>>> >     > replaying/fast forwarding through historic data to reproduce
>>>>> >     > results (as you perhaps would with Kafka), but I'm having a hard
>>>>> >     > time unifying this way of thinking with the concepts of
>>>>> >     > watermarks and late data in Beam. I'm not sure how to best mimic
>>>>> >     > this with the tools I'm using, or if there is a better way.
>>>>> >     >
>>>>> >     > If there is a previous discussion about this that I might have
>>>>> >     > missed (and I'm guessing there is), please direct me to it!
>>>>> >     >
>>>>> >     >
>>>>> >     > The use case:
>>>>> >     >
>>>>> >     > Suppose I discover a bug in a streaming job with event time
>>>>> >     > windows and an allowed lateness of 7 days, and that I
>>>>> >     > subsequently have to reprocess all the data for the past month.
>>>>> >     > Let us also assume that I have an archive of my source data (in
>>>>> >     > my case in Google Cloud Storage) and that I can republish it all
>>>>> >     > to the message queue I'm using.
>>>>> >     >
>>>>> >     > Some ideas that may or may not work, which I would love to get
>>>>> >     > your thoughts on:
>>>>> >     >
>>>>> >     > 1) Start a new instance of the job that reads from a separate
>>>>> >     > source to which I republish all messages. This shouldn't work
>>>>> >     > because 14 days of my data is later than the allowed limit, but
>>>>> >     > the remaining 7 days should be reprocessed as intended.
>>>>> >     >
>>>>> >     > 2) The same as 1), but with allowed lateness of one month.
>>>>> >     > When the job is caught up, the lateness can be adjusted back to
>>>>> >     > 7 days. I am afraid this approach may consume too much memory
>>>>> >     > since I'm letting a whole month of windows remain in memory.
>>>>> >     > Also, I wouldn't get the same triggering behaviour as in the
>>>>> >     > original job since most or all of the data is late with respect
>>>>> >     > to the watermark, which I assume is near real time when the
>>>>> >     > historic data enters the pipeline.
>>>>> >     >
>>>>> >     > 3) The same as 1), but with the republishing first and only
>>>>> >     > starting the new job when all messages are already waiting in
>>>>> >     > the queue. The watermark should then start one month back in
>>>>> >     > time and only catch up with the present once all the data is
>>>>> >     > reprocessed, yielding no late data. (Experiments I've done with
>>>>> >     > this approach produce somewhat unexpected results where early
>>>>> >     > panes that are older than 7 days appear to be both the first and
>>>>> >     > the last firing from their respective windows.) Early firings
>>>>> >     > triggered by processing time would probably differ, but the
>>>>> >     > results should be the same? This approach also feels a bit
>>>>> >     > awkward as it requires more orchestration.
>>>>> >     >
>>>>> >     > 4) Batch process the archived data instead and start a
>>>>> >     > streaming job in parallel. Would this in a sense be a more
>>>>> >     > honest approach since I'm actually reprocessing batches of
>>>>> >     > archived data? The triggering behaviour in the streaming version
>>>>> >     > of the job would not apply in batch, and I would want to avoid
>>>>> >     > stitching together results from two jobs if I can.
>>>>> >     >
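
The difference between ideas 1) to 3) can be sketched with a toy model in
plain Java. This is not Beam API: AllowedLatenessModel and isDropped are
hypothetical names, and the rule used (data for a window is dropped once
the watermark has passed the window end plus the allowed lateness) is a
simplifying assumption about how late data is handled.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model (not Beam API) of when data for an event-time window is dropped as too late. */
final class AllowedLatenessModel {

    /**
     * Assumption: a window expires once the watermark passes windowEnd + allowedLateness,
     * and elements arriving for an expired window are dropped.
     */
    static boolean isDropped(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2017-05-01T00:00:00Z");
        Instant oldWindow = now.minus(Duration.ofDays(21));
        Instant recentWindow = now.minus(Duration.ofDays(3));

        // Idea 1: watermark near real time, 7 days of allowed lateness.
        System.out.println(isDropped(oldWindow, Duration.ofDays(7), now));
        System.out.println(isDropped(recentWindow, Duration.ofDays(7), now));

        // Idea 2: same watermark, allowed lateness raised to 30 days.
        System.out.println(isDropped(oldWindow, Duration.ofDays(30), now));

        // Idea 3: the watermark itself starts one month back, so the old window is not late.
        System.out.println(isDropped(oldWindow, Duration.ofDays(7), now.minus(Duration.ofDays(30))));
    }
}
```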
>>>>> >     >
>>>>> >     > These are the approaches I've thought of currently, and any
>>>>> >     > input is much appreciated. Have any of you faced similar
>>>>> >     > situations, and how did you solve them?
>>>>> >     >
>>>>> >     >
>>>>> >     > Regards,
>>>>> >     > Lars
>>>>> >     >
>>>>> >     >
>>>>> >
>>>>> >     --
>>>>> >     Jean-Baptiste Onofré
>>>>> >     [email protected] <mailto:[email protected]>
>>>>> >     http://blog.nanthrax.net
>>>>> >     Talend - http://www.talend.com
>>>>> >
>>>>>
>>>>>
>>>>
>>
