Hi Nirmalya,
when using count windows the window will trigger after “slide-size” elements 
have been received. So, since in your example, slide-size is set to 1 it will 
emit a new max for every element received and once it accumulated 4 elements it 
will start removing one element for every new element that arrives before 
computing the max.

Cheers,
Aljoscha
> On 14 Dec 2015, at 02:55, Nirmalya Sengupta <[email protected]> 
> wrote:
> 
> Hello Fabian (and others),
> 
> Sorry to bring up the same flogged topic of CountWindowAll() but I just want 
> to be sure that I understand it right.
> 
> For a dataset like the following (partial):
> 
> -----------------------------------------
> 
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,27.18
> ...........
> -----------------------------------------
> 
> The following code :
> 
> -----------------------------------------
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     env.setParallelism(1)
> 
>     val readings =
>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>       .map(e => (e.sensorUUID,e.ambientTemperature))
>       .countWindowAll(4,1)
>       .maxBy(1)
> 
> 
>     readings.print
> -------------------------------------------
> 
> produces this (partial):
> 
> ------------------------------------------
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-400c5cdf,27.18)
> ......
> ------------------------------------------
> 
> I am trying to justify the first three lines of the output. When I call 
> CountWindowAll(4,1), don't I instruct Flink that 'wait till you get at least 
> first 4 readings before you calculate the maximum'? It appears that Flink is 
> calculating max() for every incoming tuple it is adding to the window. What 
> is the correct and complete interpretation of the computation then?
> 
> -- N
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."

Reply via email to