Hi Nirmalya, can you describe the semantics that you want to implement? Do you want to find the max temperature every 5 milliseconds or the max of every 5 records?
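To make the two options concrete, here is a minimal plain-Scala sketch (no Flink involved) of what each one would compute on the first ten rows of your dataset. The names `WindowSemanticsSketch`, `maxPerCount`, `maxPerInterval` and `assumedArrivals` are made up for illustration, and the arrival times (one record every 2 ms) are purely an assumption, since with processing time the real arrival times differ from run to run.

```scala
object WindowSemanticsSketch {
  // (sensorUUID, ambientTemperature): first ten rows of the dataset quoted below
  val readings: List[(String, Float)] = List(
    ("probe-f076c2b0", 29.37f), ("probe-dccefede", 27.46f),
    ("probe-f29f9662", 22.35f), ("probe-5dac1d9f", 15.98f),
    ("probe-6c75cfbe", 30.02f), ("probe-4d78b545", 25.92f),
    ("probe-400c5cdf", 22.18f), ("probe-df2d4cad", 16.18f),
    ("probe-f4ef109e", 16.36f), ("probe-3fac3350", 19.19f)
  )

  // Picks the reading with the highest temperature (first one wins on ties).
  private def hottest(rs: Seq[(String, Float)]): (String, Float) =
    rs.reduce((a, b) => if (b._2 > a._2) b else a)

  // Option A: one maximum per block of `size` consecutive records.
  def maxPerCount(rs: Seq[(String, Float)], size: Int): List[(String, Float)] =
    rs.grouped(size).map(hottest).toList

  // Option B: one maximum per `sizeMs`-wide interval of arrival time.
  def maxPerInterval(rs: Seq[(Long, (String, Float))], sizeMs: Long): List[(String, Float)] =
    rs.groupBy { case (arrivalMs, _) => arrivalMs / sizeMs }
      .toList
      .sortBy(_._1)
      .map { case (_, group) => hottest(group.map(_._2)) }

  // ASSUMPTION: record i arrives i * 2 ms after the first one.
  val assumedArrivals: List[(Long, (String, Float))] =
    readings.zipWithIndex.map { case (r, i) => (i * 2L, r) }

  def main(args: Array[String]): Unit = {
    // Option A yields two results, one per five records.
    println(maxPerCount(readings, 5))
    // Option B yields one result per 5 ms interval that received data.
    println(maxPerInterval(assumedArrivals, 5L))
  }
}
```

With option A the number of results depends only on the number of records. With option B it depends on how the records happen to be spread over time, which is why processing-time results vary between runs.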
Right now, you are using a non-keyed timeWindow of 5 milliseconds. This creates a window over the complete stream every 5 ms. However, because you override the time trigger of the timeWindow, the window is not evaluated after 5 ms, but after 5 elements have been added to it. Because of the evictor, only 4 of those elements are passed to the max aggregation function.

However, the window is not closed when the trigger fires. Instead, more elements are added to the window until the 5 ms are over. Then a new window is opened and triggered again after 5 elements have been added.

I know, these things are not easy to understand. We are working on a document that clarifies the dependencies of window assigner, trigger, evictor, window function, processing / event time, etc.

Let's start with you telling me what you actually want to do ;-)

Cheers, Fabian

2015-11-27 14:49 GMT+01:00 Nirmalya Sengupta <sengupta.nirma...@gmail.com>:

> Hello Fabian/Matthius,
>
> Many thanks for showing interest in my query on SOF. That helps me sustain
> my enthusiasm. :-)
>
> After setting parallelism of environment to '1' and replacing _max()_ with
> _maxBy()_, I get a list of maximum temperatures but I fail to explain to
> myself, how does Flink arrive at those figures (attached below). I
> understand that different runs will possibly generate different results,
> because I am using **ProcessingTime** characteristic. Yet, I expect some
> kind of a deterministic output which I don't see.
>
> Please prod me to the right direction.
>
> Here's the code I have been referring to:
>
> -------------------------------------------------
>
> case class IncomingDataUnit (
>     sensorUUID: String, radiationLevel: Int,photoSensor: Float,
>     humidity: Float,timeStamp: Long,
>     ambientTemperature: Float)
>   extends Serializable { }
>
> object SocketTextStreamWordCount {
>
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     env.setParallelism(1)
>
>     val readings =
>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>         .map(e => (e.sensorUUID,e.ambientTemperature))
>         .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>         .trigger(CountTrigger.of(5))
>         .evictor(CountEvictor.of(4))
>         .maxBy(1)
>
>     readings.print
>
>     env.execute("Scala IOT Stream experiment Example")
>
>   }
>
>   private def readIncomingReadings(env: StreamExecutionEnvironment,inputPath: String) :
>       DataStream[IncomingDataUnit] = {
>     env.readTextFile(inputPath).map(datum => {
>       val fields = datum.split(",")
>       IncomingDataUnit(
>         fields(0),         // sensorUUID
>         fields(1).toInt,   // radiationLevel
>         fields(2).toFloat, // photoSensor
>         fields(3).toFloat, // humidity
>         fields(4).toLong,  // timeStamp
>         fields(5).toFloat  // ambientTemperature
>       )
>     })
>   }
>
> }
>
> -------------------------------------------------
>
> Here's the dataset:
> ------------------------------------------------
>
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,22.18
> probe-df2d4cad,199,820.8,72.936,1448028161,16.18
> probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
> probe-3fac3350,200,720.12,78.2073,1448028161,19.19
> probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
> probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
> probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
> probe-24444323,197,816.06,84.0816,1448028161,4.405
> probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
> probe-20c609fb,204,804.37,84.5243,1448028161,22.87
> probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
> probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
> probe-960906ca,197,797.63,77.4359,1448028162,27.62
> -------------------------------------------------
>
> And here's the output:
>
> ---------------------
>
> (probe-6c75cfbe,30.02)
> (probe-42a9ddca,22.07)
> (probe-960906ca,27.62)
> (probe-400c5cdf,22.18)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-960906ca,27.62)
>
> ---------------------
>
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>