Hi Nirmalya,

When using count windows, the window fires every time “slide-size” new elements have been received. Since the slide size in your example is 1, it emits a new max for every element it receives. It does not wait for 4 elements before firing the first time, which is why the first three lines of your output are the max over only the first 1, 2 and 3 readings. Once the window has accumulated 4 elements, it evicts the oldest element each time a new one arrives, before computing the max.
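To make this concrete, here is a small plain-Scala sketch that replays your first seven readings through the same logic without Flink. It assumes that `countWindowAll(size, slide)` behaves like a global window with a `CountTrigger` that fires every `slide` elements and a `CountEvictor` that keeps the last `size` elements; `SlidingCountMax`, `slidingMax` and `sample` are hypothetical names made up for this example, not Flink API.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model (no Flink dependency) of
// countWindowAll(size, slide).maxBy(1): a count trigger fires after every
// `slide` new elements, and a count evictor keeps only the last `size`
// elements before the max is computed.
object SlidingCountMax {

  // The first seven (sensorUUID, ambientTemperature) pairs from the sample data.
  val sample: List[(String, Double)] = List(
    ("probe-f076c2b0", 29.37),
    ("probe-dccefede", 27.46),
    ("probe-f29f9662", 22.35),
    ("probe-5dac1d9f", 15.98),
    ("probe-6c75cfbe", 30.02),
    ("probe-4d78b545", 25.92),
    ("probe-400c5cdf", 27.18)
  )

  def slidingMax(readings: Seq[(String, Double)], size: Int, slide: Int): List[(String, Double)] = {
    val window  = mutable.Queue.empty[(String, Double)]
    val results = mutable.ArrayBuffer.empty[(String, Double)]
    var sinceLastFiring = 0
    for (reading <- readings) {
      window.enqueue(reading)
      sinceLastFiring += 1
      // The trigger fires once `slide` new elements have arrived,
      // even if fewer than `size` elements have been seen so far.
      if (sinceLastFiring == slide) {
        sinceLastFiring = 0
        // Evictor: drop the oldest elements until at most `size` remain.
        while (window.size > size) window.dequeue()
        // maxBy(1): element with the largest second field; keeps the earlier one on ties.
        results += window.reduceLeft((a, b) => if (b._2 > a._2) b else a)
      }
    }
    results.toList
  }

  def main(args: Array[String]): Unit =
    slidingMax(sample, size = 4, slide = 1).foreach(println)
}
```

Running `main` prints `(probe-f076c2b0,29.37)` four times followed by `(probe-6c75cfbe,30.02)` three times, matching the start of your output: the fifth reading pushes 29.37 out of the 4-element window. With `slide = 2` the same function would only fire after the 2nd, 4th and 6th readings.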
Cheers,
Aljoscha

> On 14 Dec 2015, at 02:55, Nirmalya Sengupta <[email protected]> wrote:
>
> Hello Fabian (and others),
>
> Sorry to bring up the same flogged topic of CountWindowAll() but I just want
> to be sure that I understand it right.
>
> For a dataset like the following (partial):
>
> -----------------------------------------
>
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,27.18
> ...........
> -----------------------------------------
>
> The following code :
>
> -----------------------------------------
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> env.setParallelism(1)
>
> val readings =
>   readIncomingReadings(env,"./sampleIOTTiny.csv")
>   .map(e => (e.sensorUUID,e.ambientTemperature))
>   .countWindowAll(4,1)
>   .maxBy(1)
>
>
> readings.print
> -------------------------------------------
>
> produces this (partial):
>
> ------------------------------------------
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-6c75cfbe,30.02)
> (probe-400c5cdf,27.18)
> ......
> ------------------------------------------
>
> I am trying to justify the first three lines of the output. When I call
> CountWindowAll(4,1), don't I instruct Flink that 'wait till you get at least
> first 4 readings before you calculate the maximum'? It appears that Flink is
> calculating max() for every incoming tuple it is adding to the window. What
> is the correct and complete interpretation of the computation then?
>
> -- N
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
