Yes, precisely.

I think that could work, yes. What you are suggesting sounds like idea 2)
in my original question.

My main concern is that I would have to allow a great deal of lateness and
that old windows would consume too much memory. Whether it works in my case
or not I don't know yet as I haven't tested it.
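
To put rough numbers on the memory concern, here is a back-of-the-envelope sketch in plain Java. It is not Beam or Dataflow code: the class name, the method name and the 10-minute window size are all hypothetical, and it only counts how many fixed windows per key would still be open for a given allowed lateness. It says nothing about how a runner actually stores that state.

```java
import java.time.Duration;

// Rough model only: counts fixed windows per key that have not yet expired.
// Names and the 10-minute window size are assumptions for illustration.
class OpenWindowEstimate {

    // Windows whose end lies within allowedLateness behind the watermark are
    // still retained, plus the one window that is currently filling up.
    static long openWindowsPerKey(Duration windowSize, Duration allowedLateness) {
        long size = windowSize.toMillis();
        long lateness = allowedLateness.toMillis();
        long retained = (lateness + size - 1) / size; // ceil(lateness / size)
        return retained + 1;
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(10); // hypothetical window size
        System.out.println(openWindowsPerKey(window, Duration.ofDays(7)));
        System.out.println(openWindowsPerKey(window, Duration.ofDays(30)));
    }
}
```

Under these assumptions, going from 7 to 30 days of allowed lateness grows the number of retained windows per key roughly in proportion to the lateness, which is the growth I am worried about.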

What if I had to process even older data? Could I handle any "oldness" of
data by increasing the allowed lateness and throwing machines at the
problem to hold all the old windows in memory while the backlog is
processed? If so, great! But I would have to dial the allowed lateness back
down when the processing has caught up with the present.
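
The reasoning above can be sketched as a small model, again in plain Java rather than Beam. It assumes a window is dropped once the watermark has passed the end of the window plus the allowed lateness, and that the watermark sits near real time while the backlog is replayed. The class and method names are hypothetical, and the model ignores details such as the exact maximum timestamp of a window.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of window expiry; not the Beam API.
class LatenessModel {

    // Data for a window is treated as dropped once the watermark has passed
    // the end of the window plus the allowed lateness.
    static boolean isDropped(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    // Smallest allowed lateness that still admits the oldest window.
    static Duration requiredLateness(Instant oldestWindowEnd, Instant watermark) {
        Duration gap = Duration.between(oldestWindowEnd, watermark);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2017-05-01T00:00:00Z"); // assumed near real time
        Instant tenDaysOld = watermark.minus(Duration.ofDays(10));
        Instant thirtyDaysOld = watermark.minus(Duration.ofDays(30));

        // With 7 days of allowed lateness, a 10-day-old window is dropped.
        System.out.println(isDropped(tenDaysOld, Duration.ofDays(7), watermark));
        // With 30 days of allowed lateness, the same window is kept.
        System.out.println(isDropped(tenDaysOld, Duration.ofDays(30), watermark));
        // Lateness needed to admit the oldest data in a one-month backlog.
        System.out.println(requiredLateness(thirtyDaysOld, watermark).toDays());
    }
}
```

In this model the required lateness is simply the age of the oldest window relative to the watermark, so older backlogs would need a proportionally larger setting.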

Is there some intended way of handling reprocessing like this? Maybe not?
Perhaps it is more of a Pubsub and Dataflow question than a Beam question
when it comes down to it.


On Mon, 1 May 2017 at 17:25, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> OK, so the messages are re-published on the topic, with the same timestamp
> as the original, and consumed again by the pipeline.
>
> Maybe you can play with the allowed lateness and late firings?
>
> Something like:
>
>            // xx, yy, FIVE_MINUTES and TEN_MINUTES are placeholders
>            Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>                .triggering(AfterWatermark.pastEndOfWindow()
>                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                        .plusDelayOf(FIVE_MINUTES))
>                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                        .plusDelayOf(TEN_MINUTES)))
>                .withAllowedLateness(Duration.standardMinutes(yy))
>                .accumulatingFiredPanes()
>
> Thoughts ?
>
> Regards
> JB
>
> On 05/01/2017 05:12 PM, Lars BK wrote:
> > Hi Jean-Baptiste,
> >
> > I think the key point in my case is that I have to process or reprocess
> > "old" messages. That is, messages that are late because they are streamed
> > from an archive file and are older than the allowed lateness in the pipeline.
> >
> > In the case I described, the messages had already been processed once and
> > were no longer in the topic, so they had to be sent and processed again. But
> > it might as well have been that I had received a backfill of data that
> > absolutely needs to be processed, regardless of it being later than the
> > allowed lateness with respect to the present time.
> >
> > So when I write this now, it really sounds like I either need to allow more
> > lateness or somehow rewind the watermark!
> >
> > Lars
> >
> > On Mon, 1 May 2017 at 16:34, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> >     Hi Lars,
> >
> >     interesting use case indeed ;)
> >
> >     Just to understand: if possible, you don't want to re-consume the
> >     messages from the PubSub topic, right? So you want to "hold" the
> >     PCollections for late data processing?
> >
> >     Regards
> >     JB
> >
> >     On 05/01/2017 04:15 PM, Lars BK wrote:
> >     > Hi,
> >     >
> >     > Is there a preferred way of approaching reprocessing historic data
> >     > with streaming jobs?
> >     >
> >     > I want to pose this as a general question, but I'm working with Pubsub
> >     > and Dataflow specifically. I am a fan of the idea of replaying/fast
> >     > forwarding through historic data to reproduce results (as you perhaps
> >     > would with Kafka), but I'm having a hard time unifying this way of
> >     > thinking with the concepts of watermarks and late data in Beam. I'm not
> >     > sure how to best mimic this with the tools I'm using, or if there is a
> >     > better way.
> >     >
> >     > If there is a previous discussion about this that I might have missed
> >     > (and I'm guessing there is), please direct me to it!
> >     >
> >     >
> >     > The use case:
> >     >
> >     > Suppose I discover a bug in a streaming job with event time windows and
> >     > an allowed lateness of 7 days, and that I subsequently have to reprocess
> >     > all the data for the past month. Let us also assume that I have an
> >     > archive of my source data (in my case in Google Cloud Storage) and that
> >     > I can republish it all to the message queue I'm using.
> >     >
> >     > Some ideas that may or may not work, which I would love to get your
> >     > thoughts on:
> >     >
> >     > 1) Start a new instance of the job that reads from a separate source to
> >     > which I republish all messages. This shouldn't work because 14 days of
> >     > my data is later than the allowed limit, but the remaining 7 days should
> >     > be reprocessed as intended.
> >     >
> >     > 2) The same as 1), but with an allowed lateness of one month. When the
> >     > job is caught up, the lateness can be adjusted back to 7 days. I am
> >     > afraid this approach may consume too much memory since I'm letting a
> >     > whole month of windows remain in memory. Also, I wouldn't get the same
> >     > triggering behaviour as in the original job since most or all of the
> >     > data is late with respect to the watermark, which I assume is near real
> >     > time when the historic data enters the pipeline.
> >     >
> >     > 3) The same as 1), but with the republishing first and only starting the
> >     > new job when all messages are already waiting in the queue. The
> >     > watermark should then start one month back in time and only catch up
> >     > with the present once all the data is reprocessed, yielding no late
> >     > data. (Experiments I've done with this approach produce somewhat
> >     > unexpected results where early panes that are older than 7 days appear
> >     > to be both the first and the last firing from their respective windows.)
> >     > Early firings triggered by processing time would probably differ, but
> >     > the results should be the same? This approach also feels a bit awkward
> >     > as it requires more orchestration.
> >     >
> >     > 4) Batch process the archived data instead and start a streaming job in
> >     > parallel. Would this in a sense be a more honest approach since I'm
> >     > actually reprocessing batches of archived data? The triggering behaviour
> >     > in the streaming version of the job would not apply in batch, and I
> >     > would want to avoid stitching together results from two jobs if I can.
> >     >
> >     >
> >     > These are the approaches I've thought of currently, and any input is
> >     > much appreciated. Have any of you faced similar situations, and how did
> >     > you solve them?
> >     >
> >     >
> >     > Regards,
> >     > Lars
> >     >
> >     >
> >
> >     --
> >     Jean-Baptiste Onofré
> >     jbono...@apache.org
> >     http://blog.nanthrax.net
> >     Talend - http://www.talend.com
> >
>
