[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196689#comment-17196689
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

Thanks for caring and looking at things! It's good to question things because 
there might be real errors.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196688#comment-17196688
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

Yes, that is correct.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196280#comment-17196280
 ] 

tinny cat commented on FLINK-19167:
---

thank you [~aljoscha] [~dwysakowicz] [~twalthr],  I tried the data of multiple 
keys, and the example works correctly! 

So different watermarks share the same timer。

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196180#comment-17196180
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

That's correct, I think. But in a real-world scenario that would likely happen.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196123#comment-17196123
 ] 

Dawid Wysakowicz commented on FLINK-19167:
--

I think the problem is that it is not just simplified, but that it imposes 
assumptions on how the watermarks are generated (which I think are actually 
incorrect). I think what [~tinny] is describing that the condition is never met 
might be actually true. 

Let's assume a perfect watermark. To be precise a watermark where we generate a 
Watermark with value of the timestamp of each incoming record. If I am not 
mistaken the contract is that the element will be emitted first and the 
Watermark second. In that scenario when we have records incoming with timestamp 
t = 1, 2, 61000. We should emit a result with count 2 at time t = 60002. 
However the record which would move the watermark past the 60002, will update 
the lastModified to 61000.

The example "works" only if the watermark progresses because of events from 
other keys. E.g. if you have values with just a single key, it won't work.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196112#comment-17196112
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

The example is in fact simplified and doesn't support multiple "windows", yes.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196029#comment-17196029
 ] 

Dawid Wysakowicz commented on FLINK-19167:
--

And the behaviour with that incoming data is correct. Both 7 and 60007 belong 
to the same "window". You want to emit results only if the gap between events 
is more than 6.

Now I do see a problem with that example though. It does not account for 
out-of-order elements in the {{processElement}} method or in other words it 
does not consider that there might be multiple open windows at a time. 
Nevertheless I still don't think your approach with using the 
{{currentWatermark}} is valid.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196015#comment-17196015
 ] 

tinny cat commented on FLINK-19167:
---

The following is my test output code:
{code:java}
@Override
  public void onTimer(long timestamp, OnTimerContext ctx, 
Collector> out) throws Exception {
Tuple1 key = (Tuple1) ctx.getCurrentKey();
CountWithTimestamp result = state.value();
System.out.println("timestamp: " + timestamp + ", " + "lastModified: " + 
result.lastModified);
if (timestamp == result.lastModified + 6) {
  out.collect(new Tuple2<>(key.f0, result.count));
}
  }
{code}
Here are the three pieces of data I sent:
 7, 8, 60007
 Then the output of flink is as follows:
{code:java}
timestamp: 60007, lastModified: 60007
{code}
[~dwysakowicz]


> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-15 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196002#comment-17196002
 ] 

Dawid Wysakowicz commented on FLINK-19167:
--

Why do you say the {{result.lastModified}} is equal to 60007? It should be 
{{7}}. 
The value of {{ctx.timestamp()}} in {{ProcessFunction#processElement}} is equal 
to the timestamp of the incoming record. Thus it is {{7}} in that case.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-14 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195831#comment-17195831
 ] 

tinny cat commented on FLINK-19167:
---

the timer method did fired, but this if statement will not hold:

{code:java}
if (timestamp == result.lastModified + 6) {
// emit the state on timeout
  out.collect(new Tuple2(result.key, result.count));
 }
{code}
for example : 
There are 5 events incoming with timestamps t = 1, 2, 4, 5, 7.  We want to emit 
results when the event time reaches 60007. Therefore we need to register a 
timer for 7 + 6, which is ctx.timestamp() + 6. when timestamp increases 
to 60007, the timer method will be fired, now the timestamp is 60007,and the 
value of result.lastModified is also 60007, therefore,result.lastModified + 
6000 is greater than timestamp。so `out.collect(new Tuple2(result.key, result.count));` will never happen.


> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-14 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195308#comment-17195308
 ] 

Timo Walther commented on FLINK-19167:
--

[~tinny] the timestamp of {{onTimer}} will be the timestamp with which you 
registered the timer. It will not be the watermark. So I also think that the 
example is correct. If {{current.lastModified + 6}} is already covered by 
the current watermark when calling {{registerEventTimeTimer}}, the timer method 
would fire immediately.

As Aljoscha said before, usually people don't need to think about watermarks in 
ProcessFunction because this is rather an internal concept. Users can simply 
work with event-time timestamps.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-14 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195306#comment-17195306
 ] 

Dawid Wysakowicz commented on FLINK-19167:
--

I second [~aljoscha] I think the example is correct.
What do you return in your {{AssignerWithPeriodicWatermarks#extractTimestamp}} 
? Do you return {{timestamp}} or {{currentMaxTimestamp}}? The correct behaviour 
would be to return the {{timestamp}}.

The code in the {{onTime}} should trigger. Consider this example. There are 5 
events incoming with timestamps t = 1, 2, 4, 8, 7. We want to emit results when 
the event time reaches 60007. Therefore we need to register a timer for 7 + 
6, which is {{ctx.timestamp() + 6}}. Let us know if it is still unclear.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-13 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195154#comment-17195154
 ] 

tinny cat commented on FLINK-19167:
---

The watermark did increase, and the timer did trigger. I mean there is a 
problem with the code logic in the onTimer() method ,this equation will never 
hold.
{code:java}
if (timestamp == result.lastModified + 6) {
// emit the state on timeout
  out.collect(new Tuple2(result.key, result.count));
 }
{code}
the reason is the code is incorrect,
{code:java}
current.lastModified = ctx.timestamp();
{code}
it should be
{code:java}
current.lastModified = ctx.timerService().currentWatermark();
{code}

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-10 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193513#comment-17193513
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

This depends on what the rest of the system is doing. There must be more input 
records to make the watermark advance. And then at some point the timer that 
was set for an earlier record will fire.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-09 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193306#comment-17193306
 ] 

tinny cat commented on FLINK-19167:
---

the watermark is:

{code:java}
stream
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() 
{
private long currentMaxTimestamp = 0L;
private long maxOutOfOrderness = 1L;
private Watermark watermark = null;
@Override
public long extractTimestamp(UserAction element, long 
previousElementTimestamp) {
long timestamp = element.viewTime;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp - 
maxOutOfOrderness);
return watermark;
}
})
{code}


> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-09 Thread tinny cat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193305#comment-17193305
 ] 

tinny cat commented on FLINK-19167:
---

however, I set the `TimeCharacteristic` was event time, and assign the 
watermark as event time。
The conclusion I got is:
If ctx.timestamp() is assigned to `current.lastModified`,  This code as follow 
will be never execute:

{code:java}
if (timestamp == result.lastModified + 6) {
// emit the state on timeout
out.collect(new Tuple2(result.key, result.count));
 }
{code}
because, `timestamp` always equals result.lastModified 


> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19167) Proccess Function Example could not work

2020-09-09 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192776#comment-17192776
 ] 

Aljoscha Krettek commented on FLINK-19167:
--

Hi, I believe the example is correct as it is. The watermark is not necessarily 
very similar to the timestamp of a record. For example, if this is the first 
record we're seeing the watermark could be {{Long.MIN_VALUE}}. Or the watermark 
could be held back by something else.

In general, I think accessing the watermark in an operator is wrong most of the 
timer and users should mostly work with the event timestamps.

> Proccess Function Example could not work
> 
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: tinny cat
>Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> however, it should be: 
> {code:java}
> @Override
> public void processElement(
> Tuple2 value, 
> Context ctx, 
> Collector> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time 
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 6);
> }
> @Override
> public void onTimer(
> long timestamp, 
> OnTimerContext ctx, 
> Collector> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 6` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)