Re: Watermark generation in Windowed Operators

2016-09-26 Thread David Yan
Actually, on second thought, the join accumulation interface for the
non-keyed join operator can support theta joins if you provide a suitable
accumulation implementation for that.
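
To illustrate the idea, here is a minimal sketch of an accumulation that buffers both sides of a window and emits every pair satisfying an arbitrary predicate. The TwoInputAccumulation interface, JoinBuffers and ThetaJoinAccumulation are hypothetical names made up for this sketch; they are not the Malhar JoinAccumulation signatures, so treat this as an outline of the approach rather than code for the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical two-input accumulation contract, written only for this sketch.
// It is NOT the signature of Malhar's JoinAccumulation interface.
interface TwoInputAccumulation<L, R, A, O> {
  A defaultAccumulatedValue();
  A accumulateLeft(A accumulated, L input);
  A accumulateRight(A accumulated, R input);
  O getOutput(A accumulated);
}

// Per-window state: every tuple seen so far on each side of the join.
class JoinBuffers<L, R> {
  final List<L> left = new ArrayList<>();
  final List<R> right = new ArrayList<>();
}

// Emits every (left, right) pair in the window for which the predicate holds.
class ThetaJoinAccumulation<L, R>
    implements TwoInputAccumulation<L, R, JoinBuffers<L, R>, List<List<Object>>> {

  private final BiPredicate<L, R> theta;

  ThetaJoinAccumulation(BiPredicate<L, R> theta) {
    this.theta = theta;
  }

  @Override
  public JoinBuffers<L, R> defaultAccumulatedValue() {
    return new JoinBuffers<>();
  }

  @Override
  public JoinBuffers<L, R> accumulateLeft(JoinBuffers<L, R> acc, L input) {
    acc.left.add(input);
    return acc;
  }

  @Override
  public JoinBuffers<L, R> accumulateRight(JoinBuffers<L, R> acc, R input) {
    acc.right.add(input);
    return acc;
  }

  @Override
  public List<List<Object>> getOutput(JoinBuffers<L, R> acc) {
    List<List<Object>> out = new ArrayList<>();
    // Nested loop: the predicate may be any comparison, not just equality.
    for (L l : acc.left) {
      for (R r : acc.right) {
        if (theta.test(l, r)) {
          out.add(Arrays.<Object>asList(l, r));
        }
      }
    }
    return out;
  }
}
```

For example, new ThetaJoinAccumulation<Integer, Integer>((a, b) -> a < b) would pair each left value with every larger right value in the same window. The nested loop costs left-size times right-size comparisons per window, which is the usual price of a join condition that cannot be hashed.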

David

On Mon, Sep 26, 2016 at 12:42 PM, David Yan wrote:

> Chinmay,
>
> Just to clarify, the Join Operator does not support theta joins. It only
> supports equi-joins on either the Window, or both the Window and the Key.
>
> David

Re: Watermark generation in Windowed Operators

2016-09-26 Thread David Yan
Chinmay,

Just to clarify, the Join Operator does not support theta joins. It only
supports equi-joins on either the Window, or both the Window and the Key.

David

On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar wrote:

> Thanks for the information, guys.
>
> David, I can take a look at the heuristic watermark if I get any free
> cycles.
>
> Shunxin, does the Join operator that you're implementing support theta
> joins, or only a subset of theta joins?
>
> Thanks,
> Chinmay.

Re: Watermark generation in Windowed Operators

2016-09-18 Thread Bhupesh Chawda
Hi Chinmay,

Hope I am not too late.

If I understand correctly, you need a watermark tuple that is consistent
across the two inputs that the join operator is receiving. One way, as
David suggested, is to let the Windowed Operator itself generate the
watermark tuple.

One such heuristic is to look at the latest time among the incoming tuples
and generate a watermark that is a fixed distance behind the ideal
watermark. In other words, once you have computed the watermark time =
(latest timestamp seen across all ports - fixed delay), you can assume a
watermark tuple with that timestamp. This can be done at the end of the
window.
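
A minimal sketch of this heuristic, assuming event times in milliseconds and an end-of-window callback on the operator. The class and method names (HeuristicWatermarkGenerator, onTuple, endWindow) are made up for illustration and are not part of the WindowedOperator API; skipping a watermark that has not advanced is also an assumption of the sketch.

```java
import java.util.OptionalLong;

// Hypothetical helper: watermark = latest event time seen on any port - fixed delay.
class HeuristicWatermarkGenerator {
  private final long fixedDelayMillis;
  private long latestTimestamp = Long.MIN_VALUE;
  private long lastEmitted = Long.MIN_VALUE;
  private boolean seenTuple = false;

  HeuristicWatermarkGenerator(long fixedDelayMillis) {
    this.fixedDelayMillis = fixedDelayMillis;
  }

  // Call for every data tuple, regardless of which input port it arrived on.
  void onTuple(long eventTimeMillis) {
    latestTimestamp = Math.max(latestTimestamp, eventTimeMillis);
    seenTuple = true;
  }

  // Call at the end of the window. Returns the timestamp of the watermark to
  // assume, or empty if no tuple has been seen or the watermark has not advanced.
  OptionalLong endWindow() {
    if (!seenTuple) {
      return OptionalLong.empty();
    }
    long candidate = latestTimestamp - fixedDelayMillis;
    if (candidate <= lastEmitted) {
      return OptionalLong.empty();
    }
    lastEmitted = candidate;
    return OptionalLong.of(candidate);
  }
}
```

With a fixed delay of 5000 ms and tuples stamped 10000, 12000 and 11000, the first endWindow() call yields 7000, and a second call without new tuples yields nothing.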

~ Bhupesh

On Sun, Sep 18, 2016 at 6:13 AM, Shunxin Lu wrote:

> Hi Chinmay,
>
> The Join Operator should be able to support any user-defined join
> operation. You just need to create a class that implements the
> JoinAccumulation interface and defines the behavior you want, then set
> that accumulation on the Join operator instance after creating it. Please
> take a look at Combine and PojoInnerJoin in the package
> org.apache.apex.malhar.lib.window.impl, and see how the accumulation is
> used in WindowedJoinOperatorTestApplication.
>
> Thanks,
> Shunxin

Re: Watermark generation in Windowed Operators

2016-09-17 Thread Shunxin Lu
Hi Chinmay,

The Join Operator should be able to support any user-defined join
operation. You just need to create a class that implements the
JoinAccumulation interface and defines the behavior you want, then set that
accumulation on the Join operator instance after creating it. Please take a
look at Combine and PojoInnerJoin in the package
org.apache.apex.malhar.lib.window.impl, and see how the accumulation is used
in WindowedJoinOperatorTestApplication.

Thanks,
Shunxin

On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar wrote:

> Thanks for the information, guys.
>
> David, I can take a look at the heuristic watermark if I get any free
> cycles.
>
> Shunxin, does the Join operator that you're implementing support theta
> joins, or only a subset of theta joins?
>
> Thanks,
> Chinmay.

Re: Watermark generation in Windowed Operators

2016-09-17 Thread Chinmay Kolhatkar
Thanks for the information, guys.

David, I can take a look at the heuristic watermark if I get any free cycles.

Shunxin, does the Join operator that you're implementing support theta
joins, or only a subset of theta joins?

Thanks,
Chinmay.



On Sat, Sep 17, 2016 at 1:21 AM, David Yan wrote:

> Hi Shunxin,
>
> If the watermark code in your PR is not behaving the way it should, please
> do change it. Thanks!
>
> David

Re: Watermark generation in Windowed Operators

2016-09-16 Thread David Yan
Hi Shunxin,

If the watermark code in your PR is not behaving the way it should, please
do change it. Thanks!

David

On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu wrote:

> Hi David,
>
> Thanks for the clarification. Should we update the watermark for the join
> operator when a watermark arrives from one of the input streams, even if
> the watermark from the other input stream has not arrived yet?
>
> Shunxin
>
> On Fri, Sep 16, 2016 at 10:59 AM, David Yan wrote:
>
> > Actually, that's not entirely true. Here are the points about the
> > watermark tuple generation of the join operator:
> >
> > 1) We keep the timestamp of the latest watermark for each input port.
> >
> > 2) We keep another timestamp that is equal to the minimum of all the
> > timestamps mentioned in (1).
> >
> > 3) Upon arrival of a watermark from an input port, we update the
> > corresponding timestamp mentioned in (1) and re-evaluate (2). If the
> > value of (2) changes, we generate a watermark tuple whose timestamp is
> > equal to the new value of (2).
> >
> > 4) That means that initially, a watermark is generated only once we have
> > seen a watermark on all input ports. And the fact that we take the
> > smallest timestamp in (2) means we consider a window late only if all
> > input streams say that particular window is late.
> >
> > David
> >
> >
> > On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu wrote:
> >
> > > Hi Chinmay,
> > >
> > > Based on the discussion I had with David (David, please correct me if
> > > I am wrong), the watermark for the Windowed Join Operator should indeed
> > > depend on all the input streams. If a tuple is considered late for one
> > > input stream, it should also be considered late for the whole join
> > > operator. That's why the AbstractWindowedJoinOperator always selects
> > > the watermark with the smallest timestamp among the latest watermarks
> > > coming from upstream as its current watermark, so that it always keeps
> > > the strictest watermark for eliminating late tuples.
> > >
> > > Shunxin
> > >
> > > On Fri, Sep 16, 2016 at 10:02 AM, David Yan wrote:
> > >
> > > > I think in theory, the watermark should be sent by the input operator,
> > > > since the input should know the criteria for lateness. Those criteria
> > > > can depend on many factors, like the time of day or the source of the
> > > > data (e.g. mobile data), that the WindowedOperator should in general
> > > > make no assumptions about.
> > > >
> > > > However, I think it's possible to implement some kind of watermark
> > > > generation in the WindowedOperator itself if that knowledge is not
> > > > available from the input. It actually already does that if you call
> > > > the setFixedWatermark method, which generates a watermark tuple
> > > > downstream for each streaming window, with a timestamp based on the
> > > > time derived from the streaming window id. It's possible to add
> > > > support for heuristic watermark generation as well, and you're welcome
> > > > to take that up.
> > > >
> > > > For the Windowed Join operator, the watermark generated for downstream
> > > > depends on the watermarks arriving from each input stream; it's not
> > > > just a simple propagation. Shunxin can comment more on this.
> > > >
> > > > David
> > > >
> > > >
> > > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar
> > > > <chin...@apache.org> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I was looking at the Windowed Operator APIs and have to mention
> > > > > they're pretty nicely done.
> > > > >
> > > > > I have a question related to watermark generation.
> > > > >
> > > > > What I understood is that, to complete the processing of an event
> > > > > window, there is provision for sending a watermark tuple from some
> > > > > previous stage in the DAG. I want to know who should be doing that
> > > > > and when it should be done.
> > > > >
> > > > > For example, I saw a PR for the Windowed Join Operator in apex-malhar
> > > > > and I would like to use it in my application. Can someone give me an
> > > > > example of what a DAG will look like with this operator and a stage
> > > > > that generates watermarks? And how should that stage decide when to
> > > > > generate a watermark tuple?
> > > > >
> > > > > -Chinmay.
> > > > >
> > > >
> > >
> >
>


Re: Watermark generation in Windowed Operators

2016-09-16 Thread Shunxin Lu
Hi David,

Thanks for the clarification. Should we update the watermark for the join
operator when a watermark arrives from one of the input streams, even if
the watermark from the other input stream has not arrived yet?

Shunxin

On Fri, Sep 16, 2016 at 10:59 AM, David Yan wrote:

> Actually, that's not entirely true. Here are the points about the watermark
> tuple generation of the join operator:
>
> 1) We keep the timestamp of the latest watermark for each input port.
>
> 2) We keep another timestamp that is equal to the minimum of all the
> timestamps mentioned in (1).
>
> 3) Upon arrival of a watermark from an input port, we update the
> corresponding timestamp mentioned in (1) and re-evaluate (2). If the value
> of (2) changes, we generate a watermark tuple whose timestamp is equal to
> the new value of (2).
>
> 4) That means that initially, a watermark is generated only once we have
> seen a watermark on all input ports. And the fact that we take the smallest
> timestamp in (2) means we consider a window late only if all input
> streams say that particular window is late.
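
The four points above can be sketched as a small tracker. This is only an illustration of the described logic, not the code in AbstractWindowedJoinOperator; the class name MinWatermarkTracker, the port indexing and the use of Math.max to guard against a watermark that moves backwards on a port are assumptions of the sketch.

```java
import java.util.OptionalLong;

// Hypothetical tracker that emits the minimum of the per-port latest watermarks.
class MinWatermarkTracker {
  private final long[] latestPerPort;   // point (1): latest watermark per input port
  private final boolean[] seen;         // whether a port has delivered any watermark yet
  private long currentMin;              // point (2): minimum over all ports
  private boolean hasMin = false;

  MinWatermarkTracker(int numPorts) {
    latestPerPort = new long[numPorts];
    seen = new boolean[numPorts];
  }

  // Point (3): update the port's timestamp, re-evaluate the minimum, and return
  // the new downstream watermark timestamp only if the minimum changed.
  OptionalLong onWatermark(int port, long timestamp) {
    // Assumption: a port's watermark never moves backwards.
    latestPerPort[port] = seen[port] ? Math.max(latestPerPort[port], timestamp) : timestamp;
    seen[port] = true;

    long min = Long.MAX_VALUE;
    for (int i = 0; i < latestPerPort.length; i++) {
      if (!seen[i]) {
        // Point (4): nothing is emitted until every port has delivered a watermark.
        return OptionalLong.empty();
      }
      min = Math.min(min, latestPerPort[i]);
    }
    if (hasMin && min == currentMin) {
      return OptionalLong.empty();
    }
    currentMin = min;
    hasMin = true;
    return OptionalLong.of(min);
  }
}
```

With two ports, a watermark of 100 on port 0 emits nothing, a watermark of 50 on port 1 then emits 50, and a later 200 on port 0 emits nothing because the minimum is still 50.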
>
> David
>
>
> On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu wrote:
>
> > Hi Chinmay,
> >
> > Based on the discussion I had with David (David, please correct me if I
> > am wrong), the watermark for the Windowed Join Operator should indeed
> > depend on all the input streams. If a tuple is considered late for one
> > input stream, it should also be considered late for the whole join
> > operator. That's why the AbstractWindowedJoinOperator always selects
> > the watermark with the smallest timestamp among the latest watermarks
> > coming from upstream as its current watermark, so that it always keeps
> > the strictest watermark for eliminating late tuples.
> >
> > Shunxin
> >
> > On Fri, Sep 16, 2016 at 10:02 AM, David Yan wrote:
> >
> > > I think in theory, the watermark should be sent by the input operator,
> > > since the input should know the criteria for lateness. Those criteria
> > > can depend on many factors, like the time of day or the source of the
> > > data (e.g. mobile data), that the WindowedOperator should in general
> > > make no assumptions about.
> > >
> > > However, I think it's possible to implement some kind of watermark
> > > generation in the WindowedOperator itself if that knowledge is not
> > > available from the input. It actually already does that if you call
> > > the setFixedWatermark method, which generates a watermark tuple
> > > downstream for each streaming window, with a timestamp based on the
> > > time derived from the streaming window id. It's possible to add support
> > > for heuristic watermark generation as well, and you're welcome to take
> > > that up.
> > >
> > > For the Windowed Join operator, the watermark generated for downstream
> > > depends on the watermarks arriving from each input stream; it's not
> > > just a simple propagation. Shunxin can comment more on this.
> > >
> > > David
> > >
> > >
> > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <
> chin...@apache.org>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I was looking at Windowed Operator APIs and have to mention they're
> > > pretty
> > > > nicely done.
> > > >
> > > > I have a question related to watermark generation.
> > > >
> > > > What I understood is that for completion of processing of an event
> > window
> > > > one has provision for sending of watermark tuple from some previous
> > stage
> > > > in the DAG. I want to know who should be doing that and when should
> be
> > it
> > > > done.
> > > >
> > > > For e.g. I saw a PR of Windows Join Operator in apex-malhar and I
> would
> > > > like to use it in my application. Can someone give me an example of
> > how a
> > > > DAG will look like with this operator which has a stage which
> generates
> > > > watermark? And how should that stage decide on when to generate a
> > > watermark
> > > > tuple?
> > > >
> > > > -Chinmay.
> > > >
> > >
> >
>


Re: Watermark generation in Windowed Operators

2016-09-16 Thread David Yan
Actually, that's not entirely true. Here is how the join operator generates
watermark tuples:

1) We keep the timestamp of the latest watermark for each input port.

2) We keep another timestamp that is equal to the minimum of all the
timestamps mentioned in (1).

3) Upon arrival of a watermark from an input port, we update that port's
timestamp in (1) and re-evaluate (2). If the value of (2) changes, we
generate a watermark tuple whose timestamp is equal to the new value of
(2).

4) That means that initially, a watermark is generated only once we have
seen a watermark from every input port. And because we take the smallest
timestamp in (2), we consider a window late only if all input streams say
that particular window is late.
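
To make points (1) to (4) concrete, here is a rough sketch of that
bookkeeping. It is not the actual join operator code: the class name
MinWatermarkTracker and its methods are hypothetical, and it assumes the
input ports are identified by a zero-based index.

```java
// Illustrative sketch only. MinWatermarkTracker and its methods are
// hypothetical names, not part of apex-malhar.
final class MinWatermarkTracker {
    // (1) Latest watermark timestamp seen on each input port.
    private final long[] latest;
    // Whether a watermark has arrived on each port yet.
    private final boolean[] seen;
    // (2) Minimum of the per-port timestamps, valid once hasCurrent is true.
    private long current;
    private boolean hasCurrent;

    MinWatermarkTracker(int numPorts) {
        latest = new long[numPorts];
        seen = new boolean[numPorts];
    }

    /**
     * (3) Records a watermark from one port and re-evaluates the minimum.
     * Returns true if a watermark tuple carrying currentWatermark()
     * should be emitted downstream.
     */
    boolean onWatermark(int port, long timestamp) {
        latest[port] = timestamp;
        seen[port] = true;

        long min = Long.MAX_VALUE;
        for (int i = 0; i < latest.length; i++) {
            if (!seen[i]) {
                // (4) Nothing is emitted until every port has reported.
                return false;
            }
            min = Math.min(min, latest[i]);
        }
        if (hasCurrent && min == current) {
            return false; // The minimum did not change.
        }
        current = min;
        hasCurrent = true;
        return true;
    }

    long currentWatermark() {
        return current;
    }
}
```

With two ports, a watermark of 100 on port 0 alone emits nothing. A
watermark of 50 on port 1 then emits 50. A later watermark of 200 on port
0 emits nothing, because the minimum is still 50.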

David




Re: Watermark generation in Windowed Operators

2016-09-16 Thread Shunxin Lu
Hi Chinmay,

Based on the discussion I had with David (and David, please correct me if I
am wrong), the watermark for the Windowed Join Operator should indeed
depend on all the input streams. If a tuple is considered late for one
input stream, it should also be considered late for the whole join
operator. That's why the AbstractWindowedJoinOperator always selects
the watermark with the smallest timestamp among the latest watermarks
coming from its upstream operators as its current watermark, so that it
always keeps the strictest watermark to eliminate late tuples.

Shunxin



Re: Watermark generation in Windowed Operators

2016-09-16 Thread David Yan
I think in theory, the watermark should be sent by the input operator,
since the input should know the criteria for lateness. Those criteria can
depend on many factors, like the time of day or the source of the data
(e.g. mobile data), that the WindowedOperator should in general make no
assumptions about.

However, I think it's possible to implement some kind of watermark
generation in the WindowedOperator itself if that knowledge is not
available from the input. It's actually already doing that if you call
the setFixedWatermark method, which generates a watermark tuple downstream
for each streaming window, with a timestamp based on the time derived from
the streaming window id. It's possible to add support for heuristic
watermark generation as well, and you're welcome to take that up.
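
As a rough illustration of the fixed-watermark idea, here is a sketch that
derives a timestamp from a streaming window index and subtracts a constant
lag. This is not the WindowedOperator implementation: the class
FixedLagWatermarkGenerator, its fields, and the assumption that the derived
time is the first window's start time plus the window index times the
window width are all hypothetical, as is treating the fixed watermark as a
lag in milliseconds.

```java
// Illustrative sketch only. All names here are hypothetical, and the
// window-index-to-time mapping is an assumption made for the example.
final class FixedLagWatermarkGenerator {
    private final long firstWindowMillis; // start time of streaming window 0
    private final long windowWidthMillis; // duration of one streaming window
    private final long fixedLagMillis;    // constant lag behind derived time

    FixedLagWatermarkGenerator(long firstWindowMillis,
                               long windowWidthMillis,
                               long fixedLagMillis) {
        this.firstWindowMillis = firstWindowMillis;
        this.windowWidthMillis = windowWidthMillis;
        this.fixedLagMillis = fixedLagMillis;
    }

    /** Time derived from a zero-based streaming window index. */
    long derivedTime(long windowIndex) {
        return firstWindowMillis + windowIndex * windowWidthMillis;
    }

    /** Watermark timestamp to emit at the end of the given window. */
    long watermarkFor(long windowIndex) {
        return derivedTime(windowIndex) - fixedLagMillis;
    }
}
```

A heuristic generator could replace the constant lag with a value estimated
from the observed delay of incoming tuples, but that is beyond this sketch.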

For the Windowed Join operator, the watermark generated for downstream
depends on the watermarks arriving from each input stream, and it's not
just a simple propagation. Shunxin can comment more on this.

David


On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar wrote:

> Hi All,
>
> I was looking at the Windowed Operator APIs and have to mention they're
> pretty nicely done.
>
> I have a question related to watermark generation.
>
> What I understood is that, to complete the processing of an event window,
> one can send a watermark tuple from some previous stage in the DAG. I want
> to know who should be doing that and when it should be done.
>
> For example, I saw a PR for the Windowed Join Operator in apex-malhar and I
> would like to use it in my application. Can someone give me an example of
> what a DAG would look like with this operator and a stage that generates
> watermarks? And how should that stage decide when to generate a watermark
> tuple?
>
> -Chinmay.
>