You can use a tuple associating a timestamp with your running sum, and have COMPUTE_RUNNING_SUM reset the running sum to zero when the timestamp is more than 5 minutes old. You'll still have a leak if your keys keep changing, though, since the state for keys that stop appearing is reset but never removed.
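A minimal sketch of that update logic, written as plain Java so it runs without Spark. The names TimedSum, update and RESET_MILLIS are hypothetical. The current time is passed in as nowMillis to keep it testable; inside a real update function you would probably read System.currentTimeMillis(). It uses java.util.Optional, whereas the Spark 1.x Java API's updateStateByKey expects a Function2 over Guava's Optional. The branch that returns an empty Optional for a stale key with no new values goes beyond the suggestion above: returning an absent state from the updateStateByKey function removes that key, which is one way to avoid the leak.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ExpiringRunningSum {

    // Hypothetical state: the running sum plus the time (ms) its current period started.
    // Serializable because Spark checkpoints the state.
    static final class TimedSum implements Serializable {
        final long sum;
        final long startMillis;

        TimedSum(long sum, long startMillis) {
            this.sum = sum;
            this.startMillis = startMillis;
        }
    }

    // Reset period from the thread: 5 minutes.
    static final long RESET_MILLIS = 5L * 60 * 1000;

    // newValues: this batch's counts for one key; state: previous state, if any.
    static Optional<TimedSum> update(List<Long> newValues, Optional<TimedSum> state, long nowMillis) {
        long batchSum = 0;
        for (long v : newValues) {
            batchSum += v;
        }
        boolean expired = !state.isPresent()
                || nowMillis - state.get().startMillis > RESET_MILLIS;
        if (expired) {
            if (newValues.isEmpty()) {
                // Stale (or absent) key with no new data: drop it so it does not leak.
                return Optional.empty();
            }
            // Start a fresh 5-minute period from this batch's counts.
            return Optional.of(new TimedSum(batchSum, nowMillis));
        }
        // Still inside the current period: keep accumulating.
        return Optional.of(new TimedSum(state.get().sum + batchSum, state.get().startMillis));
    }

    public static void main(String[] args) {
        Optional<TimedSum> s = update(Arrays.asList(3L, 2L), Optional.empty(), 0L);
        System.out.println(s.get().sum); // 5
        s = update(Collections.singletonList(4L), s, 60_000L);
        System.out.println(s.get().sum); // 9
        s = update(Collections.singletonList(1L), s, 6 * 60_000L);
        System.out.println(s.get().sum); // 1 (reset: more than 5 minutes old)
        s = update(Collections.<Long>emptyList(), s, 12 * 60_000L);
        System.out.println(s.isPresent()); // false (stale key dropped)
    }
}
```

Storing the period start rather than the last-update time means the sum resets on a fixed 5-minute cadence per key; storing the last-update time instead would only reset keys that have been idle for 5 minutes.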
--
Christophe

2014-08-29 9:00 GMT-07:00 Eko Susilo <eko.harmawan.sus...@gmail.com>:

> So "codes" currently holds RDDs containing the codes and their respective
> counters. I would like to find a way to reset those RDDs after some period
> of time.
>
>
> On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> "codes" is a DStream, not an RDD. The remember() method controls how
>> long Spark Streaming holds on to the RDDs itself. Clarify what you
>> mean by "reset"? codes provides a stream of RDDs that contain your
>> computation over a window of time. New RDDs come with the computation
>> over new data.
>>
>> On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
>> <eko.harmawan.sus...@gmail.com> wrote:
>> > Hi all,
>> >
>> > I would like to ask for some advice about resetting a Spark stateful
>> > operation. I tried this:
>> >
>> > JavaStreamingContext jssc = new JavaStreamingContext(context,
>> >     new Duration(5000));
>> > jssc.remember(new Duration(5 * 60 * 1000));
>> > jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
>> > JavaPairReceiverInputDStream<String, String> messages =
>> >     (JavaPairReceiverInputDStream<String, String>)
>> >     KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group",
>> >         topicMap);
>> > JavaPairDStream<String, String> windowed =
>> >     messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
>> > JavaDStream<LogEntry> lines = windowed.map(
>> >     new Function<Tuple2<String, String>, LogEntry>() {
>> >         @Override
>> >         public LogEntry call(Tuple2<String, String> tuple2) {
>> >             // Parse the Kafka message value into a LogEntry
>> >             return Utils.parseLine(tuple2._2());
>> >         }
>> >     }).filter(Functions.FILTER_LOG_ENTRY).cache();
>> >
>> > JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
>> >     .reduceByKey(Functions.SUM_REDUCER)
>> >     .updateStateByKey(COMPUTE_RUNNING_SUM);
>> >
>> > I thought that by setting remember to 5 minutes, the "codes" RDD derived
>> > from messages would also be reset after 5 minutes, but in fact it isn't.
>> >
>> > Is there any way to reset the "codes" RDD after a period of time (5
>> > minutes)?
>> >
>> > Thanks
>> >
>> > --
>> > Best Regards,
>> > Eko Susilo