Hi Swetha,
We dealt with this issue a couple of years ago and solved it. The
key insight was that adding to a HashSet and removing from a HashSet
are not inverse operations of each other.
For example, if you added a key K1 in batch1 and then again added that same
key K1 durin
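
A minimal sketch of why the two operations are not inverses, assuming the same key K1 shows up in two batches inside one window. The object and function names are illustrative only and do not come from the original code:

```scala
object SetInverseDemo {
  // Reduce: merge the visitor sets of two batches.
  def reduce(a: Set[String], b: Set[String]): Set[String] = a ++ b

  // "Inverse" reduce: remove the visitors of the batch leaving the window.
  def invReduce(window: Set[String], leaving: Set[String]): Set[String] =
    window -- leaving

  def main(args: Array[String]): Unit = {
    val batch1 = Set("K1", "K2")
    val batch2 = Set("K1", "K3") // K1 appears again in the second batch

    val window = reduce(batch1, batch2)        // Set(K1, K2, K3)
    val afterSlide = invReduce(window, batch1) // Set(K3)

    // Once batch1 leaves, the window should contain exactly batch2,
    // but K1 was removed even though batch2 still contains it.
    println(afterSlide == batch2) // prints false
  }
}
```

A set remembers only that K1 is present, not how many batches contributed it, so removing batch1 cannot tell whether K1 should stay.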
Unfortunately, I am out of ideas. I don't know what's going wrong.
If you can, try using Structured Streaming. We are more active on the
Structured Streaming project.
On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy wrote:
Hi TD,
I am still seeing this issue with any immutable data structure. Any idea why
this happens? I use scala.collection.immutable.List[String], and my reduce
and inverse reduce do the following.
visitorSet1 ++ visitorSet2
visitorSet1.filterNot(visitorSet2.contains(_))
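
One possible explanation for the List case, sketched under the assumption that the same visitor can appear in more than one batch of the window: `++` keeps duplicates, but `filterNot(... contains ...)` drops every occurrence of a leaving visitor, so it is still not the inverse of `++`. Scala's `diff` on sequences is a multiset difference and removes only one occurrence per leaving element. The names below are illustrative:

```scala
object ListInverseDemo {
  // Reduce: concatenate the visitor lists, keeping duplicates.
  def reduce(a: List[String], b: List[String]): List[String] = a ++ b

  // Inverse as written in the message: drops every occurrence of a leaving visitor.
  def invReduceFilterNot(window: List[String], leaving: List[String]): List[String] =
    window.filterNot(leaving.contains(_))

  // Alternative: multiset difference removes one occurrence per leaving element.
  def invReduceDiff(window: List[String], leaving: List[String]): List[String] =
    window.diff(leaving)

  def main(args: Array[String]): Unit = {
    val batch1 = List("K1", "K2")
    val batch2 = List("K1", "K3")
    val window = reduce(batch1, batch2)         // List(K1, K2, K1, K3)

    println(invReduceFilterNot(window, batch1)) // prints List(K3)
    println(invReduceDiff(window, batch1))      // prints List(K1, K3)
  }
}
```

With this approach the list may hold repeated visitors, so a unique-visitor count would need something like `.distinct.size` on the result.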
On Wed, Jun 7, 2017 a
I changed the data structure to scala.collection.immutable.Set and I still
see the same issue. My key is a String. I do the following in my reduce
and invReduce.
visitorSet1 ++ visitorSet2.toTraversable
visitorSet1 -- visitorSet2.toTraversable
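
A hedged sketch of one alternative to `++` and `--` on a Set: keep a per-visitor count in an immutable Map, so that adding and subtracting counts are true inverses. This is an assumption about what could work here, not necessarily the fix referred to elsewhere in this thread, and `VisitorCounts`, `Counts`, `reduce` and `invReduce` are hypothetical names:

```scala
object VisitorCounts {
  type Counts = Map[String, Long]

  // Reduce: add the per-visitor counts of two batches.
  def reduce(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, n)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + n)
    }

  // Inverse reduce: subtract the leaving counts and drop visitors
  // whose count reaches zero.
  def invReduce(window: Counts, leaving: Counts): Counts =
    leaving.foldLeft(window) { case (acc, (k, n)) =>
      val remaining = acc.getOrElse(k, 0L) - n
      if (remaining > 0) acc.updated(k, remaining) else acc - k
    }

  def main(args: Array[String]): Unit = {
    val batch1: Counts = Map("K1" -> 1L, "K2" -> 1L)
    val batch2: Counts = Map("K1" -> 1L, "K3" -> 1L)

    val window = reduce(batch1, batch2) // K1 -> 2, K2 -> 1, K3 -> 1
    println(invReduce(window, batch1) == batch2) // prints true
  }
}
```

The set of unique visitors is then the key set of the map, and its size is the unique-visitor count.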
On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das wrote:
Yes, and in general any mutable data structure. You have to use immutable
data structures whose hashCode and equals are consistent enough to be put in
a set.
On Jun 6, 2017 4:50 PM, "swetha kasireddy" wrote:
Are you suggesting against the usage of HashSet?
On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das wrote:
> This may be because HashSet is a mutable data structure, and it seems
> you are actually mutating it in "set1 ++set2". I suggest creating a new
> HashMap in the function (and add both maps
String])) => (Math.max(timeStamp1, timeStamp2),
> set1.diff(set2))
> }
>
> def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>   case (metricName: String, (timeStamp: Long, set: HashSet[String])) =>
>     set.nonEmpty
> }
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-t