Re: Efficiency question

2017-05-01 Thread Robert Bradshaw
On Mon, May 1, 2017 at 10:46 AM,   wrote:
> Yes, I understand there's no explicit underlying time-ordering within the
> stream. What I am getting at is that the notion of windowing in Beam and
> Dataflow does rely on there being at least an implicit weak ordering within
> the stream (without it, you could never issue a watermark - essentially the
> assumption of "useful" watermarks can be treated as the definition of
> weak-ordering).

Yep.

> Often that weak ordering is in fact strong ordering in
> practice, yet we can't exploit it in that case.
>
> I am not sure maintaining a distributed total order would be more costly or
> necessary. For example you could distribute over key and then only require
> total order on a per-key basis.

Sometimes one has a total ordering (e.g. within a shard of a source)
but at each group-by-key one would have to re-establish this ordering
(as the upstream, formerly distributed elements were likely ordered
within different keys, on workers progressing at different rates).

Runners that can cheaply provide (or recognize) this property should
exploit it for their grouping. We don't have an API to expose this to
consumers of the GBK though.
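
In the meantime, a consumer that needs a per-key order has to impose it
explicitly downstream of the GroupByKey. A minimal sketch using the sorter
extension (assuming org.apache.beam.sdk.extensions.sorter is available in your
SDK version; the key and value types here are illustrative, not from your
pipeline):

    import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
    import org.apache.beam.sdk.extensions.sorter.SortValues;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Elements keyed as (primaryKey, (secondaryKey, value)); using the event
    // timestamp as the secondary key yields event-time order within each group.
    PCollection<KV<String, KV<Long, String>>> keyed = /* upstream transforms */;

    PCollection<KV<String, Iterable<KV<Long, String>>>> grouped =
        keyed.apply(GroupByKey.<String, KV<Long, String>>create());

    // SortValues orders each group by the encoded bytes of the secondary key,
    // so its coder must sort correctly (e.g. BigEndianLongCoder for the Long).
    PCollection<KV<String, Iterable<KV<Long, String>>>> groupedAndSorted =
        grouped.apply(
            SortValues.<String, Long, String>create(BufferedExternalSorter.options()));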

> Anyway, thanks for the clarification - very helpful.

Anytime :)

> 1. May 2017 13:18 by rober...@google.com:
>
>
> On Mon, May 1, 2017 at 9:53 AM,  wrote:
>
>
> Thanks Thomas. That's good to know :) Are there any plans to support ordered
> sources?
>
>
> It's been discussed, but there are no concrete plans.
>
> It seems odd (to me at least) to have a stream-oriented
> computational model with support for grouping by time (windowing) and yet
> not provide hooks to exploit the same time-ordering within the stream.
>
>
> There is actually no underlying time-ordering within the stream. The
> elements are grouped by buffering elements as they come in and
> tracking a watermark (which is a signal that "all the data up to
> timestamp T has now been collected"; see [1] for more details) to
> release completed groups downstream (depending on the triggering). A
> runner that actually guaranteed time-ordered delivery of elements
> could provide this and group more efficiently, but that would likely
> impose the higher cost of maintaining a distributed total order.
>
> [1] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
>
> 1. May 2017 12:25 by tg...@google.com:
>
>
> Within the Beam model, there is no guarantee about the ordering of any
> PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
> element timestamps or any other comparator. Runners aren't required to
> maintain any ordering provided by a source, and do not require sources to
> provide any ordering. As such, if you want to process data in sorted order,
> currently the only option is to explicitly sort the data.
>
> On Mon, May 1, 2017 at 9:13 AM,  wrote:
>
>
> I have been trying to figure out the potential efficiency of sliding
> windows. Looking at the TrafficRoutes example -
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
> - it seems that the GatherStats class explicitly sorts its data (in
> event-time order) within every window for every key.
> (Collections.sort(infoList)).
>
> Is this necessary? If the data for each key arrives in event-time order
> and that order is maintained as the data flows through the pipeline, then
> the data within each window should already be sorted. For large sliding
> windows with small lags/sliding offsets re-sorting is going to be very
> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
> data stream is ordered, there are no guarantees to the ordering of the data
> after a window transform or GroupByKey has been applied?
>
> Thanks,
>
> Bill.
>
>


Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Ankur Chauhan
I have a somewhat similar use case when dealing with failed / cancelled / broken
streaming pipelines.
We have an operator that continuously monitors the min-watermark of the
pipeline; when it detects that the watermark has not advanced for more than
some threshold, we start a new pipeline and initiate a "patcher" batch Dataflow
job that reads the event backups over the possibly broken time range (+/- 1 hour).
It works out well, but it has the overhead of building an external operator
process that can detect when to run the batch Dataflow job.

> On May 1, 2017, at 09:37, Thomas Groh  wrote:
> 
> You should also be able to simply add a Bounded Read from the backup data 
> source to your pipeline and flatten it with your Pubsub topic. Because all of 
> the elements produced by both the bounded and unbounded sources will have 
> consistent timestamps, when you run the pipeline the watermark will be held 
> until all of the data is read from the bounded sources. Once this is done, 
> your pipeline can continue processing only elements from the PubSub source. 
> If you don't want the backlog and the current processing to occur in the same 
> pipeline, running the same pipeline but just reading from the archival data 
> should be sufficient (all of the processing would be identical, just the 
> source would need to change).
> 
> If you read from both the "live" and "archival" sources within the same 
> pipeline, you will need to use additional machines so the backlog can be 
> processed promptly if you use a watermark based trigger; watermarks will be 
> held until the bounded source is fully processed.
> 
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK  wrote:
>> I did not see Lukasz's reply before I posted, and I will have to read it a bit
>> later!
>> 
>>> man. 1. mai 2017 kl. 18.28 skrev Lars BK :
>>> Yes, precisely. 
>>> 
>>> I think that could work, yes. What you are suggesting sounds like idea 2) 
>>> in my original question.
>>> 
>>> My main concern is that I would have to allow a great deal of lateness and 
>>> that old windows would consume too much memory. Whether it works in my case 
>>> or not I don't know yet as I haven't tested it. 
>>> 
>>> What if I had to process even older data? Could I handle any "oldness" of 
>>> data by increasing the allowed lateness and throwing machines at the 
>>> problem to hold all the old windows in memory while the backlog is 
>>> processed? If so, great! But I would have to dial the allowed lateness back 
>>> down when the processing has caught up with the present. 
>>> 
>>> Is there some intended way of handling reprocessing like this? Maybe not? 
>>> Perhaps it is more of a Pubsub and Dataflow question than a Beam question 
>>> when it comes down to it. 
>>> 
>>> 
 man. 1. mai 2017 kl. 17.25 skrev Jean-Baptiste Onofré :
 OK, so the messages are "re-published" on the topic, with the same timestamp as
 the original, and consumed again by the pipeline.
 
 Maybe, you can play with the allowed lateness and late firings ?
 
 Something like:
 
    Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(FIVE_MINUTES))
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
        .withAllowedLateness(Duration.standardMinutes(xx))
        .accumulatingFiredPanes()
 
 Thoughts ?
 
 Regards
 JB
 
 On 05/01/2017 05:12 PM, Lars BK wrote:
 > Hi Jean-Baptiste,
 >
 > I think the key point in my case is that I have to process or reprocess 
 > "old"
 > messages. That is, messages that are late because they are streamed from 
 > an
 > archive file and are older than the allowed lateness in the pipeline.
 >
 > In the case I described the messages had already been processed once and 
 > no
 > longer in the topic, so they had to be sent and processed again. But it 
 > might as
 > well have been that I had received a backfill of data that absolutely 
 > needs to
 > be processed regardless of it being later than the allowed lateness with 
 > respect
 > to present time.
 >
 > So when I write this now it really sounds like I either need to allow 
 > more
 > lateness or somehow rewind the watermark!
 >
 > Lars
 >
 > man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré  >:
 >
 > Hi Lars,
 >
 > interesting use case indeed ;)
 >
 > Just to understand: if possible, you don't want to re-consume the 
 > messages from
 > 

Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Thomas Groh
You should also be able to simply add a Bounded Read from the backup data
source to your pipeline and flatten it with your Pubsub topic. Because all
of the elements produced by both the bounded and unbounded sources will
have consistent timestamps, when you run the pipeline the watermark will be
held until all of the data is read from the bounded sources. Once this is
done, your pipeline can continue processing only elements from the PubSub
source. If you don't want the backlog and the current processing to occur
in the same pipeline, running the same pipeline but just reading from the
archival data should be sufficient (all of the processing would be
identical, just the source would need to change).

If you read from both the "live" and "archival" sources within the same
pipeline, you will need to use additional machines so the backlog can be
processed promptly if you use a watermark based trigger; watermarks will be
held until the bounded source is fully processed.
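
A minimal sketch of that shape, assuming a recent Beam Java SDK; the file
pattern, topic, and transform names below are placeholders:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Bounded read of the archived backup data (placeholder path).
    PCollection<String> backup =
        p.apply("ReadBackup", TextIO.read().from("gs://my-bucket/backup/*.json"));

    // Unbounded read from the live Pubsub topic (placeholder topic).
    PCollection<String> live =
        p.apply("ReadLive",
            PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"));

    // Flatten both sources into one PCollection; the watermark is held back
    // until the bounded side has been fully read.
    PCollection<String> all =
        PCollectionList.of(backup).and(live).apply(Flatten.pCollections());

The archived records still need their event timestamps assigned (for example
with WithTimestamps, or a timestamp attribute on the Pubsub side) so that the
windowing lines up across both sources.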

On Mon, May 1, 2017 at 9:29 AM, Lars BK  wrote:

> I did not see Lukasz's reply before I posted, and I will have to read it a
> bit later!
>
> man. 1. mai 2017 kl. 18.28 skrev Lars BK :
>
>> Yes, precisely.
>>
>> I think that could work, yes. What you are suggesting sounds like idea 2)
>> in my original question.
>>
>> My main concern is that I would have to allow a great deal of lateness
>> and that old windows would consume too much memory. Whether it works in my
>> case or not I don't know yet as I haven't tested it.
>>
>> What if I had to process even older data? Could I handle any "oldness" of
>> data by increasing the allowed lateness and throwing machines at the
>> problem to hold all the old windows in memory while the backlog is
>> processed? If so, great! But I would have to dial the allowed lateness back
>> down when the processing has caught up with the present.
>>
>> Is there some intended way of handling reprocessing like this? Maybe not?
>> Perhaps it is more of a Pubsub and Dataflow question than a Beam question
>> when it comes down to it.
>>
>>
>> man. 1. mai 2017 kl. 17.25 skrev Jean-Baptiste Onofré :
>>
>>> OK, so the messages are "re-published" on the topic, with the same
>>> timestamp as the original, and consumed again by the pipeline.
>>>
>>> Maybe, you can play with the allowed lateness and late firings ?
>>>
>>> Something like:
>>>
>>>    Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>        .triggering(AfterWatermark.pastEndOfWindow()
>>>            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>                .plusDelayOf(FIVE_MINUTES))
>>>            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>                .plusDelayOf(TEN_MINUTES)))
>>>        .withAllowedLateness(Duration.standardMinutes(xx))
>>>        .accumulatingFiredPanes()
>>>
>>> Thoughts ?
>>>
>>> Regards
>>> JB
>>>
>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>> > Hi Jean-Baptiste,
>>> >
>>> > I think the key point in my case is that I have to process or
>>> reprocess "old"
>>> > messages. That is, messages that are late because they are streamed
>>> from an
>>> > archive file and are older than the allowed lateness in the pipeline.
>>> >
>>> > In the case I described the messages had already been processed once
>>> and no
>>> > longer in the topic, so they had to be sent and processed again. But
>>> it might as
>>> > well have been that I had received a backfill of data that absolutely
>>> needs to
>>> > be processed regardless of it being later than the allowed lateness
>>> with respect
>>> > to present time.
>>> >
>>> > So when I write this now it really sounds like I either need to allow
>>> more
>>> > lateness or somehow rewind the watermark!
>>> >
>>> > Lars
>>> >
>>> > man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré >> > >:
>>> >
>>> > Hi Lars,
>>> >
>>> > interesting use case indeed ;)
>>> >
>>> > Just to understand: if possible, you don't want to re-consume the
>>> messages from
>>> > the PubSub topic right ? So, you want to "hold" the PCollections
>>> for late data
>>> > processing ?
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 05/01/2017 04:15 PM, Lars BK wrote:
>>> > > Hi,
>>> > >
>>> > > Is there a preferred way of approaching reprocessing historic
>>> data with
>>> > > streaming jobs?
>>> > >
>>> > > I want to pose this as a general question, but I'm working with
>>> Pubsub and
>>> > > Dataflow specifically. I am a fan of the idea of replaying/fast
>>> forwarding
>>> > > through historic data to reproduce results (as you perhaps would
>>> with Kafka),
>>> > > but I'm having a hard time unifying this way of thinking with
>>> the concepts of
>>> > > watermarks and late data in Beam. I'm not sure how to best mimic
>>> this with the
>>> > > 

Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Lars BK
Yes, precisely.

I think that could work, yes. What you are suggesting sounds like idea 2)
in my original question.

My main concern is that I would have to allow a great deal of lateness and
that old windows would consume too much memory. Whether it works in my case
or not I don't know yet as I haven't tested it.

What if I had to process even older data? Could I handle any "oldness" of
data by increasing the allowed lateness and throwing machines at the
problem to hold all the old windows in memory while the backlog is
processed? If so, great! But I would have to dial the allowed lateness back
down when the processing has caught up with the present.

Is there some intended way of handling reprocessing like this? Maybe not?
Perhaps it is more of a Pubsub and Dataflow question than a Beam question
when it comes down to it.


Mon. May 1, 2017 at 17:25, Jean-Baptiste Onofré wrote:

> OK, so the messages are "re-published" on the topic, with the same timestamp
> as the original, and consumed again by the pipeline.
>
> Maybe, you can play with the allowed lateness and late firings ?
>
> Something like:
>
>    Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>        .triggering(AfterWatermark.pastEndOfWindow()
>            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                .plusDelayOf(FIVE_MINUTES))
>            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                .plusDelayOf(TEN_MINUTES)))
>        .withAllowedLateness(Duration.standardMinutes(xx))
>        .accumulatingFiredPanes()
>
> Thoughts ?
>
> Regards
> JB
>
> On 05/01/2017 05:12 PM, Lars BK wrote:
> > Hi Jean-Baptiste,
> >
> > I think the key point in my case is that I have to process or reprocess
> "old"
> > messages. That is, messages that are late because they are streamed from
> an
> > archive file and are older than the allowed lateness in the pipeline.
> >
> > In the case I described the messages had already been processed once and
> no
> > longer in the topic, so they had to be sent and processed again. But it
> might as
> > well have been that I had received a backfill of data that absolutely
> needs to
> > be processed regardless of it being later than the allowed lateness with
> respect
> > to present time.
> >
> > So when I write this now it really sounds like I either need to allow
> more
> > lateness or somehow rewind the watermark!
> >
> > Lars
> >
> > man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré  > >:
> >
> > Hi Lars,
> >
> > interesting use case indeed ;)
> >
> > Just to understand: if possible, you don't want to re-consume the
> messages from
> > the PubSub topic right ? So, you want to "hold" the PCollections for
> late data
> > processing ?
> >
> > Regards
> > JB
> >
> > On 05/01/2017 04:15 PM, Lars BK wrote:
> > > Hi,
> > >
> > > Is there a preferred way of approaching reprocessing historic data
> with
> > > streaming jobs?
> > >
> > > I want to pose this as a general question, but I'm working with
> Pubsub and
> > > Dataflow specifically. I am a fan of the idea of replaying/fast
> forwarding
> > > through historic data to reproduce results (as you perhaps would
> with Kafka),
> > > but I'm having a hard time unifying this way of thinking with the
> concepts of
> > > watermarks and late data in Beam. I'm not sure how to best mimic
> this with the
> > > tools I'm using, or if there is a better way.
> > >
> > > If there is a previous discussion about this I might have missed
> (and I'm
> > > guessing there is), please direct me to it!
> > >
> > >
> > > The use case:
> > >
> > > Suppose I discover a bug in a streaming job with event time
> windows and an
> > > allowed lateness of 7 days, and that I subsequently have to
> reprocess all the
> > > data for the past month. Let us also assume that I have an archive
> of my
> > source
> > > data (in my case in Google cloud storage) and that I can republish
> it all
> > to the
> > > message queue I'm using.
> > >
> > > Some ideas that may or may not work I would love to get your
> thoughts on:
> > >
> > > 1) Start a new instance of the job that reads from a separate
> source to
> > which I
> > > republish all messages. This shouldn't work because 14 days of my
> data is
> > later
> > > than the allowed limit, buy the remaining 7 days should be
> reprocessed as
> > intended.
> > >
> > > 2) The same as 1), but with allowed lateness of one month. When
> the job is
> > > caught up, the lateness can be adjusted back to 7 days. I am
> afraid this
> > > approach may consume too much memory since I'm letting a whole
> month of
> > windows
> > > remain in memory. Also I wouldn't get the same triggering
> behaviour as in the
> > > original job since most 

Re: Efficiency question

2017-05-01 Thread Thomas Groh
Within the Beam model, there is no guarantee about the ordering of any
PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
element timestamps or any other comparator. Runners aren't required to
maintain any ordering provided by a source, and do not require sources to
provide any ordering. As such, if you want to process data in sorted order,
currently the only option is to explicitly sort the data.
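
For example, a DoFn downstream of a GroupByKey can copy each group into a list
and sort it by event time itself; a minimal sketch with an illustrative element
type (SensorReading is a placeholder, not a class from the TrafficRoutes
example):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Placeholder element type with an event-time field; a real pipeline would
    // also need a Coder registered for it.
    class SensorReading {
      long eventTimeMillis;
      double value;
    }

    // Buffers each key's values and sorts them by event time before processing,
    // the same pattern as Collections.sort(infoList) in GatherStats.
    class SortAndProcessFn
        extends DoFn<KV<String, Iterable<SensorReading>>, KV<String, Double>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        List<SensorReading> readings = new ArrayList<>();
        for (SensorReading r : c.element().getValue()) {
          readings.add(r);
        }
        readings.sort(Comparator.comparingLong(r -> r.eventTimeMillis));
        // ... process readings in event-time order; here we emit the latest value.
        if (!readings.isEmpty()) {
          c.output(KV.of(c.element().getKey(),
              readings.get(readings.size() - 1).value));
        }
      }
    }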

On Mon, May 1, 2017 at 9:13 AM,  wrote:

> I have been trying to figure out the potential efficiency of sliding
> windows. Looking at the TrafficRoutes example -
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
> - it seems that the GatherStats class explicitly
> sorts its data (in event-time order) within every window for every key.
> (Collections.sort(infoList)).
>
> Is this necessary? If the data for each key arrives in event-time order
> and that order is maintained as the data flows through the pipeline, then
> the data within each window should already be sorted. For large sliding
> windows with small lags/sliding offsets re-sorting is going to be very
> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
> data stream is ordered, there are no guarantees to the ordering of the data
> after a window transform or GroupByKey has been applied?
>
> Thanks,
>
> Bill.
>


Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Lukasz Cwik
I believe that if your data from the past can't affect the data of the
future, because the windows/state are independent of each other, then just
reprocessing the old data with a batch job is the simplest approach and
likely the fastest.

About your choices 1, 2, and 3: allowed lateness is relative to the
watermark of the source, not to the "current" time, so having an
independent source whose data is from a month ago will be fine. As the
records are processed, the watermark will advance and eventually catch up.
You do want to ensure that the records read from the source with the
republished events are roughly ordered, so that you don't read very recent
records that push the watermark forward quickly and force older records
(beyond the allowed lateness relative to the watermark) to be dropped.
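
One way to keep the watermark tied to event time during such a replay is to
attach the original event time to each republished message and read it back as
the element timestamp; a minimal sketch, assuming PubsubIO's timestamp-attribute
support in your SDK version, with placeholder attribute and subscription names:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // The republisher sets a message attribute (here "eventTimeMillis") to the
    // original event time. Reading with withTimestampAttribute makes that
    // attribute the element timestamp, and the watermark is estimated from it
    // rather than from the Pubsub publish time.
    PCollection<String> replayed =
        p.apply("ReadReplay",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/replay-sub")
                .withTimestampAttribute("eventTimeMillis"));

With the timestamp attribute set, the watermark should start a month back and
advance as the backlog drains, rather than jumping straight to near real time.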

On Mon, May 1, 2017 at 8:25 AM, Jean-Baptiste Onofré 
wrote:

> OK, so the messages are "re-published" on the topic, with the same timestamp
> as the original, and consumed again by the pipeline.
>
> Maybe, you can play with the allowed lateness and late firings ?
>
> Something like:
>
>   Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>       .triggering(AfterWatermark.pastEndOfWindow()
>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>               .plusDelayOf(FIVE_MINUTES))
>           .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>               .plusDelayOf(TEN_MINUTES)))
>       .withAllowedLateness(Duration.standardMinutes(xx))
>       .accumulatingFiredPanes()
>
> Thoughts ?
>
> Regards
> JB
>
> On 05/01/2017 05:12 PM, Lars BK wrote:
>
>> Hi Jean-Baptiste,
>>
>> I think the key point in my case is that I have to process or reprocess
>> "old"
>> messages. That is, messages that are late because they are streamed from
>> an
>> archive file and are older than the allowed lateness in the pipeline.
>>
>> In the case I described the messages had already been processed once and
>> no
>> longer in the topic, so they had to be sent and processed again. But it
>> might as
>> well have been that I had received a backfill of data that absolutely
>> needs to
>> be processed regardless of it being later than the allowed lateness with
>> respect
>> to present time.
>>
>> So when I write this now it really sounds like I either need to allow more
>> lateness or somehow rewind the watermark!
>>
>> Lars
>>
>> man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré > >:
>>
>>
>> Hi Lars,
>>
>> interesting use case indeed ;)
>>
>> Just to understand: if possible, you don't want to re-consume the
>> messages from
>> the PubSub topic right ? So, you want to "hold" the PCollections for
>> late data
>> processing ?
>>
>> Regards
>> JB
>>
>> On 05/01/2017 04:15 PM, Lars BK wrote:
>> > Hi,
>> >
>> > Is there a preferred way of approaching reprocessing historic data
>> with
>> > streaming jobs?
>> >
>> > I want to pose this as a general question, but I'm working with
>> Pubsub and
>> > Dataflow specifically. I am a fan of the idea of replaying/fast
>> forwarding
>> > through historic data to reproduce results (as you perhaps would
>> with Kafka),
>> > but I'm having a hard time unifying this way of thinking with the
>> concepts of
>> > watermarks and late data in Beam. I'm not sure how to best mimic
>> this with the
>> > tools I'm using, or if there is a better way.
>> >
>> > If there is a previous discussion about this I might have missed
>> (and I'm
>> > guessing there is), please direct me to it!
>> >
>> >
>> > The use case:
>> >
>> > Suppose I discover a bug in a streaming job with event time windows
>> and an
>> > allowed lateness of 7 days, and that I subsequently have to
>> reprocess all the
>> > data for the past month. Let us also assume that I have an archive
>> of my
>> source
>> > data (in my case in Google cloud storage) and that I can republish
>> it all
>> to the
>> > message queue I'm using.
>> >
>> > Some ideas that may or may not work I would love to get your
>> thoughts on:
>> >
>> > 1) Start a new instance of the job that reads from a separate
>> source to
>> which I
>> > republish all messages. This shouldn't work because 14 days of my
>> data is
>> later
>> > than the allowed limit, buy the remaining 7 days should be
>> reprocessed as
>> intended.
>> >
>> > 2) The same as 1), but with allowed lateness of one month. When the
>> job is
>> > caught up, the lateness can be adjusted back to 7 days. I am afraid
>> this
>> > approach may consume too much memory since I'm letting a whole
>> month of
>> windows
>> > remain in memory. Also I wouldn't get the same triggering behaviour
>> as in the
>> > original job since most or all of the data is late with 

Efficiency question

2017-05-01 Thread billsmith31415
I have been trying to figure out the potential efficiency of sliding windows.
Looking at the TrafficRoutes example -
https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
- it seems that the GatherStats class explicitly sorts its data (in
event-time order) within every window for every key (Collections.sort(infoList)).
Is this necessary? If the data for each key arrives in event-time order and
that order is maintained as the data flows through the pipeline, then the data
within each window should already be sorted. For large sliding windows with
small lags/sliding offsets, re-sorting is going to be very inefficient. Or is it
the case in Beam/Dataflow that even if the underlying data stream is ordered,
there are no guarantees about the ordering of the data after a window transform
or GroupByKey has been applied?
Thanks,
Bill.

Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Jean-Baptiste Onofré
OK, so the messages are "re-published" on the topic, with the same timestamp as
the original, and consumed again by the pipeline.


Maybe, you can play with the allowed lateness and late firings ?

Something like:

  Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(FIVE_MINUTES))
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(TEN_MINUTES)))
      .withAllowedLateness(Duration.standardMinutes(xx))
      .accumulatingFiredPanes()

Thoughts ?

Regards
JB

On 05/01/2017 05:12 PM, Lars BK wrote:

Hi Jean-Baptiste,

I think the key point in my case is that I have to process or reprocess "old"
messages. That is, messages that are late because they are streamed from an
archive file and are older than the allowed lateness in the pipeline.

In the case I described the messages had already been processed once and no
longer in the topic, so they had to be sent and processed again. But it might as
well have been that I had received a backfill of data that absolutely needs to
be processed regardless of it being later than the allowed lateness with respect
to present time.

So when I write this now it really sounds like I either need to allow more
lateness or somehow rewind the watermark!

Lars

man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré >:

Hi Lars,

interesting use case indeed ;)

Just to understand: if possible, you don't want to re-consume the messages 
from
the PubSub topic right ? So, you want to "hold" the PCollections for late 
data
processing ?

Regards
JB

On 05/01/2017 04:15 PM, Lars BK wrote:
> Hi,
>
> Is there a preferred way of approaching reprocessing historic data with
> streaming jobs?
>
> I want to pose this as a general question, but I'm working with Pubsub and
> Dataflow specifically. I am a fan of the idea of replaying/fast forwarding
> through historic data to reproduce results (as you perhaps would with 
Kafka),
> but I'm having a hard time unifying this way of thinking with the 
concepts of
> watermarks and late data in Beam. I'm not sure how to best mimic this 
with the
> tools I'm using, or if there is a better way.
>
> If there is a previous discussion about this I might have missed (and I'm
> guessing there is), please direct me to it!
>
>
> The use case:
>
> Suppose I discover a bug in a streaming job with event time windows and an
> allowed lateness of 7 days, and that I subsequently have to reprocess all 
the
> data for the past month. Let us also assume that I have an archive of my
source
> data (in my case in Google cloud storage) and that I can republish it all
to the
> message queue I'm using.
>
> Some ideas that may or may not work I would love to get your thoughts on:
>
> 1) Start a new instance of the job that reads from a separate source to
which I
> republish all messages. This shouldn't work because 14 days of my data is
later
> than the allowed limit, buy the remaining 7 days should be reprocessed as
intended.
>
> 2) The same as 1), but with allowed lateness of one month. When the job is
> caught up, the lateness can be adjusted back to 7 days. I am afraid this
> approach may consume too much memory since I'm letting a whole month of
windows
> remain in memory. Also I wouldn't get the same triggering behaviour as in 
the
> original job since most or all of the data is late with respect to the
> watermark, which I assume is near real time when the historic data enters 
the
> pipeline.
>
> 3) The same as 1), but with the republishing first and only starting the
new job
> when all messages are already waiting in the queue. The watermark should 
then
> start one month back in time and only catch up with the present once all 
the
> data is reprocessed, yielding no late data. (Experiments I've done with 
this
> approach produce somewhat unexpected results where early panes that are 
older
> than 7 days appear to be both the first and the last firing from their
> respective windows.) Early firings triggered by processing time would 
probably
> differ by the results should be the same? This approach also feels a bit
awkward
> as it requires more orchestration.
>
> 4) Batch process the archived data instead and start a streaming job in
> parallel. Would this in a sense be a more honest approach since I'm 
actually
> reprocessing batches of archived data? The triggering behaviour in the
streaming
> version of the job would not apply in batch, and I would want to avoid
stitching
> together results from 

Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Lars BK
Hi Jean-Baptiste,

I think the key point in my case is that I have to process or reprocess
"old" messages. That is, messages that are late because they are streamed
from an archive file and are older than the allowed lateness in the
pipeline.

In the case I described, the messages had already been processed once and were
no longer in the topic, so they had to be sent and processed again. But it
might as well have been that I had received a backfill of data that
absolutely needs to be processed regardless of it being later than the
allowed lateness with respect to the present time.

So when I write this now it really sounds like I either need to allow more
lateness or somehow rewind the watermark!

Lars

Mon. May 1, 2017 at 16:34, Jean-Baptiste Onofré wrote:

> Hi Lars,
>
> interesting use case indeed ;)
>
> Just to understand: if possible, you don't want to re-consume the messages
> from
> the PubSub topic right ? So, you want to "hold" the PCollections for late
> data
> processing ?
>
> Regards
> JB
>
> On 05/01/2017 04:15 PM, Lars BK wrote:
> > Hi,
> >
> > Is there a preferred way of approaching reprocessing historic data with
> > streaming jobs?
> >
> > I want to pose this as a general question, but I'm working with Pubsub
> and
> > Dataflow specifically. I am a fan of the idea of replaying/fast
> forwarding
> > through historic data to reproduce results (as you perhaps would with
> Kafka),
> > but I'm having a hard time unifying this way of thinking with the
> concepts of
> > watermarks and late data in Beam. I'm not sure how to best mimic this
> with the
> > tools I'm using, or if there is a better way.
> >
> > If there is a previous discussion about this I might have missed (and I'm
> > guessing there is), please direct me to it!
> >
> >
> > The use case:
> >
> > Suppose I discover a bug in a streaming job with event time windows and
> an
> > allowed lateness of 7 days, and that I subsequently have to reprocess
> all the
> > data for the past month. Let us also assume that I have an archive of my
> source
> > data (in my case in Google cloud storage) and that I can republish it
> all to the
> > message queue I'm using.
> >
> > Some ideas that may or may not work I would love to get your thoughts on:
> >
> > 1) Start a new instance of the job that reads from a separate source to
> which I
> > republish all messages. This shouldn't work because 14 days of my data
> is later
> > than the allowed limit, buy the remaining 7 days should be reprocessed
> as intended.
> >
> > 2) The same as 1), but with allowed lateness of one month. When the job
> is
> > caught up, the lateness can be adjusted back to 7 days. I am afraid this
> > approach may consume too much memory since I'm letting a whole month of
> windows
> > remain in memory. Also I wouldn't get the same triggering behaviour as
> in the
> > original job since most or all of the data is late with respect to the
> > watermark, which I assume is near real time when the historic data
> enters the
> > pipeline.
> >
> > 3) The same as 1), but with the republishing first and only starting the
> new job
> > when all messages are already waiting in the queue. The watermark should
> then
> > start one month back in time and only catch up with the present once all
> the
> > data is reprocessed, yielding no late data. (Experiments I've done with
> this
> > approach produce somewhat unexpected results where early panes that are
> older
> > than 7 days appear to be both the first and the last firing from their
> > respective windows.) Early firings triggered by processing time would
> probably
> > differ by the results should be the same? This approach also feels a bit
> awkward
> > as it requires more orchestration.
> >
> > 4) Batch process the archived data instead and start a streaming job in
> > parallel. Would this in a sense be a more honest approach since I'm
> actually
> > reprocessing batches of archived data? The triggering behaviour in the
> streaming
> > version of the job would not apply in batch, and I would want to avoid
> stitching
> > together results from two jobs if I can.
> >
> >
> > These are the approaches I've thought of currently, and any input is much
> > appreciated.  Have any of you faced similar situations, and how did you
> solve them?
> >
> >
> > Regards,
> > Lars
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Reprocessing historic data with streaming jobs

2017-05-01 Thread Jean-Baptiste Onofré

Hi Lars,

interesting use case indeed ;)

Just to understand: if possible, you don't want to re-consume the messages from
the PubSub topic, right? So, you want to "hold" the PCollections for late data
processing?


Regards
JB

On 05/01/2017 04:15 PM, Lars BK wrote:

Hi,

Is there a preferred way of approaching reprocessing historic data with
streaming jobs?

I want to pose this as a general question, but I'm working with Pubsub and
Dataflow specifically. I am a fan of the idea of replaying/fast forwarding
through historic data to reproduce results (as you perhaps would with Kafka),
but I'm having a hard time unifying this way of thinking with the concepts of
watermarks and late data in Beam. I'm not sure how to best mimic this with the
tools I'm using, or if there is a better way.

If there is a previous discussion about this I might have missed (and I'm
guessing there is), please direct me to it!


The use case:

Suppose I discover a bug in a streaming job with event time windows and an
allowed lateness of 7 days, and that I subsequently have to reprocess all the
data for the past month. Let us also assume that I have an archive of my source
data (in my case in Google cloud storage) and that I can republish it all to the
message queue I'm using.

Some ideas that may or may not work I would love to get your thoughts on:

1) Start a new instance of the job that reads from a separate source to which I
republish all messages. This shouldn't work because 14 days of my data is later
than the allowed limit, but the remaining 7 days should be reprocessed as
intended.

2) The same as 1), but with allowed lateness of one month. When the job is
caught up, the lateness can be adjusted back to 7 days. I am afraid this
approach may consume too much memory since I'm letting a whole month of windows
remain in memory. Also I wouldn't get the same triggering behaviour as in the
original job since most or all of the data is late with respect to the
watermark, which I assume is near real time when the historic data enters the
pipeline.

3) The same as 1), but with the republishing first and only starting the new job
when all messages are already waiting in the queue. The watermark should then
start one month back in time and only catch up with the present once all the
data is reprocessed, yielding no late data. (Experiments I've done with this
approach produce somewhat unexpected results where early panes that are older
than 7 days appear to be both the first and the last firing from their
respective windows.) Early firings triggered by processing time would probably
differ, but the results should be the same? This approach also feels a bit awkward
as it requires more orchestration.

4) Batch process the archived data instead and start a streaming job in
parallel. Would this in a sense be a more honest approach since I'm actually
reprocessing batches of archived data? The triggering behaviour in the streaming
version of the job would not apply in batch, and I would want to avoid stitching
together results from two jobs if I can.


These are the approaches I've thought of currently, and any input is much
appreciated.  Have any of you faced similar situations, and how did you solve 
them?


Regards,
Lars




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Reprocessing historic data with streaming jobs

2017-05-01 Thread Lars BK
Hi,

Is there a preferred way of approaching reprocessing historic data with
streaming jobs?

I want to pose this as a general question, but I'm working with Pubsub and
Dataflow specifically. I am a fan of the idea of replaying/fast forwarding
through historic data to reproduce results (as you perhaps would with
Kafka), but I'm having a hard time unifying this way of thinking with the
concepts of watermarks and late data in Beam. I'm not sure how to best
mimic this with the tools I'm using, or if there is a better way.

If there is a previous discussion about this I might have missed (and I'm
guessing there is), please direct me to it!


The use case:

Suppose I discover a bug in a streaming job with event time windows and an
allowed lateness of 7 days, and that I subsequently have to reprocess all
the data for the past month. Let us also assume that I have an archive of
my source data (in my case in Google cloud storage) and that I can
republish it all to the message queue I'm using.

Some ideas that may or may not work I would love to get your thoughts on:

1) Start a new instance of the job that reads from a separate source to
which I republish all messages. This shouldn't work because 14 days of my
data is later than the allowed limit, but the remaining 7 days should be
reprocessed as intended.

2) The same as 1), but with allowed lateness of one month. When the job is
caught up, the lateness can be adjusted back to 7 days. I am afraid this
approach may consume too much memory since I'm letting a whole month of
windows remain in memory. Also I wouldn't get the same triggering behaviour
as in the original job since most or all of the data is late with respect
to the watermark, which I assume is near real time when the historic data
enters the pipeline.

3) The same as 1), but with the republishing first and only starting the
new job when all messages are already waiting in the queue. The watermark
should then start one month back in time and only catch up with the present
once all the data is reprocessed, yielding no late data. (Experiments I've
done with this approach produce somewhat unexpected results where early
panes that are older than 7 days appear to be both the first and the last
firing from their respective windows.) Early firings triggered by
processing time would probably differ, but the results should be the same?
This approach also feels a bit awkward as it requires more orchestration.

4) Batch process the archived data instead and start a streaming job in
parallel. Would this in a sense be a more honest approach since I'm
actually reprocessing batches of archived data? The triggering behaviour in
the streaming version of the job would not apply in batch, and I would want
to avoid stitching together results from two jobs if I can.


These are the approaches I've thought of currently, and any input is much
appreciated.  Have any of you faced similar situations, and how did you
solve them?


Regards,
Lars