Re: WaterMark & Eventwindow not fired correctly

2017-08-09 Thread Aljoscha Krettek
Hi,

So when the parallelism of the timestamp assigner is different from the 
parallelism of the map(KeyMapFunc()) or the window, then it works? But when the 
parallelism is the same, it does not work?

If this is true, then I would assume that some parallel instances of the 
timestamp assigner don't get any events and therefore don't advance their 
watermark. This, in turn, would mean that the downstream watermark doesn't 
advance either, because an operator's watermark is the minimum of the 
watermarks of all its input channels. Could you check in the web interface 
whether all parallel instances of the assigner are processing elements when 
you use the same parallelism for all operations?
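The reasoning above can be illustrated with a toy model. This is not Flink code: the class MinWatermarkDemo and its methods are hypothetical, and the sketch only assumes the rule that an operator with several input channels advances its event-time clock to the minimum of the latest watermarks seen on those channels.

```java
import java.util.Arrays;

// Toy model (hypothetical, not Flink code) of how a downstream operator
// combines watermarks from several upstream parallel instances: it can only
// advance to the minimum of the latest watermark seen on each input channel.
public class MinWatermarkDemo {

    // "No watermark yet" value for an input channel.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] channelWatermarks;
    private long combined = NO_WATERMARK;

    MinWatermarkDemo(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, NO_WATERMARK);
    }

    // Records a watermark from one channel and returns the combined watermark.
    long onWatermark(int channel, long watermark) {
        // Watermarks are monotonic per channel, so ignore regressions.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The combined watermark never moves backwards either.
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkDemo op = new MinWatermarkDemo(3);
        op.onWatermark(0, 10_000L);
        op.onWatermark(1, 12_000L);
        // Channel 2 never receives events, so its watermark never advances.
        System.out.println(op.onWatermark(0, 20_000L)); // -9223372036854775808
        // As soon as channel 2 reports, the combined watermark moves.
        System.out.println(op.onWatermark(2, 5_000L));  // 5000
    }
}
```

A single idle instance is enough to pin the combined watermark, no matter how far the other instances have progressed.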

Best,
Aljoscha

> On 9. Aug 2017, at 11:33, aitozi <gjying1...@gmail.com> wrote:
> 
> Hi, below is my code:
> 
> splitStream.select(duringTime + "")
>     .map(new KeyMapFunc())
>     .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
>     .setParallelism(300)
>     .keyBy(_SQL, _KEY, _SALT)
>     .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
>     .apply(new WindowSaltFunc())
>     .keyBy(_SQL, _KEY)
>     .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
>     .apply(new WindowFunc())
>     .addSink(new FlinkKafkaProducer010<>("topic",
>         new SimpleSerializationSchema(), this.properties));
> 
> and 
> 
> public class DelaySaltWatermarks implements
>         AssignerWithPeriodicWatermarks<ContentMessage> {
> 
>     private long currentMaxTimestamp;
> 
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
>     }
> 
>     @Override
>     public long extractTimestamp(ContentMessage contentMessage, long l) {
>         long timestamp = contentMessage.getTimestamp();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
> }
> 
> And when I change the parallelism of assignTimestampsAndWatermarks (300
> above), the window fires.
> 
> thanks,
> aitozi



Re: WaterMark & Eventwindow not fired correctly

2017-08-09 Thread aitozi
Hi, below is my code:

splitStream.select(duringTime + "")
    .map(new KeyMapFunc())
    .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
    .setParallelism(300) // applies to the timestamp/watermark assigner only
    .keyBy(_SQL, _KEY, _SALT)
    .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
    .apply(new WindowSaltFunc())
    .keyBy(_SQL, _KEY)
    .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
    .apply(new WindowFunc())
    .addSink(new FlinkKafkaProducer010<>("topic",
        new SimpleSerializationSchema(), this.properties));

and 

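import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DelaySaltWatermarks implements
        AssignerWithPeriodicWatermarks<ContentMessage> {

    // Highest event timestamp seen so far by this parallel instance.
    private long currentMaxTimestamp;

    // Called periodically; the watermark trails the maximum timestamp by
    // MAX_OUT_OF_ORDER and only moves when extractTimestamp() sees new data.
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
    }

    // Called once per element that reaches this instance.
    @Override
    public long extractTimestamp(ContentMessage contentMessage, long l) {
        long timestamp = contentMessage.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}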
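A plain-Java model of the DelaySaltWatermarks logic, with no Flink dependency, may help show why an instance that receives no elements holds things back. DelayWatermarkModel is a hypothetical name, and MAX_OUT_OF_ORDER = 5000 ms is an assumed value, since the real one is not shown in the thread.

```java
// Plain-Java model of the DelaySaltWatermarks logic (hypothetical class,
// no Flink dependency). MAX_OUT_OF_ORDER is an assumed value.
public class DelayWatermarkModel {

    static final long MAX_OUT_OF_ORDER = 5_000L; // assumption, not from the thread

    // Starts at 0, as in the original class.
    private long currentMaxTimestamp;

    // Mirrors extractTimestamp(): only runs when an element arrives.
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(elementTimestamp, currentMaxTimestamp);
        return elementTimestamp;
    }

    // Mirrors getCurrentWatermark(): polled periodically, even with no input.
    long currentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        DelayWatermarkModel busy = new DelayWatermarkModel();
        DelayWatermarkModel idle = new DelayWatermarkModel();
        busy.extractTimestamp(1_502_270_000_000L);
        busy.extractTimestamp(1_502_269_990_000L); // late element, max unchanged
        System.out.println(busy.currentWatermark()); // 1502269995000
        System.out.println(idle.currentWatermark()); // -5000, and it stays there
    }
}
```

Combined with the minimum rule from Aljoscha's reply, one idle instance emitting -5000 forever would keep every downstream event-time window from firing.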

And when I change the parallelism of assignTimestampsAndWatermarks (300
above), the window fires.
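One possible reading of this, consistent with Aljoscha's reply: when two connected operators have the same parallelism and no partitioning is specified, Flink by default forwards records one-to-one, so each assigner instance only sees what its own upstream instance produced; when the parallelism differs, Flink by default redistributes records round-robin (rebalance). The toy model below uses hypothetical names, not Flink's partitioner classes, and simplifies rebalancing by starting every upstream instance at downstream instance 0.

```java
import java.util.Arrays;

// Toy model (hypothetical, not Flink code): records received by each
// downstream instance under one-to-one forwarding versus round-robin
// rebalancing, given an upstream where some instances produce nothing.
public class PartitioningDemo {

    // Forward: downstream instance i only sees upstream instance i's records.
    static int[] forward(int[] upstreamCounts) {
        return upstreamCounts.clone();
    }

    // Rebalance: each upstream instance deals its records round-robin over
    // all downstream instances (simplified: always starting at instance 0).
    static int[] rebalance(int[] upstreamCounts, int downstreamParallelism) {
        int[] received = new int[downstreamParallelism];
        for (int count : upstreamCounts) {
            for (int r = 0; r < count; r++) {
                received[r % downstreamParallelism]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] upstream = {4, 0, 3, 0}; // instances 1 and 3 are idle
        System.out.println(Arrays.toString(forward(upstream)));      // [4, 0, 3, 0]
        System.out.println(Arrays.toString(rebalance(upstream, 3))); // [3, 2, 2]
    }
}
```

Under forwarding, the idle upstream instances produce idle assigner instances whose watermarks never advance; under rebalancing, every assigner instance gets data.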

thanks,
aitozi


Aljoscha Krettek wrote
> Hi,
> 
> So I understood that you have roughly this pipeline:
> 
> Input 1 --\
>            |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window
> Input 2 --/
> 
> If the timestamp assigner comes after the CoFlatMap, the processElement()
> method of the extractor should still be called, not by the
> StreamInputProcessor but by ChainingOutput [1], which connects the
> two-input CoFlatMap to the one-input operator that comes after it. There
> could still be a bug in there somewhere, however.
> 
> Could you maybe send me the relevant parts of your code so that I can
> have a look, or provide a minimal example?
> 
> Best,
> Aljoscha
> 
> [1]
> https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394

--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14753.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: WaterMark & Eventwindow not fired correctly

2017-08-08 Thread Aljoscha Krettek
Hi,

So I understood that you have roughly this pipeline:

Input 1 --\
           |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window
Input 2 --/

If the timestamp assigner comes after the CoFlatMap, the processElement() method 
of the extractor should still be called, not by the StreamInputProcessor but by 
ChainingOutput [1], which connects the two-input CoFlatMap to the one-input 
operator that comes after it. There could still be a bug in there somewhere, 
however.
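The chaining path described above can be sketched with a toy model. The classes are hypothetical stand-ins, not Flink's operator or Output interfaces; the only point illustrated is that a chained output calls the next operator's processElement() directly, so records from either input of a two-input operator still reach the assigner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of operator chaining (hypothetical classes, not Flink's API):
// a two-input operator hands each output record straight to the next
// operator's processElement(), the role ChainingOutput plays in Flink.
public class ChainingDemo {

    // Stand-in for a timestamp/watermark assigner operator.
    static class AssignerOperator {
        long maxTimestamp = Long.MIN_VALUE;
        final List<Long> extracted = new ArrayList<>();

        void processElement(long timestamp) {
            extracted.add(timestamp); // where extractTimestamp() would run
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    // Stand-in for a CoFlatMap: two inputs, one output.
    static class TwoInputOperator {
        private final Consumer<Long> output;

        TwoInputOperator(Consumer<Long> output) {
            this.output = output;
        }

        void processElement1(long timestamp) { output.accept(timestamp); }

        void processElement2(long timestamp) { output.accept(timestamp); }
    }

    public static void main(String[] args) {
        AssignerOperator assigner = new AssignerOperator();
        // The "chaining output" is just a direct call into the next operator.
        TwoInputOperator coFlatMap = new TwoInputOperator(assigner::processElement);
        coFlatMap.processElement1(1_000L);
        coFlatMap.processElement2(3_000L);
        System.out.println(assigner.extracted);    // [1000, 3000]
        System.out.println(assigner.maxTimestamp); // 3000
    }
}
```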

Could you maybe send me the relevant parts of your code so that I can have a 
look, or provide a minimal example?

Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394

> On 7. Aug 2017, at 19:21, aitozi <gjying1...@gmail.com> wrote:
> 
> 
> Hi,
> 
> My Flink version is 1.2.
> 
> I have been working on this problem for the past few days. Below is what I
> found.
> 
> When I use assignTimestampsAndWatermarks with the same parallelism (240) as
> the preceding operator, which has two inputs (it is a "connected"
> CoFlatMap operator with parallelism 240), the watermark did not advance.
> 
> Then I looked into the source code: StreamTwoInputProcessor#processInput,
> called by TwoInputStreamTask, calls processElement1() and processElement2(),
> but neither of them goes through processElement() in StreamInputProcessor,
> which is what leads to extractTimestamp() (as in
> TimestampsAndPeriodicWatermarksOperator).
> 
> So the timestamp is not updated, and since my watermark is computed from it
> just like in BoundedOutOfOrdernessTimestampExtractor, the watermark does not
> advance either.
> 
> So, is it a bug that the timestamp is not updated when dealing with a
> two-input stream?
> 
> P.S. My English is not very good; I hope you can understand me :)
> 
> thanks,
> aitozi



Re: WaterMark & Eventwindow not fired correctly

2017-08-07 Thread aitozi

Hi,

My Flink version is 1.2.

I have been working on this problem for the past few days. Below is what I
found.

When I use assignTimestampsAndWatermarks with the same parallelism (240) as
the preceding operator, which has two inputs (it is a "connected"
CoFlatMap operator with parallelism 240), the watermark did not advance.

Then I looked into the source code: StreamTwoInputProcessor#processInput,
called by TwoInputStreamTask, calls processElement1() and processElement2(),
but neither of them goes through processElement() in StreamInputProcessor,
which is what leads to extractTimestamp() (as in
TimestampsAndPeriodicWatermarksOperator).

So the timestamp is not updated, and since my watermark is computed from it
just like in BoundedOutOfOrdernessTimestampExtractor, the watermark does not
advance either.

So, is it a bug that the timestamp is not updated when dealing with a
two-input stream?

P.S. My English is not very good; I hope you can understand me :)

thanks,
aitozi



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14727.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: WaterMark & Eventwindow not fired correctly

2017-08-04 Thread Aljoscha Krettek
Hi,

Could you please provide a snippet of code or a minimal example that would 
help us reproduce your problem?

Best,
Aljoscha

> On 3. Aug 2017, at 17:41, aitozi <gjying1...@gmail.com> wrote:
> 
> 
> Hi,
> 
> I have encountered a problem. I generate and assign watermarks on a
> DataStream, then apply keyBy, an event-time window, and a window function.
> 
> In the log, I can see that the watermarks and the event times of the
> messages are correct, and I think the following conditions should trigger
> the window function:
> 
> 1. watermark time >= window_end_time
> 2. there is data in [window_start_time, window_end_time)
> 
> I checked the log, and both conditions are satisfied. When I apply
> trigger(CountTrigger.of(5)), I can see in the log that the window apply
> function is invoked.
> 
> So I wonder why the window apply function cannot be triggered by event time
> and watermarks alone.
> 
> thanks,
> aitozi



WaterMark & Eventwindow not fired correctly

2017-08-03 Thread aitozi

Hi,

I have encountered a problem. I generate and assign watermarks on a
DataStream, then apply keyBy, an event-time window, and a window function.

In the log, I can see that the watermarks and the event times of the
messages are correct, and I think the following conditions should trigger
the window function:

1. watermark time >= window_end_time
2. there is data in [window_start_time, window_end_time)
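The window arithmetic behind these conditions can be sketched as follows, assuming zero window offset and non-negative timestamps. This is not Flink code and the class name is hypothetical; it models the behaviour of TumblingEventTimeWindows and EventTimeTrigger, which, as far as I know, fire a window [start, end) once the watermark reaches end - 1 (TimeWindow.maxTimestamp()), one millisecond earlier than condition 1 above. The watermark that counts is the window operator's own, i.e. the minimum over all of its inputs.

```java
// Sketch of event-time tumbling-window arithmetic (hypothetical class,
// not Flink code), assuming zero offset and non-negative timestamps.
public class TumblingWindowDemo {

    // Start of the tumbling window of length sizeMs that contains ts.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // A window [start, end) fires once the watermark reaches its last
    // timestamp, end - 1.
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L;                // 10-second windows
        long ts = 1_502_270_123_456L;
        long start = windowStart(ts, size); // 1502270120000
        long end = start + size;            // 1502270130000
        System.out.println(start + " " + end);
        System.out.println(shouldFire(end, end - 2)); // false
        System.out.println(shouldFire(end, end - 1)); // true
    }
}
```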

I checked the log, and both conditions are satisfied. When I apply
trigger(CountTrigger.of(5)), I can see in the log that the window apply
function is invoked.
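A likely explanation for why the CountTrigger version fires: trigger() replaces the window's default event-time trigger, and CountTrigger.of(5) fires every five elements per key and window without looking at watermarks at all, so it says nothing about whether the watermark is advancing. The toy model below (hypothetical class, not Flink code) mirrors that counting logic.

```java
// Toy model of count-based firing (hypothetical, not Flink code): fire every
// maxCount elements and reset the count, ignoring watermarks entirely.
public class CountTriggerDemo {

    private final long maxCount;
    private long count;

    CountTriggerDemo(long maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true when the window function would be invoked.
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerDemo trigger = new CountTriggerDemo(5);
        int fires = 0;
        for (int i = 0; i < 12; i++) {
            if (trigger.onElement()) {
                fires++;
            }
        }
        System.out.println(fires); // 2 (after elements 5 and 10)
    }
}
```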

So I wonder why the window apply function cannot be triggered by event time
and watermarks alone.

thanks,
aitozi



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.