I changed the data structure to scala.collection.immutable.Set and I still see the same issue. My key is a String. I do the following in my reduce and invReduce, respectively:

    visitorSet1 ++ visitorSet2.toTraversable
    visitorSet1 -- visitorSet2.toTraversable

On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Yes, and in general any mutable data structure. You have to use immutable data
> structures whose hashcode and equals are consistent enough for being put in
> a set.
>
> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com> wrote:
>
>> Are you suggesting against the usage of HashSet?
>>
>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> This may be because HashSet is a mutable data structure, and it seems
>>> you are actually mutating it in "set1 ++set2". I suggest creating a new
>>> HashMap in the function (and add both maps into it), rather than mutating
>>> one of them.
>>>
>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>>>> Streaming app. I use reduce, invReduce and filterFunction as shown
>>>> below. Any idea as to why I get the error?
>>>>
>>>> java.lang.Exception: Neither previous window has value for key, nor new
>>>> values found. Are you sure your key class hashes consistently?
>>>>
>>>> def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String]))
>>>>     => (Long, HashSet[String]) = {
>>>>   case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>     (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>>> }
>>>>
>>>> def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String]))
>>>>     => (Long, HashSet[String]) = {
>>>>   case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>     (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>>> }
>>>>
>>>> def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>>>   case ((metricName: String, (timeStamp: Long, set: HashSet[String]))) =>
>>>>     set.size > 0
>>>> }
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
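
For reference, here is a minimal, self-contained sketch of what the reduce, inverse-reduce and filter functions discussed in this thread might look like with an immutable Set. The object name `WindowReduceSketch`, the type alias `Agg`, and the function names `reduce`, `invReduce` and `keep` are hypothetical and chosen only for illustration. The sketch is plain Scala with no Spark dependency, and it is not claimed to resolve the exception above.

```scala
object WindowReduceSketch {
  // Hypothetical alias for the per-key value: (latest timestamp, visitor ids).
  // Set here is scala.collection.immutable.Set, the default in Predef.
  type Agg = (Long, Set[String])

  // Combine two aggregates: keep the newer timestamp and the union of visitors.
  // ++ returns a new set and leaves both inputs untouched.
  def reduce(a: Agg, b: Agg): Agg =
    (math.max(a._1, b._1), a._2 ++ b._2)

  // Remove the visitors of the aggregate that is leaving the window.
  // -- also returns a new set.
  def invReduce(a: Agg, b: Agg): Agg =
    (math.max(a._1, b._1), a._2 -- b._2)

  // Keep only keys that still have at least one visitor.
  def keep(entry: (String, Agg)): Boolean =
    entry._2._2.nonEmpty
}
```

One thing that may be worth checking with this design: set difference only undoes a union exactly when the sets being merged are disjoint. If a visitor appears both in the batch leaving the window and in a batch still inside it, `invReduce` drops that visitor even though it is still present in the window.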