Hello Fabian,
This is a rather long mail; please bear with me.

From your response:

  'Let's start by telling me what you actually want to do ;-)'

At a broad level, I want to write a tutorial (perhaps a series of them) on Flink, in which these concepts are brought out through a mix of definition, elaboration, illustration and, of course, code snippets. If that helps the community, I will be very happy. At the very least, I will understand the principles and their application much better, so I am perhaps being a bit selfish here. You also mention that you are preparing some such material. If I can complement your effort, I will be delighted. One never knows: going further, I may become a trainer / evangelist of Apache Flink, if I show enough grasp of the subject! :-)

Now to this particular question (from SOF):

When I began, my intention was to find the maximum temperature for *every 5 successive records* (to answer your question). As I have said before, I am learning, and hence I am trying various operator combinations on the same set of data to see what happens, and then trying to explain why it happens.

Let's refer to the code again:

    val readings =
      readIncomingReadings(env, "./sampleIOTTiny.csv")
        .map(e => (e.sensorUUID, e.ambientTemperature))
        .timeWindowAll(Time.of(5, TimeUnit.MILLISECONDS))
        .trigger(CountTrigger.of(5))
        .evictor(CountEvictor.of(4))
        .maxBy(1)

So, what I understand is this:

timeWindowAll defines a pane of 5 msecs. When this time expires, timeWindowAll fires a kind of *onExpirationOf* trigger (I have made up the name; quite likely it doesn't exist). This perhaps does nothing other than pass the contents of the window (however many elements have been collected in the last 5 msecs) to the function (here, *maxBy()*) and clear the pane, readying it for the next 5 msecs (not exactly, but more on that later).

However, I provide a CountTrigger (5 elements). According to the rules of Flink, this trigger replaces the aforementioned default onExpirationOf trigger.
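Before going further, the default behaviour just described (every 5 msec pane handed whole to maxBy) can be sketched in plain Scala. This is only an illustration using ordinary collections, not Flink code: the object name, the `maxPerPane` helper, the arrival timestamps and the sample readings are all invented for the example.

```scala
// Plain-Scala sketch of tumbling 5 ms panes with a max-by-temperature per pane.
// No Flink involved; names, timestamps and readings are invented sample data.
object TumblingPaneSketch {
  // (arrival time in ms, sensor id, ambient temperature)
  type Reading = (Long, String, Double)

  val paneSizeMs = 5L // stands in for Time.of(5, TimeUnit.MILLISECONDS)

  // Keep the hottest reading; on a tie the earlier reading wins.
  def hottest(pane: Seq[Reading]): Reading =
    pane.reduce((a, b) => if (b._3 > a._3) b else a)

  // Assign each reading to the pane covering its arrival time, then reduce
  // every pane to its hottest reading, ordered by pane start.
  def maxPerPane(readings: Seq[Reading]): Seq[(Long, Reading)] =
    readings
      .groupBy(r => r._1 - (r._1 % paneSizeMs))
      .toSeq
      .sortBy(_._1)
      .map { case (paneStart, pane) => (paneStart, hottest(pane)) }

  def main(args: Array[String]): Unit = {
    val sample: Seq[Reading] = Seq(
      (0L, "probe-1", 20.5),
      (2L, "probe-2", 23.0),
      (4L, "probe-1", 21.0),
      (6L, "probe-2", 19.5),
      (9L, "probe-1", 22.5)
    )
    maxPerPane(sample).foreach { case (start, (_, id, temp)) =>
      println(s"pane [$start, ${start + paneSizeMs}) ms -> $id at $temp")
    }
  }
}
```

With the sample data, the first pane (0 to 5 msecs) holds three readings and the second pane holds two, and each pane yields exactly one maximum.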
Therefore, when timeWindowAll is ready after 5 msecs have passed, what it finds available to fire is this CountTrigger. However, a CountTrigger can fire only if its count-related criterion (here, 5) is satisfied. So, after 5 msecs have passed, *only* if the number of elements collected in the timeWindowAll pane is >= 5 will the CountTrigger fire; otherwise, the CountTrigger will not stir, and timeWindowAll will shrug its shoulders and go back to waiting for the next 5 msec period.

Going further, I provide a CountEvictor. According to the rules of Flink, an Evictor is allowed to act only when its associated trigger (here, CountTrigger) fires. Because of its presence, a further check is made on the contents of the pane. If the CountTrigger has fired, the number of elements collected in the pane must be >= 5. However, the evictor is interested only in the first 4 of them. The evictor *takes away* these 4 from timeWindowAll's pane and gives them to the function. The 5th element still remains in the pane. timeWindowAll readies itself for the next 5 msecs, but its pane is not empty this time: it still has that solitary element in it.

This much seems straightforward, but there is a twist in the tale.

A very important point about timeWindowAll's pane is its ephemeral nature. When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)), Flink understands this as an instruction to create a pane for every successive 5 msec period. Flink doesn't create one pane which is cleared after every 5 msecs and readied for the next 5 msecs (this is what I inexactly described earlier). Instead, it brings into existence a *freshly minted pane* every 5 msecs. The contents of the preceding pane are subjected to the trigger (if one exists), the evictor (if one exists) and, finally, the function (if one is provided). Then, Flink *dumps the preceding pane* along with its contents, if any, and readies the new pane to await elements during the next 5 msecs.
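The model described so far can be written down as a small plain-Scala simulation. To be clear about the assumptions: this encodes only the understanding stated in this mail (the count check happens when a pane expires, the evictor hands the first 4 elements to the function, and the pane is then discarded), not Flink's actual implementation. The names `TriggerEvictorModel`, `PaneResult` and `run`, and all the sample readings, are hypothetical.

```scala
// Simulation of the model described in this mail, NOT of Flink's internals.
// All names and sample data here are hypothetical.
object TriggerEvictorModel {
  type Reading = (Long, String, Double) // (arrival ms, sensor id, temperature)

  val paneSizeMs   = 5L // stands in for timeWindowAll(Time.of(5, MILLISECONDS))
  val triggerCount = 5  // stands in for CountTrigger.of(5)
  val evictorCount = 4  // stands in for CountEvictor.of(4)

  // What one pane produced, and which readings were discarded with it.
  final case class PaneResult(paneStart: Long, max: Option[Reading], dropped: Seq[Reading])

  def hottest(rs: Seq[Reading]): Reading =
    rs.reduce((a, b) => if (b._3 > a._3) b else a)

  // Readings are assumed to be given in arrival order.
  def run(readings: Seq[Reading]): Seq[PaneResult] =
    readings
      .groupBy(r => r._1 - (r._1 % paneSizeMs))
      .toSeq
      .sortBy(_._1)
      .map { case (start, pane) =>
        if (pane.size >= triggerCount) {
          // Assumed: trigger fires, first 4 go to the function, the rest are dumped.
          val (handedOver, leftBehind) = pane.splitAt(evictorCount)
          PaneResult(start, Some(hottest(handedOver)), leftBehind)
        } else {
          // Assumed: trigger does not fire, the whole pane is dumped.
          PaneResult(start, None, pane)
        }
      }

  def main(args: Array[String]): Unit = {
    val sample: Seq[Reading] = Seq(
      (0L, "s1", 20.0), (1L, "s2", 21.5), (2L, "s1", 19.0), (3L, "s2", 22.0),
      (4L, "s1", 30.0), (4L, "s2", 18.0), (6L, "s1", 25.0), (8L, "s2", 24.0)
    )
    run(sample).foreach(println)
  }
}
```

In the sample, the first pane collects six readings, so under this model the function sees only the first four and the remaining two never reach it; the second pane collects only two readings and produces nothing at all.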
In my case above, the criteria of timeWindowAll and of the Trigger/Evictor pair are not locked in step as precisely as they should have been. It is quite possible that the CountTrigger fires because 5 elements are already in the pane while the 5 msecs are *yet to lapse*. So, the current pane of timeWindowAll is still alive and is still collecting the elements that arrive subsequently. The evictor takes away 4 elements. The remaining element is joined by a few more *before* the 5 msecs lapse. After the 5 msecs have lapsed, Flink extirpates the pane - along with its current contents - and creates a fresh pane. In effect, some elements which arrive and are collected in the pane *never reach* the trigger/evictor pair. These unfortunate elements are destroyed along with the pane in which they reside.

Obviously, this affects the output that I see. The calculation of the maximum temperature is inaccurate simply because some of the temperature readings are never available to the _maxBy_ function.

Have I got it almost correct? I will be keen to hear from you.

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta5
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."