Hello Stefano <stefano.bagh...@radicalbit.io>,

I tried to implement what I understood from your mail earlier today, but it doesn't produce the result I expect. Here's the code snippet:
-------------------------------------------------------------------------
val env = StreamExecutionEnvironment.createLocalEnvironment(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val readings = readIncomingReadings(env, "./sampleIOTTiny.csv")
  .keyBy(_.readingTimeStamp)
  .countWindow(4, 2)

val avgReadings = readings
  .apply((key: Long, w: Window, v: Iterable[IncomingDataUnit], out: Collector[Float]) => {
    val readings: Iterable[Float] = v.map(_.ambientTemperature)
    val avg = readings.sum / readings.size
    out.collect(avg)
  })
  .setParallelism(1)

avgReadings.print()
-------------------------------------------------------------------------

And here's the output:

-------------------------------------------------------------------------
1> 23.67
1> 21.0025
1> 23.79
2> 25.02
2> 23.3425
2> 25.02
3> 26.55
4> 19.970001
3> 18.93375
4> 25.727499
3> 18.93375
4> 25.7075
-------------------------------------------------------------------------

My understanding is that, because I have set a parallelism of 1 on the avgReadings transformation, it should aggregate the streams from all 4 of the earlier windows and then compute a single average value.

It is quite apparent that there is a gap in my understanding. Could you please point out the mistake I am making?

Many thanks in advance.

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."