Perhaps the new trackStateByKey, targeted for version 1.6, may help you here.
I'm not sure whether it will be part of 1.6, as the JIRA does not
specify a fix version. The JIRA describing it is here:
https://issues.apache.org/jira/browse/SPARK-2629, and the design doc that
discusses the API changes is here:

https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#

Look for the timeout function:

/**
  * Set the duration of inactivity (i.e. no new data) after which a state
  * can be terminated by the system. After this idle period, the system
  * will mark the idle state as being timed out, and call the tracking
  * function with State[S].isTimingOut() = true.
  */
def timeout(duration: Duration): this.type
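To make the timeout semantics concrete, here is a minimal Spark-free Scala sketch, loosely modeled on the Scaladoc above. IdleTimeoutStore, StateEntry and processBatch are hypothetical names, not part of Spark; batch times are plain milliseconds, and each idle key is handed to the tracking function once with isTimingOut = true and then dropped, whatever the function returns.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free sketch of idle-timeout state tracking; not Spark's API.
final case class StateEntry[S](value: S, lastUpdatedMs: Long)

final class IdleTimeoutStore[K, V, S](
    timeoutMs: Long,
    // (key, new value if any, previous state, isTimingOut) => new state; None removes the key
    trackingFn: (K, Option[V], Option[S], Boolean) => Option[S]) {

  private val states = mutable.Map.empty[K, StateEntry[S]]

  /** Processes one batch of records that arrived at time `nowMs`. */
  def processBatch(nowMs: Long, batch: Seq[(K, V)]): Unit = {
    // 1. Keys with new data: call the tracking function and refresh their timestamp.
    batch.foreach { case (k, v) =>
      trackingFn(k, Some(v), states.get(k).map(_.value), false) match {
        case Some(s) => states(k) = StateEntry(s, nowMs)
        case None    => states.remove(k)
      }
    }
    // 2. Keys idle for at least timeoutMs: call once with isTimingOut = true, then drop.
    val idleKeys = states.iterator.collect {
      case (k, e) if nowMs - e.lastUpdatedMs >= timeoutMs => k
    }.toList
    idleKeys.foreach { k =>
      trackingFn(k, None, states.get(k).map(_.value), true) // result is ignored
      states.remove(k)
    }
  }

  def snapshot: Map[K, S] = states.iterator.map { case (k, e) => k -> e.value }.toMap
}

object IdleTimeoutDemo {
  def main(args: Array[String]): Unit = {
    val timedOut = mutable.ListBuffer.empty[String]
    // Running sum per key; keys that time out are recorded in `timedOut`.
    val store = new IdleTimeoutStore[String, Int, Int](3000L,
      (key, value, old, isTimingOut) =>
        if (isTimingOut) { timedOut += key; None }
        else Some(old.getOrElse(0) + value.getOrElse(0)))

    store.processBatch(0L, Seq("12-11-2015:10:00" -> 1, "other" -> 5))
    store.processBatch(1000L, Seq("other" -> 2))
    store.processBatch(3000L, Seq("other" -> 1))

    println(store.snapshot)  // Map(other -> 8)
    println(timedOut.toList) // List(12-11-2015:10:00)
  }
}
```

In the demo, the 12-11-2015:10:00 key from your example is timed out once it has gone 3 seconds without new data, while the key that keeps receiving values survives. The point is that the idle check runs on every batch, independently of whether a matching record ever arrives again.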

-Todd

On Wed, Nov 25, 2015 at 8:00 AM, diplomatic Guru <diplomaticg...@gmail.com>
wrote:

> Hello,
>
> I know how to clear old state depending on the input value: if some
> condition determines that the state is old, returning null invalidates
> the record. But this is only feasible if a new record arrives that
> matches the old key. What if no new data arrives for the old key? How
> could I invalidate it?
>
> e.g.
>
> A key/value pair arrives like this:
>
> Key 12-11-2015:10:00: Value:test,1,2,12-11-2015:10:00
>
> The above key will be stored in the state.
>
> Every time there is a value for this '12-11-2015:10:00' key, it will be
> aggregated and updated. If the job runs 24/7, this state will be kept
> forever, until we restart the job. I could add a validation within the
> updateStateByKey function to check and delete the record if
> value[3] < SYSTIME-1, but this is only effective if a new record matching
> 12-11-2015:10:00 arrives on a later day. What if no new values are
> received for the key 12-11-2015:10:00? I assume it will remain in the
> state; am I correct? If so, how do I clear the state?
>
> Thank you.
>
>
>
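For comparison, the expiry check described in the quoted message can be written as a plain Scala function of the shape updateStateByKey takes, (Seq[V], Option[S]) => Option[S], where returning None removes the key (the Scala counterpart of the "return null" mentioned above). Event, Agg, expiringUpdate, the injected clock and the millisecond timestamps are hypothetical stand-ins for the question's value[3] and SYSTIME. Like any update function, it can only drop a key when it is invoked for that key; the timeout function above is aimed at keys that stop receiving data.

```scala
// Hypothetical event and state types for the question's example; not from Spark.
final case class Event(amount: Long, timestampMs: Long)
final case class Agg(total: Long, lastSeenMs: Long)

object ExpiringUpdate {
  /** Builds an update function shaped like updateStateByKey's argument:
    * (new values for a key, previous state) => new state, where None drops the key. */
  def expiringUpdate(nowMs: () => Long, maxAgeMs: Long): (Seq[Event], Option[Agg]) => Option[Agg] =
    (newEvents, state) => {
      // Fold any new events into the previous state; newEvents may be empty.
      val merged: Option[Agg] =
        if (newEvents.isEmpty) state
        else {
          val base = state.getOrElse(Agg(0L, Long.MinValue))
          Some(Agg(
            base.total + newEvents.map(_.amount).sum,
            math.max(base.lastSeenMs, newEvents.map(_.timestampMs).max)))
        }
      // Drop the key once its newest event is older than maxAgeMs.
      merged.filter(agg => nowMs() - agg.lastSeenMs <= maxAgeMs)
    }
}
```

Assuming a hypothetical pairs: DStream[(String, Event)], it could be applied as pairs.updateStateByKey(ExpiringUpdate.expiringUpdate(() => System.currentTimeMillis(), 24L * 60 * 60 * 1000)). Injecting the clock keeps the function easy to test outside Spark.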
