Hi Nirmalya,

Can you describe the semantics you want to implement?
Do you want to find the max temperature every 5 milliseconds or the max of
every 5 records?
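To make the difference between the two readings concrete, here is a small plain-Scala sketch that does not involve Flink. The object and method names are made up for illustration. The arrival times (in ms) are hypothetical, because in your job they depend on when records reach the operator. In Flink's DataStream API, the "every 5 records" variant would roughly correspond to `countWindowAll(5)` followed by `maxBy(1)`, but please double-check that against the docs of your Flink version.

```scala
// Plain-Scala sketch (no Flink) of two possible meanings of "max temperature".
// Readings are (sensorUUID, ambientTemperature); arrival times in ms are hypothetical.
object MaxSemantics {
  type Reading = (String, Float)

  // Highest temperature; on ties the earlier reading wins.
  private def hottest(rs: Seq[Reading]): Reading =
    rs.reduce((a, b) => if (b._2 > a._2) b else a)

  // "Max of every 5 records": tumbling count windows. A trailing group with
  // fewer than `size` records never fills up, so it emits nothing.
  def maxPerRecords(readings: Seq[Reading], size: Int): Seq[Reading] =
    readings.grouped(size).filter(_.size == size).map(hottest).toList

  // "Max every 5 ms": tumbling time windows by arrival time. Every non-empty
  // window emits exactly one result; results are returned in time order.
  def maxPerMillis(arrivals: Seq[(Long, Reading)], sizeMs: Long): Seq[Reading] =
    arrivals
      .groupBy { case (t, _) => t - (t % sizeMs) } // key = window start
      .toList
      .sortBy(_._1)
      .map { case (_, inWindow) => hottest(inWindow.map(_._2)) }
}
```

The count-based variant is deterministic for a given input order. The time-based variant depends on how arrivals fall into 5 ms buckets.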

Right now, you are using a non-keyed time window (timeWindowAll) of 5
milliseconds. This creates a window over the complete stream every 5 msecs.
However, because you override the time trigger of the window with a
CountTrigger, the window is not evaluated after 5 msecs. It is evaluated
each time 5 elements have been added to it. Because of the evictor, only
the 4 most recent elements are passed to the max aggregation function.
However, the window is not closed when the trigger fires. Instead, more
elements are added to the window until the 5 msecs are over. Then a new
window is opened, which again fires after 5 elements have been added.
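The behavior described above can be modeled in a few lines of plain Scala. This is a simplified sketch, not Flink's actual operator code, and the names are hypothetical. It makes three assumptions:

- the count trigger fires without purging and then restarts its count;
- the evictor trims the buffer to the 4 most recent elements before the max is computed;
- leftover elements in a window that ends before its next 5th element arrives are never emitted.

Arrival times are hypothetical milliseconds.

```scala
// Hypothetical model of:
//   timeWindowAll(5 ms) + trigger(CountTrigger.of(5)) + evictor(CountEvictor.of(4)) + maxBy(1)
// Assumptions (not Flink source code): the trigger fires every `fireEvery`
// arrivals without purging, the evictor keeps the `keep` most recent
// elements, and leftovers of a window that ends early are never emitted.
object CountTriggeredTimeWindow {
  type Reading = (String, Float)

  def simulate(arrivals: Seq[(Long, Reading)], sizeMs: Long,
               fireEvery: Int, keep: Int): List[Reading] = {
    val out = List.newBuilder[Reading]
    var windowStart = Long.MinValue
    var buffer = Vector.empty[Reading]
    var count = 0
    for ((t, r) <- arrivals) {
      val start = t - (t % sizeMs)
      if (start != windowStart) { // a new time window opens; the old one is dropped
        windowStart = start
        buffer = Vector.empty
        count = 0
      }
      buffer :+= r
      count += 1
      if (count == fireEvery) {
        buffer = buffer.takeRight(keep) // evictor removes the oldest elements
        out += buffer.reduce((a, b) => if (b._2 > a._2) b else a) // maxBy(1)
        count = 0 // trigger restarts counting; the window stays open
      }
    }
    out.result()
  }
}
```

Suppose all 19 of your sample records arrive within a single 5 msec window. Under these assumptions, the model emits 30.02, 22.18 and 29.43. If the window boundary instead falls after the 7th record, it emits 30.02, 22.07 and 29.43. Where the processing-time boundaries fall therefore decides which records are compared.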

I know these things are not easy to understand. We are working on a
document that clarifies the dependencies between window assigner, trigger,
evictor, window function, processing / event time, etc.

Let's start with you telling me what you actually want to do ;-)

Cheers, Fabian

2015-11-27 14:49 GMT+01:00 Nirmalya Sengupta <sengupta.nirma...@gmail.com>:

> Hello Fabian/Matthius,
>
> Many thanks for showing interest in my query on SOF. That helps me sustain
> my enthusiasm. :-)
>
> After setting the parallelism of the environment to '1' and replacing
> _max()_ with _maxBy()_, I get a list of maximum temperatures, but I cannot
> explain to myself how Flink arrives at those figures (attached below). I
> understand that different runs will possibly generate different results,
> because I am using the **ProcessingTime** characteristic. Still, I expect
> some kind of deterministic output, which I don't see.
>
> Please prod me in the right direction.
>
> Here's the code I have been referring to:
>
> -------------------------------------------------
>
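> import java.util.concurrent.TimeUnit
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
>
> // One parsed line of the CSV input
> case class IncomingDataUnit(
>     sensorUUID: String,
>     radiationLevel: Int,
>     photoSensor: Float,
>     humidity: Float,
>     timeStamp: Long,
>     ambientTemperature: Float)
>   extends Serializable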
>
>
> object SocketTextStreamWordCount {
>
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     env.setParallelism(1)
>
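>     val readings =
>       readIncomingReadings(env, "./sampleIOTTiny.csv")
>       .map(e => (e.sensorUUID, e.ambientTemperature)) // (sensor, temperature)
>       .timeWindowAll(Time.of(5, TimeUnit.MILLISECONDS)) // non-keyed 5 ms window
>       .trigger(CountTrigger.of(5))   // replaces the default time trigger
>       .evictor(CountEvictor.of(4))   // keeps only 4 elements for evaluation
>       .maxBy(1)                      // record with the highest temperature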
>
>     readings.print
>
>     env.execute("Scala IOT Stream  experiment Example")
>
>   }
>
>   private def readIncomingReadings(env: StreamExecutionEnvironment,
>                                    inputPath: String): DataStream[IncomingDataUnit] = {
>     env.readTextFile(inputPath).map(datum => {
>       val fields = datum.split(",")
>       IncomingDataUnit(
>         fields(0),              // sensorUUID
>         fields(1).toInt,        // radiationLevel
>         fields(2).toFloat,      // photoSensor
>         fields(3).toFloat,      // humidity
>         fields(4).toLong,       // timeStamp
>         fields(5).toFloat       // ambientTemperature
>       )
>     })
>   }
>
> }
>
> -------------------------------------------------
>
> Here's the dataset:
> ------------------------------------------------
>
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,22.18
> probe-df2d4cad,199,820.8,72.936,1448028161,16.18
> probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
> probe-3fac3350,200,720.12,78.2073,1448028161,19.19
> probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
> probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
> probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
> probe-24444323,197,816.06,84.0816,1448028161,4.405
> probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
> probe-20c609fb,204,804.37,84.5243,1448028161,22.87
> probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
> probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
> probe-960906ca,197,797.63,77.4359,1448028162,27.62
> -------------------------------------------------
>
> And here's the output:
>
> ---------------------
>
> (probe-6c75cfbe,30.02)
> (probe-42a9ddca,22.07)
> (probe-960906ca,27.62)
> (probe-400c5cdf,22.18)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-960906ca,27.62)
>
> ---------------------
>
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
