I don't think it is possible in the way you are trying to do it. It is important to remember that the statements you mention only set up the stream stages, before the stream is actually running. Once it is running, you cannot change, remove or add stages.
I am not sure how you determine your condition, or what the actual change should be when that condition is met. You say you want a different update function, but your example also switches the source stream (newData instead of streamLogs). Is the condition determined somehow from the data coming through streamLogs, and is newData basically streamLogs again (rather than a whole new data source)? In that case I can think of 3 things to try:

- If the condition you switch on can be determined from each item in streamLogs on its own, you can simply do an if/else inside updateResultsStream to change how you compute your state.
- If this is not the case, but you can determine when to switch your condition for each key independently, you can extend your state type to also keep track of your condition: rather than using JavaPairDStream<String, String>, you make updatedResultsState a JavaPairDStream<String, Pair<String, Boolean>> (assuming you have some class Pair), and you make updateResultsStream update and check the state of the boolean.
- Finally, you can have a separate state stream that keeps track of your condition globally, then join that with your main stream and use it to update the state.
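The first two options can be tried out in plain Java without Spark, since they only change the update function. This is a minimal sketch under some assumptions: the methods mimic the shape of the function given to updateStateByKey (the new values for one key in the current batch plus the previous state, returning the new state), java.util.Optional stands in for the Optional type Spark's Java API actually uses, the values are assumed to be numbers to SUM, and the "RESET" marker, the State class (standing in for Pair<String, Boolean>) and the "reset each key at most once" rule are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Plain-Java sketch: the update methods have the same shape as the function passed to
// updateStateByKey (this batch's new values for one key + previous state -> new state).
// java.util.Optional is only a stand-in for the Optional type Spark's Java API uses.
final class ResetStateSketch {

    // Hypothetical marker: a log value of "RESET" means the condition is met for this key.
    static final String RESET = "RESET";

    // Sum the numeric values of one batch, skipping the marker.
    static long batchSum(List<String> newValues) {
        long sum = 0L;
        for (String v : newValues) {
            if (!RESET.equals(v)) {
                sum += Long.parseLong(v);
            }
        }
        return sum;
    }

    // Option 1: the condition can be decided from the current batch alone,
    // so a plain if/else inside the update function is enough.
    static Optional<Long> updateWithIfElse(List<String> newValues, Optional<Long> previous) {
        if (newValues.contains(RESET)) {
            // Keep the key, but replace the old sum with this batch's values
            return Optional.of(batchSum(newValues));
        }
        return Optional.of(previous.orElse(0L) + batchSum(newValues));
    }

    // Hypothetical stand-in for Pair<String, Boolean>: the sum plus a per-key flag.
    static final class State {
        final long sum;
        final boolean alreadyReset;

        State(long sum, boolean alreadyReset) {
            this.sum = sum;
            this.alreadyReset = alreadyReset;
        }

        @Override
        public String toString() {
            return "State(" + sum + ", " + alreadyReset + ")";
        }
    }

    // Option 2: the decision depends on history (here: reset each key at most once),
    // so the flag travels with the state and is checked and updated on every batch.
    static Optional<State> updateWithFlag(List<String> newValues, Optional<State> previous) {
        State old = previous.orElse(new State(0L, false));
        if (newValues.contains(RESET) && !old.alreadyReset) {
            return Optional.of(new State(batchSum(newValues), true));
        }
        return Optional.of(new State(old.sum + batchSum(newValues), old.alreadyReset));
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("1", "2"),
                Arrays.asList("4"),
                Arrays.asList(RESET, "5"),
                Arrays.asList(RESET, "1"));
        Optional<Long> simple = Optional.empty();
        Optional<State> flagged = Optional.empty();
        for (List<String> batch : batches) {
            simple = updateWithIfElse(batch, simple);
            flagged = updateWithFlag(batch, flagged);
            System.out.println(simple.get() + " " + flagged.get());
        }
    }
}
```

Running main feeds four batches for a single key and prints:

    3 State(3, false)
    7 State(7, false)
    5 State(5, true)
    1 State(6, true)

The last batch shows the difference: the if/else version resets again on every marker, while the flagged version remembers that this key was already reset and just keeps summing.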
For the third option, something like:

    // determineCondition should reduce each batch to a single item that signals whether
    // the condition is met in that batch; updateConditionState should remember that.
    // (updateStateByKey only exists on key-value streams, so the reduced item needs a key,
    // e.g. one constant key.)
    conditionStateStream = streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState);

    // addCondition gets RDDs from streamLogs and single-item RDDs with the condition state,
    // and should add that state to each item in the streamLogs RDD
    joinedStream = streamLogs.transformWith(conditionStateStream, addCondition);

    // This is similar to extending the state type in the previous idea, but now your
    // condition state is determined globally rather than per log entry
    updatedResultsState = joinedStream.updateStateByKey(updateResultsStream);

I hope this applies to your case and makes sense; my Java is a bit rusty :) Perhaps others can suggest better Spark Streaming methods for this, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <[email protected]> wrote:

> Hello guys,
>
> I have a stream job that will carry out computations and update the state
> (SUM the value). At some point, I would like to reset the state. I could
> drop the state by setting 'None' but I don't want to drop it. I would like
> to keep the state but update the state.
>
> For example:
>
> JavaPairDStream<String, String> updatedResultsState =
> streamLogs.updateStateByKey(updateResultsStream);
>
> At some condition, I would like to update the state by key but with the
> different values, hence different update function.
>
> e.g.
>
> updatedResultsState = newData.updateStateByKey(resetResultsStream);
>
> But the newData.updateStateByKey values cannot be replaced with the value
> in streamLogs.updateStateByKey. Do you know how I could replace the state
> value in streamLogs with newData?
>
> Is this possible?
