Hello Fabian (and others),
Sorry to bring up the much-flogged topic of countWindowAll() again, but I just
want to be sure that I understand it correctly.
For a dataset like the following (partial):
-----------------------------------------
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,27.18
...........
-----------------------------------------
The following code:
-----------------------------------------
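import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

// readIncomingReadings (not shown) parses the CSV file into a DataStream of readings
val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .map(e => (e.sensorUUID, e.ambientTemperature))
    .countWindowAll(4, 1) // window size 4, slide 1
    .maxBy(1)             // reading with the highest temperature (tuple field 1)
readings.print()
env.execute()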
-------------------------------------------
produces this (partial):
------------------------------------------
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
......
------------------------------------------
I am trying to explain the first three lines of the output. When I call
countWindowAll(4, 1), am I not telling Flink to '*wait until you have received
at least the first 4 readings before you calculate the maximum*'? It appears
that Flink is calculating max() for every incoming tuple it adds to the
window. What, then, is the correct and complete interpretation of the
computation?
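To reason about those first lines, here is a small plain-Scala model (no Flink involved) of one interpretation that seems consistent with the output above. It assumes that countWindowAll(size, slide) behaves like a trigger that fires after every `slide` incoming elements, combined with an evictor that keeps only the most recent `size` elements at firing time. This is an assumption, not a copy of Flink's code; `SlidingCountWindowSketch` and `simulate` are hypothetical names, and the readings are the first seven rows of the sample data above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of countWindowAll(size, slide) followed by maxBy(1):
// ASSUMPTION: fire every `slide` elements, keep at most the last `size` elements.
object SlidingCountWindowSketch {

  // Returns the element emitted by maxBy at each firing.
  def simulate(readings: Seq[(String, Double)], size: Int, slide: Int): Seq[(String, Double)] = {
    val buffer = ArrayBuffer.empty[(String, Double)]
    val emitted = ArrayBuffer.empty[(String, Double)]
    var countSinceLastFire = 0
    for (r <- readings) {
      buffer += r
      countSinceLastFire += 1
      if (countSinceLastFire == slide) {
        countSinceLastFire = 0
        // Evict the oldest elements so that at most `size` remain.
        while (buffer.size > size) buffer.remove(0)
        // Note: before `size` elements have arrived, the window is only partially full.
        emitted += buffer.maxBy(_._2)
      }
    }
    emitted.toList
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      ("probe-f076c2b0", 29.37),
      ("probe-dccefede", 27.46),
      ("probe-f29f9662", 22.35),
      ("probe-5dac1d9f", 15.98),
      ("probe-6c75cfbe", 30.02),
      ("probe-4d78b545", 25.92),
      ("probe-400c5cdf", 27.18)
    )
    simulate(readings, size = 4, slide = 1).foreach(println)
  }
}
```

Under this assumption the model prints (probe-f076c2b0,29.37) four times and then (probe-6c75cfbe,30.02) three times, matching the start of the output above: the first three firings happen over partial windows of 1, 2 and 3 readings, before the window has filled up to 4.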
-- N
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."