Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-12 Thread Sameer W
Thanks Max -

I will advance the watermarks when no events arrive for a while. But when
using Kafka, is it good practice to assign events to partitions randomly
instead of by, say, device ID or the ID of the region where the devices are
located? What I noticed is that if the devices sending to one of the
partitions stop sending information, the pipeline freezes completely unless
I manually keep moving the watermark forward.

But the problem with sending events to random partitions is that when the
devices come back online, the events they send are now treated as late
events, and the windows fire one element at a time.
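The late-event effect can be illustrated with a minimal Java model. `LateElementCheck` is a hypothetical class, not Flink code; it assumes tumbling event-time windows with zero offset, and its comparison mirrors the check in Flink's `EventTimeTrigger`, which fires immediately when an element's window already ends at or before the current watermark. Whether such elements are kept at all also depends on the window's allowed lateness, which this sketch ignores.

```java
/**
 * Simplified model of event-time tumbling windows (illustrative, not Flink code).
 * A window [start, start + size) has max timestamp start + size - 1.
 */
class LateElementCheck {
    /** Start of the tumbling window containing the timestamp (zero offset assumed). */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - Math.floorMod(timestamp, windowSizeMs);
    }

    /** Largest timestamp that still belongs to the element's window. */
    static long windowMaxTimestamp(long timestamp, long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs - 1;
    }

    /**
     * True if the watermark has already passed the element's window. An
     * event-time trigger then fires right away for this element instead of
     * registering a timer and waiting for the rest of the window.
     */
    static boolean firesImmediately(long timestamp, long windowSizeMs, long currentWatermark) {
        return windowMaxTimestamp(timestamp, windowSizeMs) <= currentWatermark;
    }
}
```

For example, with one-minute windows and the watermark already at 120000 ms, an event stamped 30000 ms belongs to the window whose max timestamp is 59999 ms, so each such event triggers its own firing as it arrives.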

Thanks,
Sameer


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-12 Thread Maximilian Michels
Hi Sameer,

If you use Event Time, you should make sure to assign Watermarks and
Timestamps at the source. As you have already observed, Flink may get stuck
otherwise, because it waits for Watermarks to progress in time.

There is no timeout for windows. However, you can implement that logic
in your Watermark generation function.

You're already using
DataStream#assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks assigner).

Your assigner has a `getCurrentWatermark()` method, which is called every
ExecutionConfig#getAutoWatermarkInterval() milliseconds. You can set this
interval via ExecutionConfig#setAutoWatermarkInterval(long milliseconds).

In your assigner, simply create a field that keeps track of the last time
you emitted a Watermark. If you haven't emitted a new Watermark for some
time, treat that as a timeout and emit a Watermark anyway.
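Max's suggestion can be sketched in plain Java under a few assumptions. `IdleTimeoutWatermarkAssigner` is a hypothetical class that imitates the two methods of `AssignerWithPeriodicWatermarks` without depending on Flink: `extractTimestamp` takes the event timestamp directly, `getCurrentWatermark()` returns a `long` instead of a `Watermark`, and processing time comes from an injected `LongSupplier` so the logic can be tested. In a real job this logic would sit in the assigner passed to `DataStream#assignTimestampsAndWatermarks`, with `System.currentTimeMillis()` as the clock.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical periodic watermark assigner with an idle timeout. Illustrative
 * only: it mirrors the shape of Flink's AssignerWithPeriodicWatermarks.
 */
class IdleTimeoutWatermarkAssigner {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // processing time in ms (injected for testing)

    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;
    private long lastProgressTime; // processing time at which the watermark last advanced

    IdleTimeoutWatermarkAssigner(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastProgressTime = clock.getAsLong();
    }

    /** Called per event: remembers the highest event timestamp seen so far. */
    long extractTimestamp(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return eventTimestamp;
    }

    /** Called periodically (every auto-watermark interval in Flink). */
    long getCurrentWatermark() {
        long now = clock.getAsLong();
        // Normal path: bounded out-of-orderness over the events seen so far.
        long candidate = (maxTimestampSeen == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            lastProgressTime = now;
        } else if (now - lastProgressTime >= idleTimeoutMs) {
            // Timeout path: no progress for idleTimeoutMs, so fall back to wall-clock time.
            // ASSUMPTION: event timestamps are epoch millis that roughly track processing time.
            long wallClockWatermark = now - maxOutOfOrdernessMs;
            if (wallClockWatermark > lastWatermark) {
                lastWatermark = wallClockWatermark;
                lastProgressTime = now;
            }
        }
        return lastWatermark; // never decreases
    }
}
```

The wall-clock fallback also lets a source instance that has never seen an event move past `Long.MIN_VALUE`. It assumes event timestamps roughly follow processing time, so once it takes over, events that arrive later with older timestamps are behind the watermark and count as late, which is the trade-off Sameer raises in his reply.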

Cheers,
Max


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
Sorry for replying to my own messages, but this is super confusing and
logical at the same time to me :-).

Say I have a Kafka topic with 10 partitions. If I partition by device ID
when I write to the topic and use Event Time, my pipeline freezes (if fewer
than 10 devices are active initially). This is because inactive partitions
(only a few devices are active at a time) do not send watermarks, and my
pipeline waits forever for those partitions to send in their watermarks,
even though the keyBy is on the device ID, whose records all come from a
single partition.

When I send records to Kafka randomly (to any partition), the pipeline works
fine, as all partitions (and the sources connected to them) are sending
watermarks.
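A toy count makes the difference between the two placements visible. `PartitionSpread` is a hypothetical class; keyed placement here uses `Math.floorMod(deviceId.hashCode(), numPartitions)` as a stand-in for Kafka's default partitioner (which hashes the serialized key with murmur2), and round-robin stands in for random placement. With three active devices and ten partitions, keyed placement feeds at most three partitions, so at least seven partitions see no data and their watermarks do not advance, while spreading events evenly reaches all ten.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy comparison of two ways to spread device events over partitions.
 * ASSUMPTION: keyed placement uses String.hashCode() for illustration only.
 */
class PartitionSpread {
    /** Partitions that receive data when each device is pinned to one partition. */
    static Set<Integer> keyedPartitions(List<String> activeDevices, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String device : activeDevices) {
            // floorMod keeps the index in [0, numPartitions) even for negative hash codes.
            used.add(Math.floorMod(device.hashCode(), numPartitions));
        }
        return used;
    }

    /** Partitions that receive data when events are spread round-robin. */
    static Set<Integer> roundRobinPartitions(int numEvents, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < numEvents; i++) {
            used.add(i % numPartitions);
        }
        return used;
    }
}
```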

This gets even more confusing if I assign timestamps and watermarks
downstream, after a keyBy operation that is followed by another keyBy which
does not receive events for a given key from all the upstream operators.
Again nothing fires, because Flink expects the other map operators (to which
the watermark assignment is piped) to send in their watermarks as well.

My conclusion: only produce watermarks at the source function. Is this
valid, or am I missing something? Only when I do that (and allocate events
to Kafka partitions randomly) does the whole pipeline work reliably.

*Is there a way to set a timeout? If watermarks from the source functions
are not received within a certain time interval, fire the time windows.*

Thanks,
Sameer


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
And this is happening in my local environment. As soon as I set the
parallelism to 1, it all works fine.

Sameer


Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
Hi,

I am noticing this behavior with Event Time processing:

I have a Kafka topic with 10 partitions. Each Event Source sends data to
any one of the partitions. Say I have only 1 event source active at this
moment, which means only one partition is receiving data.

None of my windows fire now, because the other 9 partitions (source function
instances) are not sending any watermarks and Flink waits forever.
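The stall can be reproduced with a small Java model, assuming the rule that an operator's event-time clock is the minimum of the latest watermarks received on all of its input channels. `MinWatermarkTracker` is a hypothetical class, not Flink's internal implementation.

```java
import java.util.Arrays;

/**
 * Simplified model of how a parallel operator combines watermarks: it keeps
 * the latest watermark per input channel and advances only to their minimum.
 */
class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has never sent a watermark counts as "no progress".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Watermarks are monotonic per channel, so ignore regressions.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return currentWatermark();
    }

    /** The operator's event-time clock: the minimum over all input channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With ten channels, `onWatermark(0, 60_000L)` returns `Long.MIN_VALUE`: the combined watermark only reaches 60000 once the other nine channels also report, which is why a single idle partition blocks every event-time window downstream.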

Next, I switch to a topic with 1 partition but leave the default parallelism
intact. Only one Mapper instance contributes to the subsequent keyBy
operation, while the other 7 (assuming a default parallelism of 8) are idle.
I assign watermarks after the map function. Again I see the same behavior,
because the 7 other mappers are not sending watermarks.

How do I handle this? With this partitioning strategy, not all of my
partitions are going to receive data at all times. Or do I have to use
random partitioning, which also works?

Thanks,
Sameer