Like many people, I'm trying to do hourly counts. The twist is that I don't want to count per hour of streaming, but per hour of the event's actual occurrence (wall-clock time, e.g. YYYY-mm-dd HH).
My thought is to make the streaming window large enough that a full hour of streaming data fits inside it. Since my window slides in small increments, I want to drop the lowest (i.e. oldest) hour from the stream before persisting the results, since it was already reduced during the previous batch and would only be a partial count in the current one.

This is how far I have gotten. Every line of the input files is parsed into `Event(type, hour)`, and `stream` is a `DStream[Event]` (so each batch is an `RDD[Event]`):

```scala
val evtCountsByHour = stream.map(evt => (evt, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(secondsInWindow)) // counts per event over the window
  .map { case (evt, count) => (evt.hour, (evt, count)) } // re-key by hour
```

My understanding is that at this point, the event counts are keyed by hour.

1. How do I detect the smallest key? I have seen examples of `partitionBy` + `mapPartitionsWithIndex` that drop the lowest index, but I can't figure out how to do that with a DStream. My gut feeling is that the first RDD in the stream should contain the oldest data, but that doesn't seem to be the case (printed from inside `evtCountsByHour.foreachRDD`).

2. If someone is further along with this type of problem, could you share how you approached it? I think Streaming is the correct approach, since I don't want to worry about data that has already been processed and I want to process it continuously. I opted for `reduceByKeyAndWindow` with a large window rather than `updateStateByKey`, because the hour the event occurred in is part of the key, and I don't care to keep that key around once the next hour's events start arriving (I'm assuming RDDs that fall outside the window are considered unreferenced). But I'd love to hear other suggestions if my logic is off.
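For what it's worth, here is a minimal sketch of the "drop the oldest hour before persisting" step I have in mind, written against plain Scala collections so it runs without Spark. The function name `dropPartialHour` and the sample data are my own; in the actual job the same logic would sit inside `evtCountsByHour.foreachRDD`, using `rdd.keys.min` and `rdd.filter` instead of the collection calls below (assuming the hour keys are "YYYY-mm-dd HH" strings, which sort lexicographically in time order):

```scala
// Hypothetical helper: given window results keyed by hour string,
// drop the oldest (lexicographically smallest) hour, which is only
// a partial count in the current window.
def dropPartialHour(counts: Seq[(String, Int)]): Seq[(String, Int)] =
  if (counts.isEmpty) counts
  else {
    val oldest = counts.map(_._1).min // "YYYY-mm-dd HH" sorts chronologically
    counts.filterNot { case (hour, _) => hour == oldest }
  }

// Example window contents (made-up numbers): 09:00 is the partial hour.
val window = Seq(("2014-06-01 09", 12), ("2014-06-01 10", 40), ("2014-06-01 11", 7))
val complete = dropPartialHour(window) // keeps only the 10:00 and 11:00 buckets
```

In the streaming version the equivalent would be something like `foreachRDD { rdd => if (!rdd.isEmpty) { val oldest = rdd.keys.min; persist(rdd.filter(_._1 != oldest)) } }`, but I haven't verified the performance of computing `min` per batch.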