You can keep a tuple that pairs a timestamp with your running sum, and have
COMPUTE_RUNNING_SUM reset the running sum to zero when the timestamp is
more than 5 minutes old.
You'll still leak state if your keys keep changing, though, because state for
keys that stop appearing is reset but never removed.
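
A minimal sketch of that idea, written as plain Java so it runs without Spark.
The names RunningSumState, Timed and RESET_AFTER_MS are made up for
illustration, and the current time is passed in explicitly so the logic is easy
to test; in a real job you would probably read the clock inside the function.
It uses java.util.Optional, whereas the Optional type expected by
updateStateByKey in the Java API depends on your Spark version, so treat the
signature as an assumption to adapt. Returning an empty Optional is how
updateStateByKey drops a key, which is one way to address the leak mentioned
above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class RunningSumState {
    // Hypothetical reset period: 5 minutes, as in the question.
    static final long RESET_AFTER_MS = 5 * 60 * 1000L;

    // The "tuple": when the current running sum was started, and its value.
    static final class Timed {
        final long startedAtMs;
        final long sum;

        Timed(long startedAtMs, long sum) {
            this.startedAtMs = startedAtMs;
            this.sum = sum;
        }
    }

    // Same shape as an updateStateByKey function (new values + previous state),
    // plus an explicit clock argument.
    static Optional<Timed> update(List<Long> newValues, Optional<Timed> state, long nowMs) {
        long batchSum = 0L;
        for (long v : newValues) {
            batchSum += v;
        }

        boolean expired = !state.isPresent()
                || nowMs - state.get().startedAtMs > RESET_AFTER_MS;

        if (expired) {
            if (newValues.isEmpty()) {
                // Stale key with no new data: drop it instead of keeping a zero.
                return Optional.empty();
            }
            // Start a fresh running sum from this batch.
            return Optional.of(new Timed(nowMs, batchSum));
        }

        // Still inside the 5-minute period: keep accumulating.
        Timed previous = state.get();
        return Optional.of(new Timed(previous.startedAtMs, previous.sum + batchSum));
    }

    public static void main(String[] args) {
        Optional<Timed> s = update(Arrays.asList(2L, 3L), Optional.<Timed>empty(), 0L);
        s = update(Arrays.asList(4L), s, 60_000L);
        System.out.println("sum after 1 minute: " + s.get().sum);

        s = update(Arrays.asList(1L), s, 6 * 60_000L);
        System.out.println("sum after reset: " + s.get().sum);

        s = update(Collections.<Long>emptyList(), s, 20 * 60_000L);
        System.out.println("key still tracked: " + s.isPresent());
    }
}
```

The timestamp here marks when the current sum was started, so each key resets
on its own 5-minute schedule rather than all keys resetting together.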

--Christophe


2014-08-29 9:00 GMT-07:00 Eko Susilo <eko.harmawan.sus...@gmail.com>:

>
> So "codes" currently holds RDDs containing the codes and their respective
> counters. I would like to find a way to reset those RDDs after some period
> of time.
>
>
> On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> "codes" is a DStream, not an RDD. The remember() method controls how
>> long Spark Streaming holds on to the RDDs itself. Clarify what you
>> mean by "reset"? codes provides a stream of RDDs that contain your
>> computation over a window of time. New RDDs come with the computation
>> over new data.
>>
>> On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
>> <eko.harmawan.sus...@gmail.com> wrote:
>> > Hi all,
>> >
>> > I would like to ask for some advice about resetting a Spark stateful
>> > operation. I tried this:
>> >
>> > JavaStreamingContext jssc =
>> >     new JavaStreamingContext(context, new Duration(5000));
>> > jssc.remember(new Duration(5 * 60 * 1000));
>> > jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
>> > JavaPairReceiverInputDStream<String, String> messages =
>> >     (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(
>> >         jssc, "localhost:2181", "test-consumer-group", topicMap);
>> > JavaPairDStream<String, String> windowed =
>> >     messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
>> > JavaDStream<LogEntry> lines = windowed
>> >     .map(new Function<Tuple2<String, String>, LogEntry>() {
>> >         @Override
>> >         public LogEntry call(Tuple2<String, String> tuple2) {
>> >             return Utils.parseLine(tuple2._2());
>> >         }
>> >     })
>> >     .filter(Functions.FILTER_LOG_ENTRY)
>> >     .cache();
>> >
>> > JavaPairDStream<String, Long> codes = lines
>> >     .mapToPair(Functions.GET_CODE)
>> >     .reduceByKey(Functions.SUM_REDUCER)
>> >     .updateStateByKey(COMPUTE_RUNNING_SUM);
>> > I thought that by setting remember() to 5 minutes, the "codes" RDD
>> > derived from messages would also be reset after 5 minutes, but in fact
>> > it is not.
>> >
>> > Is there any way to reset the "codes" RDD after a period of time (5
>> > minutes)?
>> >
>> > Thanks
>> >
>> >
>> >
>> > --
>> > Best Regards,
>> > Eko Susilo
>>
>
>
>
> --
> Best Regards,
> Eko Susilo
>
