Hi,

Thanks for the response. We are trying to implement something similar as 
discussed in the following SFO post.

http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

We are doing it in java while accepted answer(second answer) in this post is in 
Scala.


We wrote our java code taking this scala code as reference. But we are getting 
exception in highlighted line i.e. in  return 
Optional.of(events.add(state.toString());); Specifically, it happens when we 
call events.add()



final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> 
updateFunc =

       new Function2<List<String>, Optional<List<String>>,

Optional<List<String>>>() {



        public Optional<List<String>> call(List<String> events, 
Optional<List<String>> state) throws Exception {

        // TODO Auto-generated method stub

            if(state.toString()==null)

               return Optional.of(events);

            else {

//UnsupportedOperationException here

               return Optional.of(events.add(state.toString()););

            }

       }

};

Please let us know if you need more details. Unfortunately we are not in a 
position to share whole code.

Thanks



Regards,

Anand/Yogesh
From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, October 16, 2015 1:22 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

Its hard to help without any stacktrace associated with 
UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan 
<ananda.muru...@honeywell.com<mailto:ananda.muru...@honeywell.com>> wrote:

One of my co-worker(Yogesh) was trying to get this posted in spark mailing and 
it seems it did not get posted. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I had a JavaPairDStream<String, List<String>> RDD.

I want to get the list of string from its previous state. For that I use 
updateStateByKey function as follows:



final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> 
updateFunc =

       new Function2<List<String>, Optional<List<String>>,

Optional<List<String>>>() {



        public Optional<List<String>> call(List<String> arg0, 
Optional<List<String>> arg1) throws Exception {

        // TODO Auto-generated method stub

            if(arg1.toString()==null)

               return Optional.of(arg0);

            else {

               arg0.add(arg1.toString());

               return Optional.of(arg0);

            }

       }

};



I want the function to append the new list of string to the previous list and 
return the new list. But I am not able to do so. I am getting the C error.

Can anyone which help me out in getting the desired output?


Reply via email to