Hi Nirmalya,

thanks for the detailed description of your understanding of Flink's window
semantics.
Most of it is correct, but a few things need a bit of correction ;-)

Please see my comments inline.

2015-11-28 4:36 GMT+01:00 Nirmalya Sengupta <sengupta.nirma...@gmail.com>:

> Hello Fabian,
>
>
> A little long mail; please have some patience.
>
> From your response: ' Let's start by telling me what you actually want to
> do ;-) '
>
> At a broad level, I want to write a (series of, perhaps) tutorial of
> Flink, where these concepts are brought out by a mix of definition,
> elaboration, illustration and of course, code snippets. If that helps the
> community, I will be very happy. In the least, I will understand the
> principles and their application, much better. So, I am a bit selfish here
> perhaps. You also mention that you are preparing some such  material. If I
> can complement your effort, I will be delighted.
>
>
That sounds great! We are almost done with the blog post and will publish
it soon. Looking forward to your feedback :-)


> One never knows: going further, I may become a trainer / evangelist of
> Apache Flink, if I show enough grasp of the subject! :-)
>
> Now to this particular question (from SOF):
>
> When I began, my intention was to find maximum temperature, *every 5
> successive records* (to answer your question).
>
> As I have said before, I am learning and hence, trying with various
> operator combinations on the same set of data to see what happens and then,
> trying to explain why that happens.
>
> Let's refer to the code again:
>
> val readings =
>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>       .map(e => (e.sensorUUID,e.ambientTemperature))
>       .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>       .trigger(CountTrigger.of(5))
>       .evictor(CountEvictor.of(4))
>       .maxBy(1)
>
>
> So, what I understand is this:
>
> timeWindowAll defines a pane of 5 msecs. When this time expires, the
> timeWindowAll fires a kind of *onExpirationOf* trigger (I have fabricated
> the name, quite likely it doesn't exist). This perhaps does nothing other
> than passing to the function (here, *maxBy() *) the contents of the
> window (whatever number of elements have been collected in last 5 msecs)
> and clearing the pane, readying it for the next 5 msecs (not exactly, but
> more of it later).
>
>
This is correct. timeWindowAll(5 msecs) (without additional trigger
definitions) will create a new window every 5 msec, trigger after 5 msecs
(call the user function), purge the window, and create a new window.
Independent of the trigger, when a window expires, it is removed (including
all elements it contains) and a new window is created.
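
To make the "new window every 5 msecs" idea concrete, here is a small sketch
in plain Scala. It is not Flink code: windowStart and assign are names I made
up for illustration, and the sketch assumes non-negative millisecond
timestamps and windows aligned at 0.

```scala
// Plain Scala model of tumbling time windows (illustration only, not Flink code).
object TumblingWindowSketch {
  // Start of the window that a non-negative timestamp falls into.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Group (timestamp, temperature) pairs by the start of their window.
  def assign(readings: Seq[(Long, Double)], sizeMs: Long): Map[Long, Seq[Double]] =
    readings
      .groupBy { case (ts, _) => windowStart(ts, sizeMs) }
      .map { case (start, rs) => start -> rs.map(_._2) }
}
```

With a size of 5 msecs, readings stamped 0 and 4 land in the window starting
at 0, a reading stamped 5 lands in the next window, and a reading stamped 12
lands in the window starting at 10. Each window holds only its own elements.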


> However, I provide a CountTrigger (5 elements). According to the rules of
> Flink, this trigger replaces the aforementioned default onExpirationOf
> trigger. Therefore, when timeWindowAll is ready after 5 msecs have
> passed, what it finds available to fire is this CountTrigger. However, a
> CountTrigger can fire only if its count-related (here, 5) criterion is
> satisfied. So, after 5 msecs have passed, if the number of elements
> collected in timeWindowAll pane is >= 5, *only* then CountTrigger will
> fire; otherwise, CountTrigger will not stir and timeWindowAll will shrug
> its shoulders and go back to wait for the next 5 msecs period.
>
>
If you define a CountTrigger(5), it will be triggered exactly once, when
exactly 5 elements are in the window, even if there are 2 msecs left for the
window. This will also replace the default trigger, which would trigger
at 5 msecs, i.e., the window is only evaluated once, after the 5th element
was inserted. What happens with the elements in the pane after the function
has been called depends on the trigger. If you look at the Trigger
interface, you'll find that TriggerResult might be FIRE or FIRE_AND_PURGE
(among others). FIRE will call the user function and leave the elements in
the window. FIRE_AND_PURGE will call the user function, purge (delete) the
window, and create a new window within the same time bounds.
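
The difference between the two results can be modelled in a few lines of
plain Scala. This is only a sketch of the behaviour described above, not the
Flink Trigger interface: the evaluate function and the Continue case (standing
in for "do nothing yet") are my own illustrative names, and the user function
is assumed to be a simple maximum.

```scala
// Plain Scala model of trigger results (illustration only, not the Flink API).
object TriggerResultSketch {
  sealed trait TriggerResult
  case object Continue extends TriggerResult     // do nothing yet
  case object Fire extends TriggerResult         // evaluate, keep the elements
  case object FireAndPurge extends TriggerResult // evaluate, then drop the elements

  // Returns (output of the user function, if it was called; pane contents afterwards).
  def evaluate(pane: Vector[Double], result: TriggerResult): (Option[Double], Vector[Double]) =
    result match {
      case Continue     => (None, pane)
      case Fire         => (pane.reduceOption(_ max _), pane)
      case FireAndPurge => (pane.reduceOption(_ max _), Vector.empty)
    }
}
```

Both Fire and FireAndPurge produce the same output for the same pane. They
differ only in what is left in the pane for a later evaluation.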


> Going further, I provide a CountEvictor. According to the rules of
>  Flink, an Evictor is allowed to act only when its associated trigger
> (here, CountTrigger) is fired. Because of its presence, a further check
> is made on the contents of the pane. If CountTrigger is fired, the number
> of elements collected in the pane must be >= 5. However, the evictor is
> interested only in first 4 of them. The evictor *takes away* these 4 from
> timeWindowAll's pane and gives them to the function. The 5th element
> still remains in the pane. timeWindowAll readies itself for next 5 msecs,
> but its pane is not empty this time. It still has that solitary element
> there.
>
>
When the CountTrigger(5) fires, exactly 5 elements are in the window pane.
A CountEvictor(4) will remove the first element from the pane such that
only 4 elements remain, before it calls the user function to evaluate the
window. What happens with the four elements after the user function was
invoked depends on the TriggerResult. A CountTrigger keeps the elements in
the window.
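
Here is a small plain Scala sketch of that sequence, in case it helps. It is
not Flink code: evict and onFire are hypothetical names, the pane is assumed
to be non-empty and ordered by arrival, and the user function is assumed to be
maxBy on the temperature field.

```scala
// Plain Scala model of CountTrigger(5) + CountEvictor(4) + maxBy(1)
// (illustration only, not Flink code).
object CountEvictorSketch {
  type Reading = (String, Double) // (sensorUUID, ambientTemperature)

  // Keep only the newest `keep` elements, dropping the oldest ones first.
  def evict(pane: Vector[Reading], keep: Int): Vector[Reading] =
    pane.takeRight(keep)

  // What the user function returns once the trigger has fired on a non-empty pane.
  def onFire(pane: Vector[Reading], keep: Int): Reading =
    evict(pane, keep).maxBy(_._2)
}
```

If the first of the five readings happens to be the hottest one, it is evicted
before maxBy runs, so the reported maximum comes from the remaining four.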


> This much seems straightforward but there is a twist in the tale.
>
> A very important point about timeWindowAll's pane is its ephemeral
> nature. When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)),
> Flink understands this as an instruction to create a pane every successive
> 5 msecs period. Flink doesn't create one pane which is cleared after every
> 5 msecs (this is what I inexactly mentioned earlier), and readied for the
> next 5 msecs. Instead, it brings into existence a *freshly minted pane,*
> every 5 msecs. The contents of the preceding pane are subjected to trigger
> (if it exists) and evictor (if it exists) and finally, function (if it is
> provided). Then, Flink *dumps the preceding pane* along with its
> contents, if any and readies the new pane, awaiting elements during the
> next 5 msecs.
>
>
This is correct.


> In my case above, the criteria of timeWindowAll and Trigger/Evictor are
> not locked in step as precisely as they should have been. It is quite
> possible that while CountTrigger fires because 5 elements are already in
> the pane, 5 msecs are *yet to lapse*. So, the current pane of
> timeWindowAll is still alive and is collecting subsequent elements
> arriving. The evictor takes away 4 elements. The remaining element is
> joined by a few more *before* 5 msecs lapse. After 5 msecs have lapsed,
> Flink extirpates the pane - along with its current contents - and creates a
> fresh pane. In effect, some elements which arrive and are collected in the
> pane *never reach* the trigger/evictor pair. These unfortunate elements
> are destroyed along with the pane in which they reside. Obviously, this
> affects the output that I see. The calculation of maximum temperature is
> inaccurate simply because some of the temperature readings are never
> available to the _maxBy_ function.
>
> Have I got it almost correct? Will be keen to hear from you.
>
>
In fact, there is quite an easy solution for your issue. You should not use a
time window but a count window instead:

val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID,e.ambientTemperature))
      .countWindowAll(5)
      .maxBy(1)

This will give you a tumbling count window that calls maxBy() whenever 5
elements have arrived.
See the documentation for details [1]
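
If you want to check the expected output without running Flink, the tumbling
count window can be approximated on an in-memory collection. This is only a
sketch in plain Scala: maxPerFive is a made-up name, and I am assuming that a
trailing group with fewer than 5 elements is never evaluated.

```scala
// Plain Scala approximation of a tumbling count window of size 5 with maxBy
// on the temperature field (illustration only, not Flink code).
object CountWindowSketch {
  type Reading = (String, Double) // (sensorUUID, ambientTemperature)

  def maxPerFive(readings: Seq[Reading]): Seq[Reading] =
    readings
      .grouped(5)          // consecutive, non-overlapping groups of up to 5
      .filter(_.size == 5) // assumption: an incomplete last group does not fire
      .map(_.maxBy(_._2))  // maximum temperature within each group
      .toList
}
```

Every reading takes part in exactly one evaluation here, which is the
difference from the time window with a CountTrigger and CountEvictor.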

Please let me know if you have further questions,
Fabian


[1]
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams


> -- Nirmalya
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta5
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
