Thanks, please do keep me posted!

> On 20 Mar 2017, at 21:50, Florian König <florian.koe...@micardo.com> wrote:
> 
> @Aljoscha Thank you for the pointer to ProcessFunction. That looks like a 
> better approach with less code and overhead.
> 
> After restoring, the job is both reading new elements and emitting some, but 
> nowhere near as many as expected. I’ll investigate further after switching to 
> ProcessFunction. I suspect that there is some problem with my code. I’ll let 
> you know if any unexplained discrepancy remains.
> 
>> Am 20.03.2017 um 14:15 schrieb Aljoscha Krettek <aljos...@apache.org>:
>> 
>> As a general remark, I think the ProcessFunction 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html) 
>> could be better suited for implementing such a use case.
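>> 
>> For illustration, a rough (untested) sketch of that approach could look like 
>> the following. It assumes your Schedule type exposes the trigger time via a 
>> getTriggerTime() accessor (adapt to your actual fields), and instead of 
>> deleting superseded timers it simply ignores them in onTimer():
>> 
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>> 
>> // Sketch only: emit each schedule at its (earliest) trigger time, keyed by entity ID.
>> // Usage: schedules.keyBy(s -> s.entityId).process(new EarliestScheduleFunction())
>> public class EarliestScheduleFunction extends ProcessFunction<Schedule, Schedule> {
>> 
>>     // earliest trigger time currently planned for this key (null if none)
>>     private transient ValueState<Long> pendingTime;
>>     // the schedule to emit when that time is reached
>>     private transient ValueState<Schedule> pendingSchedule;
>> 
>>     @Override
>>     public void open(Configuration parameters) {
>>         pendingTime = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("pendingTime", Long.class));
>>         pendingSchedule = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("pendingSchedule", Schedule.class));
>>     }
>> 
>>     @Override
>>     public void processElement(Schedule schedule, Context ctx, Collector<Schedule> out) throws Exception {
>>         Long current = pendingTime.value();
>>         long newTime = schedule.getTriggerTime();   // assumed accessor
>> 
>>         // ignore schedules that would move the trigger into the future
>>         if (current != null && newTime > current) {
>>             return;
>>         }
>> 
>>         if (newTime > ctx.timerService().currentProcessingTime() + 100) {
>>             // far enough from now => (re-)register a processing-time timer
>>             ctx.timerService().registerProcessingTimeTimer(newTime);
>>             pendingTime.update(newTime);
>>             pendingSchedule.update(schedule);
>>         } else {
>>             // due now or in the past => emit immediately
>>             out.collect(schedule);
>>             pendingTime.clear();
>>             pendingSchedule.clear();
>>         }
>>     }
>> 
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Schedule> out) throws Exception {
>>         Long current = pendingTime.value();
>>         // only react to the timer for the currently planned time;
>>         // superseded timers for this key are simply ignored
>>         if (current != null && timestamp == current) {
>>             out.collect(pendingSchedule.value());
>>             pendingTime.clear();
>>             pendingSchedule.clear();
>>         }
>>     }
>> }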
>> 
>> I did run tests on Flink 1.2 and master with a simple processing-time 
>> windowing job. After performing a savepoint and waiting a few minutes, I 
>> restored, and the windows that were still present fired immediately.
>> 
>> In your case, after restoring, is the job also reading new elements, or did 
>> you try just restoring without any new input?
>> 
>>> On 19 Mar 2017, at 13:15, Florian König <florian.koe...@micardo.com> wrote:
>>> 
>>> @Aljoscha: We’re using 1.2.
>>> 
>>> The intention of our code is as follows: The events that flow through Flink 
>>> represent scheduling decisions, i.e. they contain the ID of a target 
>>> entity, a description of an action that should be performed on that entity 
>>> by some other job, and a timestamp of when that should happen.
>>> 
>>> We’re using the windowing mechanism to delay those events until they should 
>>> be forwarded (and trigger the corresponding action). Furthermore, the 
>>> schedule can be moved closer to the current point in time: subsequent 
>>> schedule events for an entity (identified by its ID) can set the trigger 
>>> time to an earlier instant. If the trigger time is in the past or only very 
>>> shortly (e.g., 100 ms) after now, the action should be triggered immediately. 
>>> Actions scheduled for an instant after the currently planned one should be 
>>> ignored; i.e. the schedule cannot be moved into the future.
>>> 
>>> exemplary event stream:
>>>     time … (ID, action, trigger time)   // intended reaction
>>>     0  … (1, 'foo', 10)                 // trigger action 'foo' on entity 1 at time 10
>>>     3  … (2, 'bar', 15)                 // trigger action 'bar' on entity 2 at time 15
>>>     4  … (1, 'foo', 7)                  // move trigger back to time 7
>>>     9  … (1, 'foo', 12)                 // ignore
>>>     15 … (2, 'bar', 15)                 // trigger immediately
>>> 
>>> resulting stream:
>>>     (1, 'foo', 7)                       // at time 7
>>>     (2, 'bar', 15)                      // at time 15
>>> 
>>> To implement this, we have written a custom trigger that’s called from the 
>>> following Flink code:
>>> 
>>> …
>>> schedules.keyBy(schedule -> schedule.entityId)
>>>             .window(GlobalWindows.create())
>>>             .trigger(DynamicEarliestWindowTrigger.create())
>>>             .fold((Schedule) null, (folded, schedule) -> schedule)
>>>             .map( /* process schedules */ )
>>> …
>>> 
>>> We fold the scheduling events 'to themselves', because only the latest 
>>> event in each period is relevant. The custom trigger is implemented as 
>>> follows (only Flink-relevant parts and syntax):
>>> 
>>> class DynamicEarliestWindowTrigger<T extends Timestamped, W extends Window> extends Trigger<T, W> {
>>> 
>>>     ValueStateDescriptor<Long> windowEnd = new ValueStateDescriptor<>("windowEnd", Long.class);
>>> 
>>>     TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>         val windowEndState = ctx.getPartitionedState(windowEnd);
>>>         val windowEndsAt = windowEndState.value();
>>>         val newEnd = element.getTimestamp();
>>> 
>>>         // no timer set yet, or intention to trigger earlier
>>>         if (windowEndsAt == null || newEnd <= windowEndsAt) {
>>>             deleteCurrentTimer(ctx);
>>> 
>>>             // trigger time far enough from now => schedule timer
>>>             if (newEnd > System.currentTimeMillis() + 100) {
>>>                 ctx.registerProcessingTimeTimer(newEnd);
>>>                 windowEndState.update(newEnd);
>>>             } else {
>>>                 return TriggerResult.FIRE;      // close enough => fire immediately
>>>             }
>>>         }
>>> 
>>>         // ignore events that should be triggered in the future
>>>         return TriggerResult.CONTINUE;
>>>     }
>>> 
>>>     // fire when timer has reached pre-set time
>>>     TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>     }
>>> 
>>>     // noop
>>>     TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
>>>         return TriggerResult.CONTINUE;
>>>     }
>>> 
>>>     void clear(W window, TriggerContext ctx) throws Exception {
>>>         deleteCurrentTimer(ctx);
>>>     }
>>> 
>>>     void deleteCurrentTimer(TriggerContext ctx) throws Exception {
>>>         val windowEndState = ctx.getPartitionedState(windowEnd);
>>>         val windowEndsAt = windowEndState.value();
>>> 
>>>         if (windowEndsAt != null) {
>>>             ctx.deleteProcessingTimeTimer(windowEndsAt);
>>>             windowEndState.clear();
>>>         }
>>>     }
>>> 
>>>     boolean canMerge() { return false; }
>>> }
>>> 
>>> The job state grows with the number of scheduled entities, and the mechanism 
>>> works as intended as long as the job keeps running. However, for unrelated 
>>> reasons, the job sometimes fails and is restarted from a checkpoint. The 
>>> state size after the restart tells me that the state has been restored.
>>> 
>>> Yet, the mechanism stops working: none of the old scheduling events that must 
>>> have been 'waiting' in the window for the timer to trigger are actually 
>>> forwarded. Hence my question whether it is possible that timers are not 
>>> restored.
>>> 
>>> Any ideas what might have gone wrong? Is there a better way to implement 
>>> such a mechanism?
>>> 
>>> Thanks and enjoy the rest of your weekend :)
>>> Florian
>>> 
>>> 
>>>> Am 17.03.2017 um 16:51 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>> 
>>>> When restoring, processing-time timers that would have fired already 
>>>> should immediately fire.
>>>> 
>>>> @Florian what Flink version are you using? In Flink 1.1 there was a bug 
>>>> that led to processing-time timers not being reset when restoring.
>>>>> On 17 Mar 2017, at 15:39, Florian König <florian.koe...@micardo.com> 
>>>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Funny coincidence, I was just about to ask the same thing. I have noticed 
>>>>> this with restored checkpoints in one of my jobs: the timers seem to be 
>>>>> gone. My window trigger registers a processing-time timer, but it seems 
>>>>> that these timers don't get restored, even if they are set to fire in the 
>>>>> future, after the restore.
>>>>> 
>>>>> Is there something I need to be aware of in my class implementing 
>>>>> Trigger? Anything I forgot to set in a method that’s being called upon a 
>>>>> restore?
>>>>> 
>>>>> Thanks
>>>>> Florian
>>>>> 
>>>>>> Am 17.03.2017 um 15:14 schrieb Yassine MARZOUGUI 
>>>>>> <y.marzou...@mindlytix.com>:
>>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> How does a processing-time timer behave when a job is taken down with a 
>>>>>> savepoint and then restarted after the timer was supposed to fire? Will 
>>>>>> the timer fire at restart because it was missed while the job was down?
>>>>>> 
>>>>>> I'm wondering because I would like to schedule periodic timers in the 
>>>>>> future (in processing time) at which a state is read and emitted, but I'm 
>>>>>> afraid the timer will never fire if it falls due while the job is down, 
>>>>>> and therefore the state will never be emitted.
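>>>>>> 
>>>>>> Roughly what I have in mind is something like the following, e.g. with a 
>>>>>> ProcessFunction (untested sketch; the input type, the counter state, and 
>>>>>> the interval are just placeholders for my actual job):
>>>>>> 
>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>>> import org.apache.flink.util.Collector;
>>>>>> 
>>>>>> // Placeholder sketch: count events per key and emit the count periodically.
>>>>>> // Used on a keyed stream, e.g. events.keyBy(...).process(new PeriodicEmitFunction())
>>>>>> public class PeriodicEmitFunction extends ProcessFunction<String, Long> {
>>>>>> 
>>>>>>     private static final long INTERVAL_MS = 60_000;   // placeholder period
>>>>>> 
>>>>>>     private transient ValueState<Long> count;
>>>>>> 
>>>>>>     @Override
>>>>>>     public void open(Configuration parameters) {
>>>>>>         count = getRuntimeContext().getState(
>>>>>>                 new ValueStateDescriptor<>("count", Long.class));
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
>>>>>>         Long current = count.value();
>>>>>>         if (current == null) {
>>>>>>             // first element for this key => start the periodic timer chain
>>>>>>             ctx.timerService().registerProcessingTimeTimer(
>>>>>>                     ctx.timerService().currentProcessingTime() + INTERVAL_MS);
>>>>>>             current = 0L;
>>>>>>         }
>>>>>>         count.update(current + 1);
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>>>>>         out.collect(count.value());   // read and emit the state
>>>>>>         // schedule the next period; this is the timer I'm worried about
>>>>>>         // if the job happens to be down when it is due
>>>>>>         ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
>>>>>>     }
>>>>>> }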
>>>>>> 
>>>>>> Best,
>>>>>> Yassine
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 
