OK, so the messages are re-published on the topic, with the same timestamp as the original, and consumed again by the pipeline.

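By the way, if the republished messages carry the original event time in a message attribute, you can read the subscription with a timestamp attribute so that the watermark follows event time rather than publish time. A minimal sketch, assuming a hypothetical attribute named "ts" holding the event time (the subscription path and attribute name are placeholders; on older SDKs this method was called withTimestampLabel):

          // hypothetical subscription and attribute name
          PubsubIO.readStrings()
              .fromSubscription("projects/my-project/subscriptions/my-sub")
              .withTimestampAttribute("ts")
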
Maybe you can also play with the allowed lateness and late firings?

Something like:

          Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
              .triggering(AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(FIVE_MINUTES))
                  .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(TEN_MINUTES)))
              .withAllowedLateness(Duration.standardMinutes(xx))
              .accumulatingFiredPanes()
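
With accumulatingFiredPanes(), each late firing re-emits the whole pane seen so far, so downstream consumers can simply take the latest result per window; discardingFiredPanes() would emit only the new elements of each firing. Either way, elements arriving beyond the allowed lateness are still dropped.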

Thoughts?

Regards
JB

On 05/01/2017 05:12 PM, Lars BK wrote:
Hi Jean-Baptiste,

I think the key point in my case is that I have to process or reprocess "old"
messages. That is, messages that are late because they are streamed from an
archive file and are older than the allowed lateness in the pipeline.

In the case I described, the messages had already been processed once and were no
longer in the topic, so they had to be sent and processed again. But it might just
as well have been that I had received a backfill of data that absolutely needs to
be processed, regardless of it being later than the allowed lateness with respect
to the present time.

So when I write this now, it really sounds like I either need to allow more
lateness or somehow rewind the watermark!

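For the "more lateness" option, I picture something like this minimal sketch (the window size and lateness are placeholder values I would tune):

    // hypothetical values; one month of lateness to cover the backfill
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardDays(30))
        .discardingFiredPanes()
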
Lars

On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]> wrote:

    Hi Lars,

    Interesting use case indeed ;)

    Just to understand: if possible, you don't want to re-consume the messages from
    the PubSub topic, right? So, you want to "hold" the PCollections for late data
    processing?

    Regards
    JB

    On 05/01/2017 04:15 PM, Lars BK wrote:
    > Hi,
    >
    > Is there a preferred way of approaching reprocessing historic data with
    > streaming jobs?
    >
    > I want to pose this as a general question, but I'm working with Pubsub and
    > Dataflow specifically. I am a fan of the idea of replaying/fast forwarding
    > through historic data to reproduce results (as you perhaps would with Kafka),
    > but I'm having a hard time unifying this way of thinking with the concepts of
    > watermarks and late data in Beam. I'm not sure how to best mimic this with the
    > tools I'm using, or if there is a better way.
    >
    > If there is a previous discussion about this I might have missed (and I'm
    > guessing there is), please direct me to it!
    >
    >
    > The use case:
    >
    > Suppose I discover a bug in a streaming job with event time windows and an
    > allowed lateness of 7 days, and that I subsequently have to reprocess all the
    > data for the past month. Let us also assume that I have an archive of my source
    > data (in my case in Google Cloud Storage) and that I can republish it all to
    > the message queue I'm using.
    >
    > Some ideas that may or may not work I would love to get your thoughts on:
    >
    > 1) Start a new instance of the job that reads from a separate source to which I
    > republish all messages. This shouldn't fully work: 14 days of my data would be
    > later than the allowed limit, but the remaining 7 days should be reprocessed as
    > intended.
    >
    > 2) The same as 1), but with allowed lateness of one month. When the job is
    > caught up, the lateness can be adjusted back to 7 days. I am afraid this
    > approach may consume too much memory since I'm letting a whole month of windows
    > remain in memory. Also I wouldn't get the same triggering behaviour as in the
    > original job since most or all of the data is late with respect to the
    > watermark, which I assume is near real time when the historic data enters the
    > pipeline.
    >
    > 3) The same as 1), but with the republishing first and only starting the new
    > job when all messages are already waiting in the queue. The watermark should
    > then start one month back in time and only catch up with the present once all
    > the data is reprocessed, yielding no late data. (Experiments I've done with
    > this approach produce somewhat unexpected results where early panes that are
    > older than 7 days appear to be both the first and the last firing from their
    > respective windows.) Early firings triggered by processing time would probably
    > differ, but the results should be the same? This approach also feels a bit
    > awkward as it requires more orchestration.
    >
    > 4) Batch process the archived data instead and start a streaming job in
    > parallel (see the sketch below). Would this in a sense be a more honest
    > approach, since I'm actually reprocessing batches of archived data? The
    > triggering behaviour in the streaming version of the job would not apply in
    > batch, and I would want to avoid stitching together results from two jobs if
    > I can.
    >
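    > For idea 4, I picture the batch job looking roughly like this minimal sketch
    > (assuming newline-delimited records in GCS; extractEventTime() is a
    > hypothetical helper that parses the event time from a record, and the archive
    > path is a placeholder):
    >
    >     // hypothetical archive path and timestamp extraction
    >     p.apply(TextIO.read().from("gs://my-bucket/archive/*.json"))
    >      .apply(WithTimestamps.of((String line) -> extractEventTime(line)))
    >      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))));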
    >
    > These are the approaches I've thought of currently, and any input is much
    > appreciated. Have any of you faced similar situations, and how did you solve
    > them?
    >
    >
    > Regards,
    > Lars
    >
    >

    --
    Jean-Baptiste Onofré
    [email protected]
    http://blog.nanthrax.net
    Talend - http://www.talend.com


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
