Hi Matt,

The behavior for sequenceFile is there because we reuse the same Writable object when reading elements from the file. This is definitely unintuitive, but it can be more efficient when you pass over each record only once instead of caching it (it should probably be off by default, though). It does mean that if you want to keep the objects around, you need to copy them.

The sort problem is probably due to the data becoming much bigger now that you have distinct objects; you should use more reduce tasks for the sort to limit the data per task. If you're caching the dataset, also take a look at http://spark.incubator.apache.org/docs/latest/tuning.html for tips on lowering memory usage (both in your Java data structures and by serializing or compressing data).
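For example, a copy step along these lines usually does the trick. This is only a rough sketch: the key/value classes, paths, partition count and local[4] master below are placeholders for whatever your job actually uses.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local[4]", "SequenceFileCopy")

// sequenceFile() hands back the same Writable instances for every record,
// so copy each record into plain immutable values before caching/collecting.
val records = sc.sequenceFile("/path/to/input", classOf[LongWritable], classOf[Text])
  .map { case (k, v) => (k.get(), v.toString) }

// With distinct objects the dataset is larger, so spread the sort over more
// reduce tasks to limit how much data each task has to hold at once.
val sorted = records.sortByKey(ascending = true, numPartitions = 200)
sorted.saveAsTextFile("/path/to/output")

Copying into plain Scala values (Long, String) also makes it easier to benefit from the serialization and compression settings in the tuning guide if you do end up caching the dataset.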
Anyway, thanks for pointing this out; this is a really old behavior that makes sense to change later on. We can probably add a flag called reuseObjects that will be false by default.

Matei

On Dec 9, 2013, at 6:57 PM, Matt Cheah <[email protected]> wrote:

> Hi,
>
> Assume my Spark context is pointing to local[N]. If I have an RDD created
> with sparkContext.sequenceFile(…), and I call .collect() on it immediately
> (assume it's small), sometimes I get duplicate rows back. In addition, if I
> call sparkContext.sequenceFile(…) and immediately call an operation on it, I
> get incorrect results – debugging the code in Eclipse shows that some of the
> objects in the RDD are duplicates even when there are no duplicates in the
> original sequence file.
>
> I know this is a problem related to how Hadoop Writable serialization re-uses
> objects. I wrote a solution which immediately "copies" the data from the
> sequence file into a new object. More specifically:
>
> sparkContext.sequenceFile(…).map(x => new MyClass(x))
>
> Creating the RDD with the above code fixes that problem. However, now I get
> out-of-memory errors trying to do something like sort this data set when I
> run my code against a 10-node cluster.
>
> My question is (assuming you've gotten past my very vague explanation of my
> situation): how does the Hadoop file system's optimization of re-using
> objects affect the contents of RDDs when they are collected and/or
> transformed? And does this have to be a concern when RDDs are retrieved while
> Spark is run against a cluster, or will I only see these anomalies if I'm
> running Spark on local[N]?
>
> Thanks! Hope that wasn't too confusing,
>
> -Matt Cheah
