Re: Looping timer blog

2019-06-27 Thread Jan Lukavský

Hi Reza,

cool, I have put together a PR [1], which is still not completely ready. 
At least some tests are still missing - probably @ValidatesRunner - and 
then runners that won't pass them will need fixing. It also misses a few 
features described in the design doc, but those can probably be added 
later (support for allowedLateness and a user-supplied sorting 
criterion). Would you like to test this on some of your code? It should 
suffice to put @RequiresTimeSortedInput on the @ProcessElement method, 
and the input should start arriving sorted (it should work at least for 
DirectRunner, FlinkRunner (stream and batch) and SparkRunner (batch)).


[1] https://github.com/apache/beam/pull/8774
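
For illustration, a minimal sketch of the intended usage (assuming the 
annotation lands as in the PR; the state cell and element types here are 
made up):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// With @RequiresTimeSortedInput, the runner delivers elements per key in
// event-time order, so a single ValueState is enough to track the latest
// value.
class LastValueFn extends DoFn<KV<String, Double>, KV<String, Double>> {

  @StateId("last")
  private final StateSpec<ValueState<Double>> lastSpec = StateSpecs.value();

  @RequiresTimeSortedInput
  @ProcessElement
  public void process(
      @Element KV<String, Double> element,
      @StateId("last") ValueState<Double> last,
      OutputReceiver<KV<String, Double>> out) {
    // Because input arrives time-sorted, each element supersedes the
    // previous one - no BagState and no manual sorting needed.
    last.write(element.getValue());
    out.output(element);
  }
}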


Re: Looping timer blog

2019-06-26 Thread Reza Rokni
On Tue, 25 Jun 2019 at 21:20, Jan Lukavský <je...@seznam.cz> wrote:

> Which in-memory sort do you refer to? I'm pretty sure there must be
> sorting involved for this to work, but I'm not quite sure where exactly it
> is in your implementation. You said that you can put data in ValueState
> rather than BagState, so do you have a List as a value in the ValueState?
> Or do you wrap the stateful ParDo with some other sorting logic? And if
> so, how does this work on batch? I suppose that it all has to fit in
> memory. I think this all revolves around the @RequiresTimeSortedInput
> annotation that I propose. Maybe we can cooperate on that? :)
>
Huh... nice, this chat made me notice a bug in the looping timer 
example code that we missed, thanks :-). The ValueState<Boolean> 
timerRunning should actually be a ValueState<Long> minTimestamp, and the 
check before setting the timer needs to be: if (the stored value is NULL, 
or element.Timestamp < timer.Timestamp). This also requires the use of 
the timer-read pattern, since we don't have timer.read(): 
https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542.
I will fix it and put up a PR to correct the blog.
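
For those following along, a minimal sketch of the corrected logic 
(assuming the blog's looping timer DoFn; names here are illustrative and 
the final PR may differ):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class LoopingStatefulTimer extends DoFn<KV<String, Double>, KV<String, Double>> {

  // Timer has no read(), so mirror the scheduled firing time in ValueState.
  @StateId("minTimestamp")
  private final StateSpec<ValueState<Long>> minTimestampSpec = StateSpecs.value();

  @TimerId("loopingTimer")
  private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("minTimestamp") ValueState<Long> minTimestamp,
      @TimerId("loopingTimer") Timer loopingTimer) {
    Long scheduled = minTimestamp.read();
    // Only (re)set the timer if none is set yet, or if this element is
    // earlier than the currently scheduled firing time.
    if (scheduled == null || c.timestamp().getMillis() < scheduled) {
      loopingTimer.set(c.timestamp());
      minTimestamp.write(c.timestamp().getMillis());
    }
    c.output(c.element());
  }

  // The @OnTimer method (emit the interval's default value, re-arm for
  // the next interval) is unchanged from the blog.
}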

For the hold and propagate pattern (for those following the original 
thread: the pattern is not covered in the blog example code, but was 
discussed at the summit), a sketch follows below:

OnProcess()
- Drop the accumulators into BagState.
- Set a timer at the minimum time interval.

OnTimer()
- Sort the list in memory. For a lot of timeseries use cases (for 
example OHLC) the memory issues are heavily mitigated, as we can use 
FixedWindows partial aggregations before the GlobalWindow stage 
(partial because they don't have the correct Open value set until they 
flow into the GlobalWindow). Of course, how big the window is dictates 
the compression you get.
- Set the current timer again to fire in the next interval window.
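
A rough sketch of those two steps (Ohlc here is a hypothetical 
partial-aggregate type, not the summit code; error handling and coders 
are glossed over):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical partial-aggregate type (stub for illustration only).
class Ohlc implements java.io.Serializable {
  final Instant timestamp;
  final double open, high, low, close;
  Ohlc(Instant ts, double o, double h, double l, double cl) {
    this.timestamp = ts; this.open = o; this.high = h; this.low = l; this.close = cl;
  }
  Instant getTimestamp() { return timestamp; }
  double getClose() { return close; }
  Ohlc withOpen(double newOpen) { return new Ohlc(timestamp, newOpen, high, low, close); }
}

class HoldAndPropagateFn extends DoFn<KV<String, Ohlc>, KV<String, Ohlc>> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);

  @StateId("held")
  private final StateSpec<BagState<KV<String, Ohlc>>> heldSpec = StateSpecs.bag();

  @StateId("lastClose")
  private final StateSpec<ValueState<Double>> lastCloseSpec = StateSpecs.value();

  @TimerId("emit")
  private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("held") BagState<KV<String, Ohlc>> held,
      @TimerId("emit") Timer emit) {
    // OnProcess: hold the partial aggregate and arm the interval timer
    // (a production version would guard this with the min-timestamp
    // pattern from the looping timer fix above).
    held.add(c.element());
    emit.set(c.timestamp().plus(INTERVAL));
  }

  @OnTimer("emit")
  public void onTimer(OnTimerContext c,
      @StateId("held") BagState<KV<String, Ohlc>> held,
      @StateId("lastClose") ValueState<Double> lastClose,
      @TimerId("emit") Timer emit) {
    // OnTimer: sort the held partials in memory by event time, then
    // propagate each candle's Close into the next candle's Open.
    List<KV<String, Ohlc>> sorted = new ArrayList<>();
    held.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing((KV<String, Ohlc> kv) -> kv.getValue().getTimestamp()));
    Double open = lastClose.read();
    for (KV<String, Ohlc> kv : sorted) {
      Ohlc candle = (open == null) ? kv.getValue() : kv.getValue().withOpen(open);
      c.output(KV.of(kv.getKey(), candle));
      open = candle.getClose();
    }
    if (open != null) {
      lastClose.write(open);
    }
    held.clear();
    // Re-arm the timer so the hold carries into the next interval window.
    emit.set(c.timestamp().plus(INTERVAL));
  }
}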

@RequiresTimeSortedInput looks super interesting, happy to help out. 
Although it's a harder problem than the targeted timeseries use cases, 
where FixedWindows aggregations can be used before the final step.


Re: Looping timer blog

2019-06-25 Thread Jan Lukavský


On 6/25/19 1:43 PM, Reza Rokni wrote:




I assume you are referring to late data rather than out of order data 
(the latter being taken care of with the in-memory sort). As discussed 
in the talk, late data is a sharp edge; one solution for late data is 
to branch it away before the GlobalWindow + State DoFn. This can then be 
output from the pipeline into a sink with a marker to initiate a 
manual procedure for correction. Essentially a manual redaction.


Which in-memory sort do you refer to? I'm pretty sure there must be 
sorting involved for this to work, but I'm not quite sure where exactly 
it is in your implementation. You said that you can put data in 
ValueState rather than BagState, so do you have a List as a value in the 
ValueState? Or do you wrap the stateful ParDo with some other sorting 
logic? And if so, how does this work on batch? I suppose that it all has 
to fit in memory. I think this all revolves around the 
@RequiresTimeSortedInput annotation that I propose. Maybe we can 
cooperate on that? :)





Re: Looping timer blog

2019-06-25 Thread Reza Rokni
On Tue, 25 Jun 2019 at 18:12, Jan Lukavský <je...@seznam.cz> wrote:

I assume you are referring to late data rather than out of order data (the
latter being taken care of with the in-memory sort). As discussed in the
talk, late data is a sharp edge; one solution for late data is to branch it
away before the GlobalWindow + State DoFn. This can then be output from the
pipeline into a sink with a marker to initiate a manual procedure for
correction. Essentially a manual redaction.


Re: Looping timer blog

2019-06-25 Thread Jan Lukavský
> The TTL check would be in the same Timer rather than a separate
> Timer. The max value processed in each OnTimer call would be stored in
> ValueState and used as a base to know how long it has been since the
> pipeline has seen an external value for that key.


OK, that seems to work, if you store the maximum timestamp in a value 
state (that is, you basically generate a per-key watermark).


> You can store it in ValueState rather than BagState, but yes, you
> store that value in State ready for the next OnTimer() fire.


In my understanding of the problem, I'd say that this approach seems a 
little suboptimal. Consider the following, when trying to generate OHLC 
data (open, high, low, close - that is, moving the last closing price to 
the next window's opening price):


 - suppose we have times T1 < T2 < T3 < T4, where T2 and T4 
denote "end of windows" (closing times)


 - first (in processing time), we receive a value for time T3; we cache 
it in ValueState and we set a timer for T4


 - next, we receive a value for T1 - but we cannot overwrite the value 
already written for T3, right? What to do then? And will we reset the 
timer to time T3?


 - basically, because we received *two* values, both of which are 
needed and no timer could have fired in between, we need both 
values stored to know which value to emit when the timer fires. And 
consider that on batch, the timer fires only after we see all data 
(without any ordering).


Or? Am I missing something?

Jan


Re: Looping timer blog

2019-06-24 Thread Reza Rokni
On Fri, 21 Jun 2019 at 18:02, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reza,
>
> great presentation at the Beam Summit. I have had a few posts here in the
> list during the last few weeks, some of which might actually be related to
> both looping timers and validity windows. But maybe you will be able to see
> a different approach than I do, so a few questions:
>
>  a) because of [1], timers might not be exactly ordered (the JIRA talks
> about DirectRunner, but I suppose the issue is present on all runners that
> use immutable bundles of size > 1, so it might be related to Dataflow as
> well). This might cause issues when you try to introduce TTL for looping
> timers, because the TTL timer might get fired before the regular looping
> timer, which might cause incorrect results (state cleared before it has
> been flushed).
>
The TTL check would be in the same Timer rather than a separate Timer. The
max value processed in each OnTimer call would be stored in ValueState and
used as a base to know how long it has been since the pipeline has seen an
external value for that key.
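
One way to read that, as a hedged sketch (TTL and INTERVAL are assumed 
configuration values; state and timer names are illustrative):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class LoopingTimerWithTtl extends DoFn<KV<String, Double>, KV<String, Double>> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);
  private static final Duration TTL = Duration.standardHours(1); // assumed

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  // Per-key "watermark": the max event timestamp seen for this key.
  @StateId("maxSeen")
  private final StateSpec<ValueState<Long>> maxSeenSpec = StateSpecs.value();

  @TimerId("loopingTimer")
  private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("key") ValueState<String> key,
      @StateId("maxSeen") ValueState<Long> maxSeen,
      @TimerId("loopingTimer") Timer loopingTimer) {
    key.write(c.element().getKey());
    Long max = maxSeen.read();
    if (max == null) {
      loopingTimer.set(c.timestamp()); // first element starts the loop
    }
    if (max == null || c.timestamp().getMillis() > max) {
      maxSeen.write(c.timestamp().getMillis());
    }
    c.output(c.element());
  }

  @OnTimer("loopingTimer")
  public void onTimer(OnTimerContext c,
      @StateId("key") ValueState<String> key,
      @StateId("maxSeen") ValueState<Long> maxSeen,
      @TimerId("loopingTimer") Timer loopingTimer) {
    Long max = maxSeen.read();
    long lastSeen = (max == null) ? Long.MIN_VALUE : max;
    // The TTL check lives in the same timer: if no external value has
    // arrived for this key within TTL, stop looping instead of re-arming.
    if (c.timestamp().getMillis() - lastSeen > TTL.getMillis()) {
      return;
    }
    c.output(KV.of(key.read(), 0.0)); // emit this interval's default value
    loopingTimer.set(c.timestamp().plus(INTERVAL));
  }
}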

>  b) because a stateful ParDo doesn't sort by timestamp, that implies that
> you have to store the last values in BagState (as opposed to the blog,
> where you just emit the identity value of the sum operation), right?
>
You can store it in ValueState rather than BagState, but yes, you store that
value in State ready for the next OnTimer() fire.

>  c) because of how a stateful ParDo currently works on batch, does that
> imply that all values (per key) would have to be stored in memory? Would
> that scale?
>
This is one of the sharp edges and the answer is ... it depends :-) I would
recommend always using a FixedWindow+Combiner before this step; this will
compress the values into something much smaller. For example, in the case of
building 'candles' this will compress down to low/high/first/last values per
FixedWindow length. If the window length is very small there may be no
compression, but in most cases I have seen this is an OK compromise.
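
For example, a sketch of such a combiner (the Candle accumulator is 
illustrative; a real version would want an explicit accumulator coder):

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.TimestampedValue;

// Per key and fixed window, compress all prices to first/last/high/low.
class Candle implements Serializable {
  double open, close;
  double high = Double.NEGATIVE_INFINITY;
  double low = Double.POSITIVE_INFINITY;
  long firstTs = Long.MAX_VALUE;
  long lastTs = Long.MIN_VALUE;
}

class OhlcCombineFn extends Combine.CombineFn<TimestampedValue<Double>, Candle, Candle> {

  @Override
  public Candle createAccumulator() {
    return new Candle();
  }

  @Override
  public Candle addInput(Candle acc, TimestampedValue<Double> input) {
    long ts = input.getTimestamp().getMillis();
    double v = input.getValue();
    if (ts < acc.firstTs) { acc.firstTs = ts; acc.open = v; }
    if (ts > acc.lastTs) { acc.lastTs = ts; acc.close = v; }
    acc.high = Math.max(acc.high, v);
    acc.low = Math.min(acc.low, v);
    return acc;
  }

  @Override
  public Candle mergeAccumulators(Iterable<Candle> accumulators) {
    Candle merged = createAccumulator();
    for (Candle a : accumulators) {
      if (a.firstTs < merged.firstTs) { merged.firstTs = a.firstTs; merged.open = a.open; }
      if (a.lastTs > merged.lastTs) { merged.lastTs = a.lastTs; merged.close = a.close; }
      merged.high = Math.max(merged.high, a.high);
      merged.low = Math.min(merged.low, a.low);
    }
    return merged;
  }

  @Override
  public Candle extractOutput(Candle acc) {
    return acc;
  }
}

Applied as, e.g., Window.into(FixedWindows.of(Duration.standardMinutes(1))) 
followed by Combine.perKey(new OhlcCombineFn()), each window then 
contributes a single Candle to the GlobalWindow stage.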

> There is a discussion about problem a) in [2], but maybe there is some
> different approach possible. For problems b) and c) there is a proposal [3].
> When the input is sorted, it starts to work both in batch and with
> ValueState, because the last value is the *valid* value.
>
There was also a discussion on dev@ around a sorted Map state, which would
be very cool for this use case.
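
Purely as a speculative sketch of what that could look like for this 
thread's use case (no such API exists in Beam today; this is only what the 
dev@ discussion hints at):

// Hypothetical sorted state cell: values are kept ordered by a sort key
// (here, the event timestamp), so OnTimer could read just the range up
// to the firing time instead of sorting a BagState in memory.
interface SortedMapState<K extends Comparable<K>, V>
    extends org.apache.beam.sdk.state.State {
  void put(K sortKey, V value);
  Iterable<V> readRange(K minInclusive, K maxExclusive); // in sort order
  void clearRange(K minInclusive, K maxExclusive);
}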

> This even has a connection with the mentioned validity windows: if you
> sort by timestamp, the _last_ value is the _valid_ value, so it essentially
> boils down to keeping a single value per key (and again, it starts to work
> in both batch and stream).
>
one for Tyler :-)

> I even have a suspicion that sorting by timestamp has a close relation to
> retractions, because when you are using sorted streams, retractions
> actually become only the diff between the last emitted pane and the current
> pane. That might even help implement them in general, but I might be missing
> something. This just popped into my head today, as I was thinking about why
> there was actually no (or little) need for retractions in the Euphoria model
> (very similar to Beam, actually differing by the sorting thing :)), and why
> the need pops up so often in Beam.
>
Retractions will be possible with this, but it does mean that we would need
to keep old versions around; something built in would be very cool rather
than building it with this pattern.

> I'd be very happy to hear what you think about all of this.
>
> Cheers,
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
>
> [2]
> https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E
>
> [3]
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

Re: Looping timer blog

2019-06-21 Thread Jan Lukavský

Hi Reza,

great presentation at the Beam Summit. I have had a few posts here in 
the list during the last few weeks, some of which might actually be 
related to both looping timers and validity windows. But maybe you will 
be able to see a different approach than I do, so a few questions:


 a) because of [1], timers might not be exactly ordered (the JIRA talks 
about DirectRunner, but I suppose the issue is present on all runners 
that use immutable bundles of size > 1, so it might be related to Dataflow 
as well). This might cause issues when you try to introduce TTL for 
looping timers, because the TTL timer might get fired before the regular 
looping timer, which might cause incorrect results (state cleared before 
it has been flushed).


 b) because a stateful ParDo doesn't sort by timestamp, that implies 
that you have to store the last values in BagState (as opposed to the 
blog, where you just emit the identity value of the sum operation), right?


 c) because of how a stateful ParDo currently works on batch, does that 
imply that all values (per key) would have to be stored in memory? Would 
that scale?


There is a discussion about problem a) in [2], but maybe there is some 
different approach possible. For problems b) and c) there is a proposal 
[3]. When the input is sorted, it starts to work both in batch and with 
ValueState, because the last value is the *valid* value.


This even has a connection with the mentioned validity windows: if you 
sort by timestamp, the _last_ value is the _valid_ value, so it 
essentially boils down to keeping a single value per key (and again, it 
starts to work in both batch and stream).


I even have a suspicion that sorting by timestamp has a close relation to 
retractions, because when you are using sorted streams, retractions 
actually become only the diff between the last emitted pane and the 
current pane. That might even help implement them in general, but I might 
be missing something. This just popped into my head today, as I was 
thinking about why there was actually no (or little) need for retractions 
in the Euphoria model (very similar to Beam, actually differing by the 
sorting thing :)), and why the need pops up so often in Beam.


I'd be very happy to hear what you think about all of this.

Cheers,

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7520

[2] 
https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E


[3] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing



Re: Looping timer blog

2019-06-21 Thread Reza Rokni
Great question - one thing that we did not cover in the blog, and I think 
we should have, is the use case where you would want to bootstrap the 
pipeline.

One option would be, on startup, to have an extra bounded source that is 
read and flattened into the main pipeline. The source will need to contain 
values in Timestamped format corresponding to the first window that you 
would like to kickstart the process from. I will see if I can find some 
time to code up an example and add that and the looping timer code to the 
Beam patterns (a rough sketch of the idea follows below).

https://beam.apache.org/documentation/patterns/overview/
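
To illustrate the idea (a sketch; the seed key, value, and timestamp are 
made up, and live stands in for the main streaming input):

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

class BootstrapUtil {
  static PCollection<KV<String, Double>> withBootstrapSeed(
      PCollection<KV<String, Double>> live) {
    // One timestamped seed value per key, placed in the first window we
    // want the looping timer to start from.
    PCollection<KV<String, Double>> seed =
        live.getPipeline()
            .apply("BootstrapSeed",
                Create.timestamped(
                    TimestampedValue.of(
                        KV.of("FX_PAIR_1", 0.0),
                        Instant.parse("2019-06-19T00:00:00Z"))));
    // Flatten the seed into the main stream; the looping timer DoFn sees
    // the seed first in event time and arms the initial timer.
    return PCollectionList.of(seed)
        .and(live)
        .apply("FlattenSeed", Flatten.pCollections());
  }
}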

Cheers
Reza








Re: Looping timer blog

2019-06-20 Thread Manu Zhang
Indeed an interesting pattern.

One minor question: it seems the timer is triggered by the first element, 
so what if there is no data in the "first interval"?

Thanks for the write-up.
Manu



Looping timer blog

2019-06-18 Thread Reza Rokni
Hi folks,

Just wanted to drop a note here on a new pattern that folks may find
interesting, called Looping Timers. It allows default values to be
created in interval windows in the absence of any external data coming
into the pipeline. The details are in the blog post below:

https://beam.apache.org/blog/2019/06/11/looping-timers.html

Its main utility is when dealing with time series data. There are still
rough edges, like dealing with TTL, and it would be great to hear
feedback on ways it can be improved.

The next pattern to publish in this domain will assist with hold and
propagation of values from one interval window to the next, which,
coupled with looping timers, starts to solve some interesting problems.

Cheers

Reza


