I changed the data structure to scala.collection.immutable.Set and I still
see the same issue. My key is a String. I do the following in my reduce
and invReduce functions, respectively:

visitorSet1 ++ visitorSet2.toTraversable

visitorSet1 -- visitorSet2.toTraversable
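
For readers following along, here is a minimal, Spark-free sketch of what the two functions look like with immutable sets. It assumes the same (Long, Set[String]) value shape as the HashSet version quoted below. The names Visitors, reduceVisitors and invReduceVisitors are placeholders chosen for illustration, not the actual application code.

```scala
// Sketch only: the object, type and function names are hypothetical.
object VisitorSetSketch {
  // (latest timestamp, visitor IDs). Plain `Set` is scala.collection.immutable.Set.
  type Visitors = (Long, Set[String])

  // Reduce: merge the values entering the window. `++` returns a new set
  // and leaves both operands untouched.
  def reduceVisitors(a: Visitors, b: Visitors): Visitors =
    (math.max(a._1, b._1), a._2 ++ b._2)

  // Inverse reduce: remove the values leaving the window. `--` also returns
  // a new set rather than modifying `a`.
  def invReduceVisitors(a: Visitors, b: Visitors): Visitors =
    (math.max(a._1, b._1), a._2 -- b._2)

  def main(args: Array[String]): Unit = {
    val window: Visitors = (10L, Set("u1", "u2"))
    val incoming: Visitors = (20L, Set("u2", "u3"))

    val merged = reduceVisitors(window, incoming)
    val trimmed = invReduceVisitors(merged, incoming)

    println(merged)
    println(trimmed)
    println(window) // the original value is unchanged by either call
  }
}
```

The .toTraversable call is left out here because ++ and -- accept a Set argument directly.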

On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Yes, and in general any mutable data structure. You have to use immutable
> data structures whose hashCode and equals are consistent enough for them to
> be put in a set.
>
> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com>
> wrote:
>
>> Are you suggesting against the usage of HashSet?
>>
>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> This may be because HashSet is a mutable data structure, and it seems
>>> you are actually mutating it in "set1 ++set2". I suggest creating a new
>>> HashSet in the function (and adding both sets into it), rather than
>>> mutating one of them.
>>>
>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I see the following error when I use reduceByKeyAndWindow in my Spark
>>>> Streaming app. I use the reduce, invReduce and filter functions shown
>>>> below. Any idea why I get this error?
>>>>
>>>>  java.lang.Exception: Neither previous window has value for key, nor new
>>>> values found. Are you sure your key class hashes consistently?
>>>>
>>>>
>>>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>>>   }
>>>>
>>>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>>>   }
>>>>
>>>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>>>     case (metricName: String, (timeStamp: Long, set: HashSet[String])) =>
>>>>       set.size > 0
>>>>   }
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>>
>>
