Sorry, I have to correct myself. The windowing semantics are not easy ;-)
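To make the difference between FIRE and FIRE_AND_PURGE a bit more tangible, here is a small toy model in plain Scala. It is only a sketch of the behaviour discussed in this thread, not Flink code: Pane, add, contents and emitted are made-up names, and the assumption that the count trigger starts counting again after it fired is mine. The first reading is deliberately the largest one, so you can see that the evictor drops it before the max is computed.

```scala
// Toy model of the trigger/evictor behaviour described in this mail.
// Plain Scala, no Flink dependency; all names here are hypothetical.
object WindowSemanticsSketch {
  sealed trait TriggerResult
  case object Continue extends TriggerResult
  case object Fire extends TriggerResult          // evaluate, keep the elements
  case object FireAndPurge extends TriggerResult  // evaluate, then drop the elements

  final class Pane(triggerCount: Int, keep: Int, onFire: TriggerResult) {
    private var buffer = Vector.empty[Double]
    private var seen = 0
    var emitted: Vector[Double] = Vector.empty

    def contents: Vector[Double] = buffer

    def add(x: Double): Unit = {
      buffer = buffer :+ x
      seen += 1
      // Assumption: the count trigger fires once the count is reached
      // and then starts counting again.
      val result = if (seen >= triggerCount) onFire else Continue
      if (result != Continue) {
        seen = 0
        buffer = buffer.takeRight(keep)   // evictor: keep only the last `keep` elements
        emitted = emitted :+ buffer.max   // user function, here a plain max
        if (result == FireAndPurge) buffer = Vector.empty
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(30.0, 25.0, 19.0, 23.0, 22.0)
    val firing = new Pane(5, 4, Fire)
    val purging = new Pane(5, 4, FireAndPurge)
    readings.foreach { r => firing.add(r); purging.add(r) }
    println(firing.emitted)    // Vector(25.0): 30.0 was evicted before evaluation
    println(firing.contents)   // Vector(25.0, 19.0, 23.0, 22.0): FIRE keeps the elements
    println(purging.contents)  // Vector(): FIRE_AND_PURGE drops them
  }
}
```

With FIRE the four remaining elements stay in the pane and keep occupying memory until something purges them, which is exactly the point of the correction below.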

2015-11-30 15:34 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Nirmalya,
>
> thanks for the detailed description of your understanding of Flink's
> window semantics.
> Most of it is correct, but a few things need a bit of correction ;-)
>
> Please see my comments inline.
>
> 2015-11-28 4:36 GMT+01:00 Nirmalya Sengupta <sengupta.nirma...@gmail.com>:
>
>> Hello Fabian,
>>
>> A little long mail; please have some patience.
>>
>> From your response: ' Let's start by telling me what you actually want
>> to do ;-) '
>>
>> At a broad level, I want to write a (series of, perhaps) tutorial of
>> Flink, where these concepts are brought out by a mix of definition,
>> elaboration, illustration and of course, code snippets. If that helps the
>> community, I will be very happy. In the least, I will understand the
>> principles and their application, much better. So, I am a bit selfish here
>> perhaps. You also mention that you are preparing some such material. If I
>> can complement your effort, I will be delighted.
>>
>
> That sounds great! We are almost done with the blog post and will publish
> it soon. Looking forward to your feedback :-)
>
>> One never knows: going further, I may become a trainer / evangelist of
>> Apache Flink, if I show enough grasp of the subject! :-)
>>
>> Now to this particular question (from SOF):
>>
>> When I began, my intention was to find maximum temperature, *every 5
>> successive records* (to answer your question).
>>
>> As I have said before, I am learning and hence, trying with various
>> operator combinations on the same set of data to see what happens and then,
>> trying to explain why that happens.
>>
>> Let's refer to the code again:
>>
>> val readings =
>>   readIncomingReadings(env,"./sampleIOTTiny.csv")
>>     .map(e => (e.sensorUUID,e.ambientTemperature))
>>     .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>>     .trigger(CountTrigger.of(5))
>>     .evictor(CountEvictor.of(4))
>>     .maxBy(1)
>>
>> So, what I understand is this:
>>
>> timeWindowAll defines a pane of 5 msecs. When this time expires, the
>> timeWindowAll fires a kind of *onExpirationOf* trigger (I have
>> fabricated the name, quite likely it doesn't exist). This perhaps does
>> nothing other than passing to the function (here, *maxBy()*) the
>> contents of the window (whatever number of elements have been collected in
>> last 5 msecs) and clearing the pane, readying it for the next 5 msecs (not
>> exactly, but more of it later).
>>
>
> This is correct. timeWindowAll(5 msecs) (without additional trigger
> definitions) will create a new window every 5 msec, trigger after 5 msecs
> (call the user function), purge the window, and create a new window.
> Independent of the trigger, when a window expires, it is removed (including
> all elements it contains) and a new window is created.

THIS IS NOT CORRECT --> "Independent of the trigger, when a window expires, it is removed (including all elements it contains) and a new window is created."

In fact, a window is only removed if a trigger returns FIRE_AND_PURGE or PURGE. The default time windows (without additional Trigger) purge their content at their end time. If you apply a trigger that does *not* purge the content of the window after it expires, it will consume memory forever.

>
>> However, I provide a CountTrigger (5 elements). According to the rules
>> of Flink, this trigger replaces the aforementioned default onExpirationOf
>> trigger. Therefore, when timeWindowAll is ready after 5 msecs have
>> passed, what it finds available to fire is this CountTrigger.
>> However, a CountTrigger can fire only if its count-related (here, 5)
>> criterion is satisfied. So, after 5 msecs have passed, if the number of
>> elements collected in timeWindowAll pane is >= 5, *only* then CountTrigger
>> will fire; otherwise, CountTrigger will not stir and timeWindowAll will
>> shrug its shoulders and go back to wait for the next 5 msecs period.
>>
>
> If you define a CountTrigger(5), it will be triggered exactly once when
> exactly 5 elements are in the window, even if there are 2 msecs left for
> the window. This will also replace the current trigger, which would trigger
> at 5 msecs, i.e., the window is only evaluated once after the 5th element
> was inserted. What happens with the elements in the pane after the function
> has been called depends on the trigger. If you look at the Trigger
> interface, you'll find that TriggerResult might be FIRE or FIRE_AND_PURGE
> (among others). FIRE will call the user function and leave the elements in
> the window. FIRE_AND_PURGE will call the user function, purge (delete) the
> window, and create a new window within the same time bounds.
>
>> Going further, I provide a CountEvictor. According to the rules of
>> Flink, an Evictor is allowed to act only when its associated trigger
>> (here, CountTrigger) is fired. Because of its presence, a further check
>> is made on the contents of the pane. If CountTrigger is fired, the number
>> of elements collected in the pane must be >= 5. However, the evictor is
>> interested only in first 4 of them. The evictor *takes away* these 4
>> from timeWindowAll's pane and gives them to the function. The 5th
>> element still remains in the pane. timeWindowAll readies itself for next
>> 5 msecs, but its pane is not empty this time. It still has that solitary
>> element there.
>>
>
> When the CountTrigger(5) fires, exactly 5 elements are in the window pane.
> A CountEvictor(4) will remove the first element from the pane such that
> only 4 elements remain, before it calls the user function to evaluate the
> window. What happens with the four elements after the user function was
> invoked depends on the TriggerResult. A CountTrigger keeps the elements
> in the window.
>
>> This much seems straightforward but there is a twist in the tale.
>>
>> A very important point about timeWindowAll's pane is its ephemeral
>> nature. When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)),
>> Flink understands this as an instruction to create a pane every successive
>> 5 msecs period. Flink doesn't create one pane which is cleared after every
>> 5 msecs (this is what I inexactly mentioned earlier), and readied for the
>> next 5 msecs. Instead, it brings into existence a *freshly minted pane*,
>> every 5 msecs. The contents of the preceding pane are subjected to trigger
>> (if it exists) and evictor (if it exists) and finally, function (if it is
>> provided). Then, Flink *dumps the preceding pane* along with its
>> contents, if any and readies the new pane, awaiting elements during the
>> next 5 msecs.
>>
>
> This is correct.
>
>> In my case above, the criteria of timeWindowAll and Trigger/Evictor are
>> not locked in step as precisely as they should have been. It is quite
>> possible that while CountTrigger fires because 5 elements are already in
>> the pane, 5 msecs are *yet to lapse*. So, the current pane of
>> timeWindowAll is still alive and is collecting subsequent elements
>> arriving. The evictor takes away 4 elements. The remaining element is
>> joined by a few more *before* 5 msecs lapse. After 5 msecs have lapsed,
>> Flink extirpates the pane - along with its current contents - and creates a
>> fresh pane. In effect, some elements which arrive and are collected in the
>> pane *never reach* the trigger/evictor pair. These unfortunate elements
>> are destroyed along with the pane in which they reside.
>> Obviously, this affects the output that I see. The calculation of maximum
>> temperature is inaccurate simply because some of the temperature readings
>> are never available to the _maxBy_ function.
>>
>> Have I got it almost correct? Will be keen to hear from you.
>>
>
> In fact there is a quite easy solution for your issue: you should not use
> a time window but a count window instead:
>
> val readings =
>   readIncomingReadings(env,"./sampleIOTTiny.csv")
>     .map(e => (e.sensorUUID,e.ambientTemperature))
>     .countWindowAll(5)
>     .maxBy(1)
>
> This will give you a tumbling count window that calls maxBy() whenever 5
> elements have arrived.
> See the documentation for details [1]
>
> Please let me know if you have further questions,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams
>
>> -- Nirmalya
>>
>> --
>> Software Technologist
>> http://www.linkedin.com/in/nirmalyasengupta5
>> "If you have built castles in the air, your work need not be lost. That
>> is where they should be.
>> Now put the foundation under them."