Sorry for the late response; it is due to the time difference between Berlin
and Seoul :-)
Are you asking why I want to change the condition of the ternary operator used
in EventTimeTrigger.onEventTime?
If that is your question, I already answered it in the first message of this
thread, as follows:
>>> The reason why I'm asking is because I want to register an event-time timer
>>> when my custom trigger receives a special event which signifies the end of
>>> a session.
>>> The timestamp of the registered timer is not going to be equal to
>>> window.maxTimestamp and I want to return TriggerResult.FIRE_AND_PURGE in
>>> such a case.
>>> As I also want to purge the content of a window when it expires,
>>> onEventTime should look like this:
>>>
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow window,
>>>     TriggerContext ctx) {
>>>   return TriggerResult.FIRE_AND_PURGE;
>>> }
>>>
>>> It will return FIRE_AND_PURGE every time MyTrigger.onEventTime is called.
>>> It looks quite dangerous and I'm not quite sure about that.
If that is not enough, let me explain in more detail.
- Users normally emit an event every 5 minutes, but the interval can be
  irregular because of manual operations.
- A user session can be finished by an event carrying a special flag that
  signifies the end of the session.
- Some sessions do not finish normally, so we want to expire them after a
  timeout of 1 hour. We do not expect low latency in such cases.
- For users who do send the special flag, we want to emit the result of the
  window aggregation as soon as we receive the flag.
My streaming pipeline is quite simple, as shown below.
// default parallelism is 20
env
  .addSource(consumer)
  .process(jsonParser)
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor(...){...})
  .keyBy(_.key)
  .window(EventTimeSessionWindows.withGap(sessionGap))
  .trigger(new EarlyResultEventTimeTrigger[MyEvent](_.isLast))
  .aggregate(aggregator)
  .setParallelism(128)
  .addSink(producer)
  .setParallelism(128)
Initially, I wanted to emit the result of the aggregate function of a session
window *immediately* when an event with the special flag arrives.
At that time, it had not occurred to me that events from the same user could
arrive at a session window out of order.
So my first custom trigger looked like this:
class EarlyResultEventTimeTrigger[T](eval: T => Boolean)
    extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp <= ctx.getCurrentWatermark) {
      // the watermark has already passed the end of the window
      TriggerResult.FIRE
    } else {
      if (eval(element)) {
        // end-of-session flag: emit and purge right away
        TriggerResult.FIRE_AND_PURGE
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp)
        TriggerResult.CONTINUE
      }
    }
  }

  // remaining Trigger methods omitted
}
Compared to EventTimeTrigger.onElement, three lines of code are added:
if (eval(element)) {
  TriggerResult.FIRE_AND_PURGE
}
This custom trigger, however, ignores the possibility that events from the same
user arrive out of order.
For example, suppose we have the following 4 events from a user, and they are
written to different partitions of a Kafka topic:
- A@00:00:00 / partition 1
- B@00:05:00 / partition 2 / 5 minutes after A
- C@00:10:00 / partition 3 / 5 minutes after B
- D@00:10:01 with a special flag / partition 4 / 1 second after C
Let's assume that after going through different source-process-assigner task
chains and shuffling, the order of C and D is inverted; D arrives at the
session window earlier than C.
The above EarlyResultEventTimeTrigger.onElement is going to return
FIRE_AND_PURGE as soon as it receives D.
After A, B, and D are fired and purged, C arrives.
In this case, I end up with two different sessions:
- A, B, D
- C
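As a rough illustration of the scenario above, here is a minimal sketch in plain Java. It is a toy model of one user's window, not Flink code; Event and emitOnArrival are hypothetical names, and the leftover contents are assumed to be emitted when the session finally expires.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single user's session window; this is NOT Flink code.
// Event and emitOnArrival are hypothetical names used only for illustration.
class ImmediatePurgeDemo {

    static final class Event {
        final String name;
        final boolean isLast; // true for the event carrying the special flag

        Event(String name, boolean isLast) {
            this.name = name;
            this.isLast = isLast;
        }
    }

    // Processes events in arrival order and fires-and-purges as soon as a
    // flagged event arrives, mimicking the first trigger's onElement.
    static List<List<String>> emitOnArrival(List<Event> arrivals) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> windowContents = new ArrayList<>();
        for (Event e : arrivals) {
            windowContents.add(e.name);
            if (e.isLast) {
                emitted.add(new ArrayList<>(windowContents)); // FIRE
                windowContents.clear();                       // PURGE
            }
        }
        // Anything left over is emitted when the session eventually expires.
        if (!windowContents.isEmpty()) {
            emitted.add(new ArrayList<>(windowContents));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
            new Event("A", false),
            new Event("B", false),
            new Event("D", true),   // D overtakes C on the way to the window
            new Event("C", false));
        System.out.println(emitOnArrival(arrivals)); // prints [[A, B, D], [C]]
    }
}
```

With the in-order arrival A, B, C, D the same method returns a single result containing all four events, so the split is caused purely by the inversion of C and D.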
I could return TriggerResult.FIRE instead in
EarlyResultEventTimeTrigger.onElement, but
- it violates exactly-once semantics, because the same session is emitted more
  than once.
- it can significantly increase memory usage, because the window contents are
  kept until the session expires and our session gap is 1 hour.
To solve this, I want to tolerate some degree of out-of-orderness before
executing FIRE_AND_PURGE.
This can be done by registering a timer instead of returning FIRE_AND_PURGE
right away.
Below is the new custom trigger.
class DelayedEarlyResultEventTimeTrigger[T](eval: T => Boolean, delay: Long = 0)
    extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp <= ctx.getCurrentWatermark) {
      TriggerResult.FIRE
    } else {
      if (eval(element)) {
        // end-of-session flag: fire `delay` ms after this event's timestamp
        ctx.registerEventTimeTimer(timestamp + delay)
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp)
      }
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  // remaining Trigger methods omitted
}
Instead of returning FIRE_AND_PURGE, I register a new event-time timer:
if (eval(element)) {
  ctx.registerEventTimeTimer(timestamp + delay)
}
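To show the intended effect of the delay, here is a minimal sketch in plain Java that replays the arrival order A, B, D, C from the example above. It is a toy model and not Flink code: all names are hypothetical, timestamps are seconds since 00:00:00, the watermark values are assumed, and only the timer registered for the flagged event is modeled (not the one at window.maxTimestamp).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of one session window with event-time timers; NOT Flink code.
// All names here are hypothetical and timestamps are seconds since 00:00:00.
class DelayedPurgeDemo {
    private final long delay;
    private final List<String> windowContents = new ArrayList<>();
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<List<String>> emitted = new ArrayList<>();

    DelayedPurgeDemo(long delay) {
        this.delay = delay;
    }

    // Mirrors the new onElement: a flagged event only registers a timer.
    void onElement(String name, long timestamp, boolean isLast) {
        windowContents.add(name);
        if (isLast) {
            timers.add(timestamp + delay);
        }
    }

    // A timer fires once the watermark reaches its timestamp; any firing
    // timer results in FIRE_AND_PURGE, as in the new onEventTime.
    void advanceWatermark(long watermark) {
        boolean fired = false;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fired = true;
        }
        if (fired && !windowContents.isEmpty()) {
            emitted.add(new ArrayList<>(windowContents)); // FIRE
            windowContents.clear();                       // PURGE
        }
    }

    // Replays the arrival order A, B, D, C.
    static List<List<String>> run(long delay) {
        DelayedPurgeDemo window = new DelayedPurgeDemo(delay);
        window.onElement("A", 0, false);
        window.onElement("B", 300, false);
        window.onElement("D", 601, true);
        window.advanceWatermark(603); // assumed watermark before C shows up
        window.onElement("C", 600, false);
        window.advanceWatermark(700);
        return window.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(0)); // prints [[A, B, D]]; C is left behind
        System.out.println(run(5)); // prints [[A, B, D, C]]
    }
}
```

In this model a delay of 0 behaves like the first trigger once the watermark passes D, while a delay that outlasts the inversion lets C join the window before it is purged.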
Please note that the timestamp of the registered timer is not equal to
window.maxTimestamp.
That's why I plan to return FIRE_AND_PURGE whenever
DelayedEarlyResultEventTimeTrigger.onEventTime is called as below:
override def onEventTime(time: Long, window: TimeWindow,
    ctx: Trigger.TriggerContext): TriggerResult = {
  TriggerResult.FIRE_AND_PURGE
}
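To make the two timer timestamps concrete, the arithmetic for the example events can be sketched in plain Java. This is not Flink code: TriggerResult is a local stand-in enum, the 5-second delay is an assumed value, and the only Flink behavior relied on is that a session window ends one gap after its latest event and that maxTimestamp is the window end minus 1 ms.

```java
// Worked arithmetic for the example events; NOT Flink code.
// TriggerResult here is a local stand-in enum, and the 5-second delay is an
// assumed value chosen only for illustration.
class TimerTimestampDemo {
    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

    // A session window ends one gap after its latest event, and
    // maxTimestamp is the last millisecond inside the window (end - 1).
    static long sessionMaxTimestamp(long latestEventMillis, long gapMillis) {
        return latestEventMillis + gapMillis - 1;
    }

    // The condition used by the original EventTimeTrigger.onEventTime.
    static TriggerResult originalRule(long time, long maxTimestamp) {
        return time == maxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        long gapMillis = 60 * 60 * 1000L;      // 1-hour session gap
        long dMillis = (10 * 60 + 1) * 1000L;  // D@00:10:01
        long delayMillis = 5_000L;             // assumed delay

        long maxTimestamp = sessionMaxTimestamp(dMillis, gapMillis);
        long earlyTimer = dMillis + delayMillis;

        System.out.println(maxTimestamp);                             // 4200999
        System.out.println(earlyTimer);                               // 606000
        System.out.println(originalRule(earlyTimer, maxTimestamp));   // CONTINUE
        System.out.println(originalRule(maxTimestamp, maxTimestamp)); // FIRE
    }
}
```

Under the original condition the early timer at 00:10:06 would be ignored, which is why the condition has to change for the early result to be emitted.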
What I want to be sure about here is that the original
EventTimeTrigger.onEventTime is never called except when a session window
expires.
I'm not sure whether I have understood your question and answered it properly,
but I hope this gives you a detailed idea of what I'm trying to figure out.
P.S. We have a large session gap (1 hour) and do not enable allowed lateness.
- Dongwon
On Tue, Jul 24, 2018 at 12:05 AM, Aljoscha Krettek wrote:
Out of curiosity, why don't you want to keep it like this?
@Override
public TriggerResult onEventTime(long time, TimeWindow window,
    TriggerContext ctx) {
  return time == window.maxTimestamp() ?
      TriggerResult.FIRE :
      TriggerResult.CONTINUE;
}
I mean checking for the maxTimestamp().
Best,
Aljoscha
> On 23. Jul 2018, at 16:10, 김동원 wrote:
>
> Hi Aljoscha,
>
> If that is the only case, I need to return TriggerResult.CONTINUE when
> time > window.maxTimestamp.
>
> It is very fortunate that, apart from my own timer, onEventTime is not
> called with time < window.maxTimestamp.
>
> Thanks a lot for your quick reply.
>
> Best,
>
> - Dongwon
>
> On Jul 23, 2018, at 10:58 PM, Aljoscha Krettek wrote:
>
>> Hi,
>>
>> If you set an allowed lateness that is greater than zero you will get
>> another call to onEventTime() on window.maxTimestamp + allowedLateness.
>>
>> Does that help answer your question?
>>
>> Best,
>> Aljoscha
>>
>>> On 23. Jul 2018, at 15:40, Dongwon Kim wrote:
>>>
>>> Hi all,
>>>
>>> I want to be sure about when the EventTimeTrigger.onEventTime() method is
>>> called with event-time session windows.
>>> It returns TriggerResult.FIRE only when the timestamp of the registered
>>> timer equals the max timestamp of the current window:
>>>
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow window,
>>>     TriggerContext ctx) {
>>>   return time == window.maxTimestamp() ?
>>>       TriggerResult.FIRE :
>>>       TriggerResult.CONTINUE;
>>> }
>>>
>>> As far as I understand, when EventTimeTrigger is used with event-time
>>> session windows, there's no chance of EventTimeTrigger.onEventTime being
>>> called with time != window.maxTimestamp.
>>> Is that true? If not, could anyone let me know the corner case?
>>>
>>> The reason why I'm asking is because I want to register an event-time timer
>>> when my custom trigger receives a special event which signifies the end of
>>> a session.
>>> The timestamp of the registered timer is not going to be equal to
>>> window.maxTimestamp and I want to return TriggerResult.FIRE_AND_PURGE in
>>> such a case.
>>> As I also want to purge the content of a window when it expires,
>>> onEventTime should look like this:
>>>
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow window,
>>>     TriggerContext ctx) {
>>>   return TriggerResult.FIRE_AND_PURGE;
>>> }
>>>
>>> It will return FIRE_AND_PURGE every time MyTrigger.onEventTime is called.
>>> It looks quite dangerous and I'm not quite sure about that.
>>>
>>> - Dongwon
>>>
>>