Re: At what point do watermarks get injected into the stream?

2017-06-12 Thread Fabian Hueske
Hi,

Each operator keeps track of the latest (and therefore maximum) watermark
received from each of its inputs and sets its own internal event time to the
minimum of these per-input watermarks.
In the example, window(1) has two inputs (map(1) and map(2)). If window(1)
first receives a watermark of 33 from map(1), it won't emit a watermark of 33
until it has received a watermark >= 33 from map(2).

All operators use this logic for watermark propagation. So it does not
matter whether this is a window operator or a CEP operator.
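
To make the rule above concrete, here is a minimal Python sketch of the
bookkeeping. It is an illustration only, not Flink code: the class name
WatermarkTracker and its method on_watermark are hypothetical, and the
third watermark (40) is an invented value added to complete the example.

```python
import math


class WatermarkTracker:
    """Keeps the highest watermark per input and emits the minimum across inputs."""

    def __init__(self, num_inputs):
        # No watermark has been seen on any input yet.
        self.per_input = [-math.inf] * num_inputs
        self.emitted = -math.inf

    def on_watermark(self, input_index, watermark):
        """Record a watermark; return the new watermark to emit, or None."""
        # Keep the maximum seen on this input.
        self.per_input[input_index] = max(self.per_input[input_index], watermark)
        candidate = min(self.per_input)
        # Only emit when the minimum across all inputs has advanced.
        if candidate > self.emitted:
            self.emitted = candidate
            return candidate
        return None


window1 = WatermarkTracker(num_inputs=2)  # input 0 = map(1), input 1 = map(2)
first = window1.on_watermark(0, 33)   # map(2) has sent nothing yet
second = window1.on_watermark(1, 17)  # minimum across inputs is now 17
third = window1.on_watermark(1, 40)   # map(2) has passed 33, minimum is 33
print(first, second, third)
```

The operator's own watermark only ever moves forward, even though a
smaller value (17) arrived after a larger one (33) on a different input.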

Let me know if you have further questions,
Fabian


Re: At what point do watermarks get injected into the stream?

2017-06-12 Thread Ray Ruvinskiy
Thanks!

I had a couple of follow-up questions about the example in the documentation.
Suppose Source 1 sends a watermark of 33, and Source 2 sends a watermark of 17. 
If I understand correctly, map(1) will forward the watermark of 33 to window(1) 
and window(2), and map(2) will forward the watermark of 17 to the same window 
operators. I’m assuming there is nothing to prevent window(1) and window(2) 
from getting the watermark of 33 before the watermark of 17, right? In that 
case, how do window(1) and window(2) compute the minimum watermark to forward 
to the next operator downstream? Will it be a per-window watermark?

What would happen if, instead of a window operator in that position, we had
something like the CEP operator, which in effect maintains state and does 
aggregations without windowing (or another similar such operator)? How does it 
determine what the minimum watermark is at any given time, in light of the fact 
that, in principle, it might receive a watermark value smaller than anything 
it’s seen before from a parallel source?

Ray

From: Fabian Hueske <fhue...@gmail.com>
Date: Sunday, June 11, 2017 at 5:54 PM
To: Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: At what point do watermarks get injected into the stream?

Each parallel instance of a TimestampAssigner independently assigns timestamps.
After a shuffle, operators forward the minimum watermark across all input 
connections. For details have a look at the watermarks documentation [1].
Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#watermarks-in-parallel-streams
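
As a rough illustration of the answer above, the following Python sketch
replays the numbers from the question (one source task emitting watermarks
2 and 5, the other 3 and 10) through every arrival order that a downstream
task could observe. It assumes that each downstream task receives the
watermarks of both source tasks and that order is preserved per input
connection. The helper names downstream_watermarks and interleavings are
made up for this sketch and are not part of Flink.

```python
import math


def downstream_watermarks(arrivals, num_inputs):
    """arrivals: (input_index, watermark) pairs in arrival order.
    Returns the watermarks a downstream task would emit."""
    latest = [-math.inf] * num_inputs
    emitted = []
    for idx, wm in arrivals:
        latest[idx] = max(latest[idx], wm)
        current = min(latest)  # minimum across all input connections
        if current > -math.inf and (not emitted or current > emitted[-1]):
            emitted.append(current)
    return emitted


def interleavings(a, b):
    """All merges of a and b that keep each list's own order."""
    if not a or not b:
        return [list(a) + list(b)]
    return ([[a[0]] + rest for rest in interleavings(a[1:], b)]
            + [[b[0]] + rest for rest in interleavings(a, b[1:])])


source_a = [(0, 2), (0, 5)]   # watermarks from the first source task
source_b = [(1, 3), (1, 10)]  # watermarks from the second source task
results = [downstream_watermarks(order, 2)
           for order in interleavings(source_a, source_b)]
for emitted in results:
    print(emitted)
```

Whatever the arrival order, the emitted watermarks are increasing and end
at 5, the minimum of the two sources' latest watermarks.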





Re: At what point do watermarks get injected into the stream?

2017-06-11 Thread Ray Ruvinskiy
Thanks for the explanation, Fabian.

Suppose I have a parallel source that does not inject watermarks, and the first 
operation on the DataStream is assignTimestampsAndWatermarks. Does each 
parallel task that makes up the source independently inject watermarks for the 
records that it has read? Suppose I then call keyBy and a shuffle ensues. Will 
the resulting partitions after the shuffle have interleaved watermarks from the 
various source tasks?

More concretely, suppose a source has a degree of parallelism of two. One of
the source tasks injects the watermarks 2 and 5, while the other injects 3 and 
10. There is then a shuffle, creating two different partitions. Will all the 
watermarks be broadcast to all the partitions? Or is it possible for, say, one 
partition to end up with watermarks 2 and 10 and another with 3 and 5? And 
after the shuffle, how do we ensure that the watermarks are processed in order 
by the operators receiving them?

Thanks,

Ray

From: Fabian Hueske <fhue...@gmail.com>
Date: Saturday, June 10, 2017 at 3:56 PM
To: Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: At what point do watermarks get injected into the stream?

Hi Ray,
in principle, watermarks can be injected anywhere in a stream by calling 
DataStream.assignTimestampsAndWatermarks().
However, timestamps are usually injected as soon as possible after a stream is
ingested (before the first shuffle). The reason is that watermarks depend on 
the order of events (and their timestamps) in the stream. While Flink 
guarantees the order of events within a partition, a shuffle interleaves events 
of different partitions in an unpredictable way such that it is not possible to 
reason about the order of timestamps afterwards.
The most common way to inject watermarks is directly inside of a SourceFunction 
or with a TimestampAssigner before the first shuffle.
Best, Fabian
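
The ordering argument above can be sketched in a few lines of Python. This
is a deliberately simplified model, not Flink's implementation: the
watermark just follows the highest timestamp seen so far, an event counts
as late when its timestamp is below that watermark, and the timestamps and
the function name late_events are invented for the example.

```python
def late_events(timestamps):
    """Count events whose timestamp is below the current watermark, where the
    watermark simply follows the highest timestamp seen so far."""
    watermark = float("-inf")
    late = 0
    for ts in timestamps:
        if ts < watermark:
            late += 1
        watermark = max(watermark, ts)
    return late


partition_a = [1, 2, 3]      # in order within its partition
partition_b = [10, 11, 12]   # in order within its partition
shuffled = [10, 1, 11, 2, 12, 3]  # one possible interleaving after a shuffle

# Watermarks generated per partition, before the shuffle.
late_before = late_events(partition_a) + late_events(partition_b)
# Watermarks generated on the interleaved stream, after the shuffle.
late_after = late_events(shuffled)
print(late_before, late_after)
```

Generating watermarks per partition sees each stream in order, so nothing
is late. The same generator applied after the shuffle treats half of the
events as late, purely because of how the partitions happened to interleave.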

2017-06-09 0:46 GMT+02:00 Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>:
I’m trying to build a mental model of how watermarks get injected into the 
stream. Suppose I have a stream with a parallel source, and I’m running a 
cluster with multiple task managers. Does each parallel source reader inject 
watermarks, which are then forwarded to downstream consumers and shuffled 
between task managers? Or are watermarks created after the shuffle, when the 
stream records reach their destined task manager and right before they’re 
processed by the operator?

Thanks,

Ray