Hello Stefano <stefano.bagh...@radicalbit.io>

I have tried to implement what I understood from your mail earlier in the
day, but it doesn't seem to produce the result I expect. Here's the code
snippet:

-------------------------------------------------------------------------
    val env = StreamExecutionEnvironment.createLocalEnvironment(4)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
        .keyBy(_.readingTimeStamp)
        .countWindow(4,2)

    val avgReadings =
      readings
        .apply((key: Long, w: Window, v: Iterable[IncomingDataUnit], out: Collector[Float]) => {
          // Average ambient temperature of the readings in this window
          val temperatures: Iterable[Float] = v.map(_.ambientTemperature)
          val avg = temperatures.sum / temperatures.size

          out.collect(avg)
        }).setParallelism(1)

    avgReadings.print()

-------------------------------------------------------------------------

And here's the output:

-------------------------------------------------------------------------
1> 23.67
1> 21.0025
1> 23.79
2> 25.02
2> 23.3425
2> 25.02
3> 26.55
4> 19.970001
3> 18.93375
4> 25.727499
3> 18.93375
4> 25.7075
--------------------------------------------------------------------------

My understanding is that, because I have set a parallelism of 1 on the
avgReadings transformation, it should aggregate the streams from all 4
earlier windows and then compute a single average value. It is quite
apparent that there is a gap in my understanding. Could you please point
out the mistake I am making?
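To make the question concrete, here is a small, Flink-free Scala model of
the two behaviours as I currently picture them. It rests on my assumption
(from my reading of the docs, not verified) that countWindow(4, 2) on a
keyed stream keeps a separate window per key, fires after every 2 elements
of that key, and evaluates the last (at most) 4 of them. CountWindowModel,
Reading, keyedAverages and globalAverages are hypothetical names made up
for this sketch: keyedAverages is what I suspect my pipeline actually
computes, and globalAverages is the single stream of averages I was
expecting.

```scala
import scala.collection.mutable

// Flink-free model of keyBy(...).countWindow(size, slide) as I understand it
// (an assumption, not Flink's actual implementation): every key has its own
// window; after each `slide` elements of that key, the last `size` elements
// (fewer at the start) are averaged.
object CountWindowModel {

  // Hypothetical stand-in for IncomingDataUnit: only the fields used here.
  final case class Reading(key: Long, temperature: Float)

  // Averages emitted by a per-key sliding count window, in arrival order,
  // paired with the key whose window fired.
  def keyedAverages(in: Seq[Reading], size: Int, slide: Int): Seq[(Long, Float)] = {
    val buffers = mutable.Map.empty[Long, Vector[Float]] // last `size` values per key
    val counts  = mutable.Map.empty[Long, Int]           // elements seen per key
    val out     = Vector.newBuilder[(Long, Float)]
    for (r <- in) {
      val buf = (buffers.getOrElse(r.key, Vector.empty[Float]) :+ r.temperature).takeRight(size)
      buffers(r.key) = buf
      val n = counts.getOrElse(r.key, 0) + 1
      counts(r.key) = n
      // Fire once every `slide` elements of this key.
      if (n % slide == 0) out += (r.key -> buf.sum / buf.size)
    }
    out.result()
  }

  // The same window over the whole stream, i.e. as if every reading had one key.
  def globalAverages(in: Seq[Reading], size: Int, slide: Int): Seq[Float] =
    keyedAverages(in.map(_.copy(key = 0L)), size, slide).map(_._2)
}
```

With readings for two keys interleaved, keyedAverages emits one average per
key each time that key's window fires, whereas globalAverages emits a single
sequence over all readings. Since my keyBy uses readingTimeStamp, every
distinct timestamp would be its own key in this model.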

Many thanks in advance.

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
