Hello Fabian,

This is a rather long mail; please bear with me.

From your response: 'Let's start by telling me what you actually want to
do ;-)'

At a broad level, I want to write a tutorial (perhaps a series of them) on
Flink, where these concepts are brought out by a mix of definition,
elaboration, illustration and, of course, code snippets. If that helps the
community, I will be very happy. At the very least, I will understand the
principles and their application much better. So, I am perhaps a bit
selfish here. You also mention that you are preparing some such material.
If I can complement your effort, I will be delighted.

One never knows: going further, I may become a trainer / evangelist of
Apache Flink, if I show enough grasp of the subject! :-)

Now to this particular question (from SOF):

When I began, my intention was to find the maximum temperature for *every 5
successive records* (to answer your question).

As I have said before, I am learning and hence trying various operator
combinations on the same set of data to see what happens, and then trying
to explain why that happens.

Let's refer to the code again:

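import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

// readIncomingReadings is a helper defined elsewhere in my program
val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID,e.ambientTemperature))
      .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
      .trigger(CountTrigger.of(5))
      .evictor(CountEvictor.of(4))
      .maxBy(1)   // field 1 of the tuple: ambientTemperature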


So, what I understand is this:

timeWindowAll defines a pane of 5 msecs. When this time expires, the
timeWindowAll fires a kind of *onExpirationOf* trigger (I have fabricated
the name; quite likely it doesn't exist). This perhaps does nothing other
than passing to the function (here, *maxBy()*) the contents of the window
(whatever number of elements have been collected in the last 5 msecs) and
clearing the pane, readying it for the next 5 msecs (not exactly, but more
on that later).

However, I provide a CountTrigger (5 elements). According to the rules of
Flink, this trigger replaces the aforementioned default onExpirationOf
trigger. Therefore, when timeWindowAll is ready after 5 msecs have passed,
what it finds available to fire is this CountTrigger. However, a
CountTrigger can fire only if its count-related (here, 5) criterion is
satisfied. So, after 5 msecs have passed, if the number of elements
collected in timeWindowAll pane is >= 5, *only* then CountTrigger will
fire; otherwise, CountTrigger will not stir and timeWindowAll will shrug
its shoulders and go back to wait for the next 5 msecs period.

Going further, I provide a CountEvictor. According to the rules of Flink,
an Evictor is allowed to act only when its associated trigger (here,
CountTrigger) is fired. Because of its presence, a further check is made on
the contents of the pane. If CountTrigger is fired, the number of elements
collected in the pane must be >= 5. However, the evictor is interested only
in the first 4 of them. The evictor *takes away* these 4 from
timeWindowAll's pane and gives them to the function. The 5th element still
remains in the pane. timeWindowAll readies itself for the next 5 msecs, but
its pane is not empty this time. It still has that solitary element there.
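
To make this part of my mental model concrete, here is a minimal sketch in
plain Scala (no Flink involved). It models only what I have described above,
which may well differ from what Flink's CountTrigger and CountEvictor really
do. The names TriggerEvictorModel and fire, and the temperatures, are made up
for illustration.

```scala
// Plain-Scala model of my understanding; NOT Flink's implementation.
object TriggerEvictorModel {
  // Returns (elements handed to the function, elements left in the pane).
  // The "trigger" fires only when the pane holds at least triggerCount
  // elements; the "evictor" then takes the first evictCount of them.
  def fire(pane: Vector[Double],
           triggerCount: Int,
           evictCount: Int): (Vector[Double], Vector[Double]) =
    if (pane.size >= triggerCount) pane.splitAt(evictCount)
    else (Vector.empty, pane)

  def main(args: Array[String]): Unit = {
    val pane = Vector(20.5, 22.1, 19.8, 23.4, 21.0) // made-up temperatures
    val (toFunction, remaining) = fire(pane, triggerCount = 5, evictCount = 4)
    println(s"handed to maxBy: $toFunction, max = ${toFunction.max}")
    println(s"left in pane:    $remaining")
  }
}
```

With five elements in the pane, four go to the function and one stays
behind; with fewer than five, nothing is handed over at all.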

This much seems straightforward but there is a twist in the tale.

A very important point about timeWindowAll's pane is its ephemeral nature.
When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)), Flink
understands this as an instruction to create a pane for every successive
5-msec period. Flink doesn't create one pane which is cleared after every 5
msecs (this is what I inexactly mentioned earlier) and readied for the
next 5 msecs. Instead, it brings into existence a *freshly minted pane*
every 5 msecs. The contents of the preceding pane are subjected to the
trigger (if it exists), the evictor (if it exists) and finally, the function
(if it is provided). Then, Flink *dumps the preceding pane* along with its
contents, if any, and readies the new pane, awaiting elements during the
next 5 msecs.
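
The idea of a separate pane per 5-msec period can be sketched in plain Scala
as follows. This is only an illustration under my own assumptions: each
element is placed in the pane whose start is its timestamp rounded down to a
multiple of the pane size. PaneAssignment, paneStart and the arrival times
are hypothetical names and values, not anything taken from Flink.

```scala
// Plain-Scala illustration of "one fresh pane per period"; no Flink here.
object PaneAssignment {
  // Start (in ms) of the fixed-size pane that contains the given timestamp.
  def paneStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  def main(args: Array[String]): Unit = {
    val sizeMs = 5L
    val arrivals = Seq(0L, 1L, 4L, 5L, 9L, 12L) // made-up arrival times in ms
    // Group the arrivals by the pane they fall into, in pane order.
    val panes = arrivals.groupBy(paneStart(_, sizeMs)).toSeq.sortBy(_._1)
    panes.foreach { case (start, ts) =>
      println(s"pane [$start, ${start + sizeMs}) holds arrivals $ts")
    }
  }
}
```

Each pane is a distinct collection here, so nothing carries over from one
period to the next, which is the point I am trying to make above.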

In my case above, the criteria of timeWindowAll and Trigger/Evictor are not
locked in step as precisely as they should have been. It is quite possible
that while CountTrigger fires because 5 elements are already in the pane, 5
msecs are *yet to lapse*. So, the current pane of timeWindowAll is still
alive and is collecting subsequent elements arriving. The evictor takes
away 4 elements. The remaining element is joined by a few more *before* 5
msecs lapse. After 5 msecs have lapsed, Flink extirpates the pane - along
with its current contents - and creates a fresh pane. In effect, some
elements which arrive and are collected in the pane *never reach* the
trigger/evictor pair. These unfortunate elements are destroyed along with
the pane in which they reside. Obviously, this affects the output that I
see. The calculation of maximum temperature is inaccurate simply because
some of the temperature readings are never available to the _maxBy_
function.
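
The effect I am describing can be simulated in plain Scala. Again, this is a
sketch of my own understanding, not of Flink's actual behaviour: within each
5-msec pane, as soon as 5 readings are present the first 4 are handed to the
max function, and whatever is still in the pane when the period ends is
thrown away. LostReadings, run and all the readings are invented for this
illustration.

```scala
// Plain-Scala simulation of the model described above; NOT Flink's code.
object LostReadings {
  // (arrival time in ms, temperature); all values are made up.
  type Reading = (Long, Double)

  // Returns (maxima emitted, readings dropped together with their pane).
  def run(readings: Seq[Reading], paneMs: Long): (Vector[Double], Vector[Reading]) = {
    var emitted = Vector.empty[Double]
    var dropped = Vector.empty[Reading]
    // One fresh pane per period, processed in time order.
    readings.groupBy(_._1 / paneMs).toSeq.sortBy(_._1).foreach { case (_, inPane) =>
      var pane = Vector.empty[Reading]
      inPane.foreach { r =>
        pane = pane :+ r
        if (pane.size >= 5) {                    // count "trigger" fires
          val (evicted, rest) = pane.splitAt(4)  // "evictor" takes first 4
          emitted = emitted :+ evicted.map(_._2).max
          pane = rest
        }
      }
      dropped = dropped ++ pane                  // pane discarded at period end
    }
    (emitted, dropped)
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      (0L, 20.0), (0L, 21.0), (1L, 19.0), (1L, 22.0), (2L, 30.0),
      (3L, 31.0), (4L, 18.0),           // first pane: 7 readings
      (6L, 25.0), (8L, 26.0))           // second pane: only 2 readings
    val (emitted, dropped) = run(readings, paneMs = 5L)
    println(s"emitted maxima: $emitted")
    println(s"never seen by the function: $dropped")
  }
}
```

In this made-up run, the two highest temperatures of the first pane arrive
after the trigger has fired and so never reach the function, which is exactly
the kind of inaccuracy I believe I am seeing.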

Have I got it almost correct? Will be keen to hear from you.

-- Nirmalya



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta5
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
