[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196689#comment-17196689 ] Aljoscha Krettek commented on FLINK-19167:

Thanks for caring and looking at things! It's good to question things because there might be real errors.

> Proccess Function Example could not work
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: tinny cat
> Priority: Major
>
> Section "*Process Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> currently reads:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     current.lastModified = ctx.timestamp();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     // this never happens
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> However, it should be:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     // it should be the previous watermark
>     current.lastModified = ctx.timerService().currentWatermark();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be `current.lastModified = ctx.timerService().currentWatermark();`; otherwise, `timestamp == result.lastModified + 60000` will never be true.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196688#comment-17196688 ] Aljoscha Krettek commented on FLINK-19167:

Yes, that is correct.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196280#comment-17196280 ] tinny cat commented on FLINK-19167:

Thank you [~aljoscha] [~dwysakowicz] [~twalthr]. I tried data with multiple keys, and the example works correctly! So different keys share the same watermark, and therefore the same timer progress.
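To make the multi-key observation concrete, here is a minimal, Flink-free Java sketch; the class and method names are hypothetical and are not Flink API. It assumes one operator instance whose watermark equals the largest record timestamp seen so far and arrives right after each record, and it copies the timeout check from the documentation example. Records for key `a` at 1 and 2, followed by a record for key `b` at 60002, push the shared watermark to 60002, which fires key `a`'s timer while its `lastModified` is still 2.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical, Flink-free model of the docs example with several keys sharing one watermark.
final class MultiKeyTimeoutSimulation {

    static final long TIMEOUT = 60000L;

    // Per-key state, mirroring the docs' CountWithTimestamp.
    static final class CountWithTimestamp {
        long count;
        long lastModified;
    }

    // A registered event-time timer for one key.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Feeds (keys[i], timestamps[i]) in order; returns emissions as "key=count".
    static List<String> run(String[] keys, long[] timestamps) {
        Map<String, CountWithTimestamp> state = new HashMap<>();
        PriorityQueue<Timer> timers = new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        Set<String> registered = new HashSet<>(); // at most one timer per key and timestamp
        List<String> emitted = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < keys.length; i++) {
            // processElement: count++, lastModified = record timestamp, register timer
            CountWithTimestamp current = state.computeIfAbsent(keys[i], k -> new CountWithTimestamp());
            current.count++;
            current.lastModified = timestamps[i];
            long timerTs = timestamps[i] + TIMEOUT;
            if (registered.add(keys[i] + "@" + timerTs)) {
                timers.add(new Timer(keys[i], timerTs));
            }

            // assumed watermark: the largest timestamp seen so far, arriving after the record
            watermark = Math.max(watermark, timestamps[i]);
            while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
                Timer timer = timers.poll();
                registered.remove(timer.key + "@" + timer.timestamp);
                // onTimer: emit only if this is the latest timer of that key
                CountWithTimestamp result = state.get(timer.key);
                if (timer.timestamp == result.lastModified + TIMEOUT) {
                    emitted.add(timer.key + "=" + result.count);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // key a at 1 and 2, then key b at 60002 pushes the shared watermark to 60002
        System.out.println(run(new String[] {"a", "a", "b"}, new long[] {1, 2, 60002}));
    }
}
```

The watermark model is deliberately idealized; a real job's watermark strategy decides when the shared watermark actually passes a key's timers.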
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196180#comment-17196180 ] Aljoscha Krettek commented on FLINK-19167:

That's correct, I think. But in a real-world scenario that would likely happen.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196123#comment-17196123 ] Dawid Wysakowicz commented on FLINK-19167:

I think the problem is not just that the example is simplified, but that it imposes assumptions on how the watermarks are generated (which I think are actually incorrect). I think what [~tinny] is describing, that the condition is never met, might actually be true.

Let's assume a perfect watermark; to be precise, a watermark generator that emits a Watermark with the value of the timestamp of each incoming record. If I am not mistaken, the contract is that the element is emitted first and the Watermark second. In that scenario, suppose records arrive with timestamps t = 1, 2, 61000. We should emit a result with count 2 at time t = 60002. However, the record that would move the watermark past 60002 also updates lastModified to 61000.

The example "works" only if the watermark progresses because of events from other keys. E.g. if you have values with just a single key, it won't work.
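The single-key scenario above can be checked with a minimal, Flink-free sketch (hypothetical class, not Flink API). It assumes the "perfect watermark" described in the comment: timestamps arrive in ascending order, each record is processed first, and then a watermark equal to its timestamp arrives. For records at 1, 2 and 61000, the timers at 60001 and 60002 only fire once `lastModified` has already moved to 61000, so nothing is emitted. The final watermark that a bounded source emits at end of input is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model of the docs example for a single key.
// Assumption: after each record, a watermark equal to that record's timestamp arrives.
final class SingleKeyTimeoutSimulation {

    static final long TIMEOUT = 60000L;

    // Returns the counts emitted by the onTimer check, in emission order.
    static List<Long> run(long... recordTimestamps) {
        List<Long> emitted = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>(); // one timer per timestamp
        long count = 0;
        long lastModified = Long.MIN_VALUE;

        for (long ts : recordTimestamps) {
            // processElement
            count++;
            lastModified = ts;
            timers.add(ts + TIMEOUT);

            // watermark(ts): fire every timer at or before ts, in order
            while (!timers.isEmpty() && timers.first() <= ts) {
                long timer = timers.pollFirst();
                // onTimer: only the timer belonging to the latest record emits
                if (timer == lastModified + TIMEOUT) {
                    emitted.add(count);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, 61000)); // timers 60001 and 60002 fire after lastModified = 61000
    }
}
```

Under these assumptions no single-key input can emit: a timer only fires when a later record's watermark reaches it, and that same record has already set `lastModified` to at least the timer's timestamp, so `timer == lastModified + 60000` cannot hold.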
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196112#comment-17196112 ] Aljoscha Krettek commented on FLINK-19167:

The example is in fact simplified and doesn't support multiple "windows", yes.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196029#comment-17196029 ] Dawid Wysakowicz commented on FLINK-19167:

And the behaviour with that incoming data is correct. Both 7 and 60007 belong to the same "window": you want to emit results only if the gap between events is more than 60000. Now I do see a problem with that example, though. It does not account for out-of-order elements in the {{processElement}} method; in other words, it does not consider that there might be multiple open windows at a time. Nevertheless, I still don't think your approach of using {{currentWatermark}} is valid.
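The out-of-order problem can be illustrated with a hypothetical single-key sketch driven by explicit records and watermarks. The watermark values are chosen by hand for illustration and are not produced by any Flink watermark generator. A record at 100 followed by an out-of-order record at 50 moves `lastModified` backwards to 50, so the timer at 60050 passes the `lastModified + 60000` check and emits a count of 2, even though the key saw an event at 100, only 59950 ms before that timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of the docs example, driven by explicit
// records and watermarks so that out-of-order input can be shown.
final class OutOfOrderTimeoutSimulation {

    static final long TIMEOUT = 60000L;

    private long count;
    private long lastModified;
    private final TreeSet<Long> timers = new TreeSet<>();
    // Emissions recorded as "timerTimestamp:count".
    final List<String> emitted = new ArrayList<>();

    // processElement as in the docs: lastModified is overwritten unconditionally.
    void onRecord(long timestamp) {
        count++;
        lastModified = timestamp;
        timers.add(timestamp + TIMEOUT);
    }

    // Fires every timer covered by the watermark, in timestamp order.
    void onWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timer = timers.pollFirst();
            if (timer == lastModified + TIMEOUT) {
                emitted.add(timer + ":" + count);
            }
        }
    }

    public static void main(String[] args) {
        OutOfOrderTimeoutSimulation sim = new OutOfOrderTimeoutSimulation();
        sim.onRecord(100);
        sim.onRecord(50);       // out of order, but not late: no watermark has passed 50 yet
        sim.onWatermark(60050); // fires the timer of the record at 50
        sim.onWatermark(60100); // fires the timer of the record at 100, which no longer matches
        System.out.println(sim.emitted);
    }
}
```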
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196015#comment-17196015 ] tinny cat commented on FLINK-19167:

The following is my test code:
{code:java}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    CountWithTimestamp result = state.value();
    System.out.println("timestamp: " + timestamp + ", " + "lastModified: " + result.lastModified);
    if (timestamp == result.lastModified + 60000) {
        out.collect(new Tuple2<>(key.f0, result.count));
    }
}
{code}
Here are the three records I sent: 7, 8, 60007.
Flink then printed the following:
{code:java}
timestamp: 60007, lastModified: 60007
{code}
[~dwysakowicz]
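Assuming the watermark equals each record's timestamp and arrives right after that record, the following hypothetical, Flink-free replay of the test above produces the same single line: when the watermark reaches 60007, the record at 60007 has already set `lastModified` to 60007, and the timer at 60008 has not fired yet. The actual output of a job also depends on its watermark strategy, which the comment does not show, so this is a sketch under that assumption only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free replay of the onTimer println for a single key.
// Assumption: timestamps ascend, and after each record a watermark equal to its timestamp arrives.
final class TimerLogReplay {

    static final long TIMEOUT = 60000L;

    // Returns the lines the println in onTimer would print.
    static List<String> replay(long... recordTimestamps) {
        List<String> log = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>();
        for (long ts : recordTimestamps) {
            long lastModified = ts;             // current.lastModified = ctx.timestamp()
            timers.add(lastModified + TIMEOUT); // registerEventTimeTimer(lastModified + 60000)
            // assumed watermark ts: fire every timer at or before it
            while (!timers.isEmpty() && timers.first() <= ts) {
                long timer = timers.pollFirst();
                log.add("timestamp: " + timer + ", lastModified: " + lastModified);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        replay(7, 8, 60007).forEach(System.out::println);
    }
}
```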
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196002#comment-17196002 ] Dawid Wysakowicz commented on FLINK-19167:

Why do you say the {{result.lastModified}} is equal to 60007? It should be {{7}}. The value of {{ctx.timestamp()}} in {{ProcessFunction#processElement}} is equal to the timestamp of the incoming record. Thus it is {{7}} in that case.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195831#comment-17195831 ] tinny cat commented on FLINK-19167:

The timer method did fire, but this if statement will not hold:
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
For example: there are 5 incoming events with timestamps t = 1, 2, 4, 5, 7. We want to emit results when the event time reaches 60007. Therefore we need to register a timer for 7 + 60000, which is ctx.timestamp() + 60000. When the timestamp increases to 60007, the timer method fires; now the timestamp is 60007, and the value of result.lastModified is also 60007. Therefore, result.lastModified + 60000 is greater than timestamp, so `out.collect(new Tuple2<String, Long>(result.key, result.count));` will never happen.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195308#comment-17195308 ]

Timo Walther commented on FLINK-19167:
--

[~tinny] the timestamp passed to {{onTimer}} will be the timestamp with which you registered the timer. It will not be the watermark. So I also think that the example is correct. If {{current.lastModified + 60000}} is already covered by the current watermark when calling {{registerEventTimeTimer}}, the timer method would fire immediately. As Aljoscha said before, people usually don't need to think about watermarks in a ProcessFunction because they are rather an internal concept. Users can simply work with event-time timestamps.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195306#comment-17195306 ]

Dawid Wysakowicz commented on FLINK-19167:
--

I second [~aljoscha]: I think the example is correct. What do you return in your {{AssignerWithPeriodicWatermarks#extractTimestamp}}? Do you return {{timestamp}} or {{currentMaxTimestamp}}? The correct behaviour would be to return {{timestamp}}. The code in {{onTimer}} should then trigger.

Consider this example. There are 5 events incoming with timestamps t = 1, 2, 4, 8, 7. We want to emit results when the event time reaches 60007. Therefore we need to register a timer for 7 + 60000, which is {{ctx.timestamp() + 60000}}.

Let us know if it is still unclear.
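The behaviour debated in this thread can be checked with a small plain-Java model of one key of the documented function. This is a sketch under stated assumptions, not Flink code: `CountWithTimeoutSimulation` is a hypothetical class, a `PriorityQueue` stands in for Flink's TimerService, and watermarks are advanced by hand. It replays the t = 1, 2, 4, 8, 7 example and shows that the timer callback receives the registered timestamp, and that only the timer matching `lastModified + 60000` emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical plain-Java model of one key of the documented example.
// Not Flink code: the queue stands in for the TimerService and the
// watermark is advanced manually.
class CountWithTimeoutSimulation {
    static final long TIMEOUT = 60000L;

    long count = 0;          // plays the role of CountWithTimestamp.count
    long lastModified = 0;   // plays the role of CountWithTimestamp.lastModified
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<String> emitted = new ArrayList<>();

    // Like processElement(): update the state, then register a timer
    // 60 seconds after the record's own event timestamp.
    void processElement(long eventTimestamp) {
        count++;
        lastModified = eventTimestamp;
        timers.add(lastModified + TIMEOUT);
    }

    // Fires, in timestamp order, every timer that the new watermark covers.
    // onTimer gets the registered timestamp, not the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            onTimer(timers.poll());
        }
    }

    // Like onTimer(): only the timer registered by the latest update matches;
    // timers from earlier records are outdated and ignored.
    void onTimer(long timestamp) {
        if (timestamp == lastModified + TIMEOUT) {
            emitted.add("count=" + count + " at " + timestamp);
        }
    }

    public static void main(String[] args) {
        // Example from the thread: timestamps 1, 2, 4, 8, 7 for one key.
        CountWithTimeoutSimulation sim = new CountWithTimeoutSimulation();
        for (long t : new long[] {1, 2, 4, 8, 7}) {
            sim.processElement(t);
        }
        sim.advanceWatermark(60008);     // covers all five timers
        System.out.println(sim.emitted); // [count=5 at 60007]

        // A record at 60007 updates the state before the watermark reaches 60007.
        CountWithTimeoutSimulation busy = new CountWithTimeoutSimulation();
        busy.processElement(7);
        busy.processElement(60007);
        busy.advanceWatermark(60007);     // timer 60007 fires but is outdated
        System.out.println(busy.emitted); // []
        busy.advanceWatermark(120007);    // no update for 60 s of event time
        System.out.println(busy.emitted); // [count=2 at 120007]
    }
}
```

The second scenario in `main` mirrors the case where `result.lastModified` is 60007 when the timer for 60007 fires: a newer record has updated the state, so that timer is treated as outdated. In this model the count is emitted only by the timer at 120007, after a full minute of event time without new records for the key, which is the timeout the example is meant to express.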
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195154#comment-17195154 ]

tinny cat commented on FLINK-19167:
---

The watermark did increase, and the timer did trigger. What I mean is that there is a problem with the logic in the onTimer() method: this condition will never hold.
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
The reason is that this line is incorrect:
{code:java}
current.lastModified = ctx.timestamp();
{code}
It should be:
{code:java}
current.lastModified = ctx.timerService().currentWatermark();
{code}
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193513#comment-17193513 ]

Aljoscha Krettek commented on FLINK-19167:
--

This depends on what the rest of the system is doing. There must be more input records to make the watermark advance. And then at some point the timer that was set for an earlier record will fire.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193306#comment-17193306 ]

tinny cat commented on FLINK-19167:
---

The watermark assigner is:
{code:java}
stream
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<UserAction>() {
        private long currentMaxTimestamp = 0L;
        private long maxOutOfOrderness = 1L;
        private Watermark watermark = null;

        @Override
        public long extractTimestamp(UserAction element, long previousElementTimestamp) {
            long timestamp = element.viewTime;
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            return watermark;
        }
    })
{code}
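The assigner above can be modeled without Flink types to show how the record timestamp and the watermark relate. This is a hypothetical plain-Java sketch (`BoundedOutOfOrdernessModel` is not a Flink class). It keeps the same two fields and, as Dawid Wysakowicz recommends in this thread, returns the element's own timestamp from `extractTimestamp` rather than `currentMaxTimestamp`; the out-of-orderness bound of 1 is taken from the snippet above.

```java
// Hypothetical plain-Java stand-in for the periodic assigner above.
// extractTimestamp returns the record's own event time; the watermark
// trails the maximum timestamp seen so far.
class BoundedOutOfOrdernessModel {
    private long currentMaxTimestamp = 0L;
    private final long maxOutOfOrderness;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Returns the record's own timestamp (what ctx.timestamp() would see)
    // and only tracks the maximum for watermark purposes.
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(elementTimestamp, currentMaxTimestamp);
        return elementTimestamp;
    }

    // Called periodically: the watermark only moves after new, larger
    // timestamps have been extracted.
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel assigner = new BoundedOutOfOrdernessModel(1L);
        for (long t : new long[] {1, 2, 4, 8, 7}) {
            long ts = assigner.extractTimestamp(t);
            System.out.println("record ts=" + ts + ", watermark=" + assigner.getCurrentWatermark());
        }
        // Without further input, repeated periodic calls return the same value.
        System.out.println(assigner.getCurrentWatermark()); // 7
    }
}
```

Two things follow in this model: the late record 7 keeps its own timestamp (returning `currentMaxTimestamp` would have stamped it 8), and the watermark stays at 7 until a record with a larger timestamp arrives, so timers registered for later event times cannot fire without more input.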
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193305#comment-17193305 ]

tinny cat commented on FLINK-19167:
---

However, I set the `TimeCharacteristic` to event time and assigned watermarks based on event time. The conclusion I reached is: if ctx.timestamp() is assigned to `current.lastModified`, the following code will never execute:
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
because `timestamp` always equals result.lastModified.
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192776#comment-17192776 ]

Aljoscha Krettek commented on FLINK-19167:
--

Hi, I believe the example is correct as it is. The watermark is not necessarily close to the timestamp of a record. For example, if this is the first record we're seeing, the watermark could be {{Long.MIN_VALUE}}. Or the watermark could be held back by something else. In general, I think accessing the watermark in an operator is wrong most of the time, and users should mostly work with the event timestamps.
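To make the `Long.MIN_VALUE` point concrete, the arithmetic below compares where the timer for a first record would land under the documented code and under the change proposed in this issue. It is plain Java with hypothetical helper names (`TimerPlacement`, `isDue`); it assumes, as stated in the comment, that the watermark is still `Long.MIN_VALUE` when the first record is processed, and uses 7 and 6 as illustrative values for the record timestamp and the next watermark.

```java
// Hypothetical helpers contrasting the two ways of computing the timer time.
class TimerPlacement {
    static final long TIMEOUT = 60000L;

    // Documented example: timer 60 s after the record's event timestamp.
    static long timerFromEventTime(long recordTimestamp) {
        return recordTimestamp + TIMEOUT;
    }

    // Proposed change: timer 60 s after the current watermark.
    static long timerFromWatermark(long currentWatermark) {
        return currentWatermark + TIMEOUT;
    }

    // An event-time timer is due once the watermark has reached its timestamp.
    static boolean isDue(long timerTimestamp, long watermark) {
        return timerTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long recordTimestamp = 7;              // illustrative first record
        long watermarkBefore = Long.MIN_VALUE; // assumed: no watermark yet
        long firstWatermark = 6;               // illustrative next watermark

        long documented = timerFromEventTime(recordTimestamp);
        long proposed = timerFromWatermark(watermarkBefore);
        System.out.println(documented); // 60007
        System.out.println(proposed);   // -9223372036854715808
        System.out.println(isDue(documented, firstWatermark)); // false
        System.out.println(isDue(proposed, firstWatermark));   // true
    }
}
```

In this model the watermark-based timer is already due when the first real watermark arrives, and since `lastModified` would equal `Long.MIN_VALUE`, the `timestamp == result.lastModified + 60000` check would pass and the count would be emitted right away rather than after 60 seconds of event time.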