OK, so the messages are re-published on the topic, with the same timestamp as
the original, and consumed again by the pipeline.
Maybe you can play with the allowed lateness and late firings?
Something like:
Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(FIVE_MINUTES))
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(TEN_MINUTES)))
    .withAllowedLateness(Duration.standardMinutes(xx))
    .accumulatingFiredPanes()
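For context, consuming the re-published messages with their original timestamps
could look roughly like this (just a sketch, assuming a recent Beam Java SDK;
the subscription name and the "ts" timestamp attribute are made up):

    PCollection<String> input = pipeline
        .apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/replay")
            // take event time from the attribute set when re-publishing,
            // so the watermark tracks the original timestamps
            .withTimestampAttribute("ts"));
    // then apply the Window transform above to "input"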
Thoughts?
Regards
JB
On 05/01/2017 05:12 PM, Lars BK wrote:
Hi Jean-Baptiste,
I think the key point in my case is that I have to process or reprocess "old"
messages. That is, messages that are late because they are streamed from an
archive file and are older than the allowed lateness in the pipeline.
In the case I described, the messages had already been processed once and were
no longer in the topic, so they had to be sent and processed again. But it
might as well have been that I had received a backfill of data that absolutely
needs to be processed regardless of being later than the allowed lateness with
respect to the present time.
So when I write this now it really sounds like I either need to allow more
lateness or somehow rewind the watermark!
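For the former, I suppose the reprocessing job could simply be started with a
wider bound, something like this (just a sketch; the figures are arbitrary):

    .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        // wide enough to cover the whole archive being replayed
        .withAllowedLateness(Duration.standardDays(30))
        .discardingFiredPanes())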
Lars
On Mon, May 1, 2017 at 16:34, Jean-Baptiste Onofré <[email protected]> wrote:
Hi Lars,
interesting use case indeed ;)
Just to understand: if possible, you don't want to re-consume the messages from
the PubSub topic, right? So, you want to "hold" the PCollections for late data
processing?
Regards
JB
On 05/01/2017 04:15 PM, Lars BK wrote:
> Hi,
>
> Is there a preferred way to approach reprocessing historic data with
> streaming jobs?
>
> I want to pose this as a general question, but I'm working with Pubsub and
> Dataflow specifically. I am a fan of the idea of replaying/fast-forwarding
> through historic data to reproduce results (as you perhaps would with Kafka),
> but I'm having a hard time unifying this way of thinking with the concepts of
> watermarks and late data in Beam. I'm not sure how best to mimic this with
> the tools I'm using, or if there is a better way.
>
> If there is a previous discussion about this that I might have missed (and
> I'm guessing there is), please direct me to it!
>
>
> The use case:
>
> Suppose I discover a bug in a streaming job with event time windows and an
> allowed lateness of 7 days, and that I subsequently have to reprocess all the
> data for the past month. Let us also assume that I have an archive of my
> source data (in my case in Google Cloud Storage) and that I can republish it
> all to the message queue I'm using.
>
> Here are some ideas that may or may not work; I would love to get your
> thoughts on them:
>
> 1) Start a new instance of the job that reads from a separate source to
> which I republish all messages. This shouldn't work for 14 days of my data,
> which is later than the allowed lateness, but the remaining 7 days should be
> reprocessed as intended.
>
> 2) The same as 1), but with an allowed lateness of one month. When the job is
> caught up, the lateness can be adjusted back to 7 days. I am afraid this
> approach may consume too much memory, since I'm letting a whole month of
> windows remain in memory. Also, I wouldn't get the same triggering behaviour
> as in the original job, since most or all of the data is late with respect to
> the watermark, which I assume is near real time when the historic data enters
> the pipeline.
>
> 3) The same as 1), but with the republishing done first, only starting the
> new job when all messages are already waiting in the queue. The watermark
> should then start one month back in time and only catch up with the present
> once all the data is reprocessed, yielding no late data. (Experiments I've
> done with this approach produce somewhat unexpected results, where early
> panes that are older than 7 days appear to be both the first and the last
> firing from their respective windows.) Early firings triggered by processing
> time would probably differ, but the results should be the same? This approach
> also feels a bit awkward as it requires more orchestration.
>
> 4) Batch process the archived data instead and start a streaming job in
> parallel. Would this in a sense be a more honest approach, since I'm actually
> reprocessing batches of archived data? The triggering behaviour in the
> streaming version of the job would not apply in batch, and I would want to
> avoid stitching together results from two jobs if I can.
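> Roughly what I have in mind for 4), just a sketch (the bucket path and the
> parseTimestamp helper are made up):
>
>     p.apply(TextIO.read().from("gs://my-bucket/archive/2017/*.json"))
>         // restore event time so the same event-time windows apply in batch
>         .apply(WithTimestamps.of((String line) -> parseTimestamp(line)))
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))));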
>
>
> These are the approaches I've thought of so far, and any input is much
> appreciated. Have any of you faced similar situations, and how did you solve
> them?
>
>
> Regards,
> Lars
>
>
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com