It shouldn't be running on the master only – the data is distributed in HDFS, and I'm
just invoking sequenceFile() to get the file, map() to copy the data so objects
aren't re-used, and keyBy() (JavaRDD) followed by sortByKey(). In something like
Java/Scala-ish pseudo-code:
System.setProperty("spark.default.parallelism", "240");
JavaSparkContext sc = new JavaSparkContext(…);
System.out.println(
    sc.sequenceFile(…)
      .map(x => new MyX(x))          // copy so the re-used Writables aren't shared
      .keyBy(…)
      .sortByKey(…)
      .takeSample(false, 10, 10));
I get the same problem when doing a groupBy() as well. My pains have been
scattered across several e-mail threads, sorry about that.
From: Ashish Rangole <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, December 9, 2013 10:08 PM
To: "[email protected]" <[email protected]>
Subject: Re: Hadoop RDD incorrect data
That data size is sufficiently small for the cluster configuration that you
mention.
Are you doing the sort in local mode or on master only? Is the default
parallelism
system property being set prior to creating SparkContext?
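That is, something like the following ordering. This is only a minimal sketch: the
local[8] master and the app name are placeholders, and newer Spark releases would
normally pass this setting through SparkConf instead of a JVM system property.
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelismOrdering {
    public static void main(String[] args) {
        // The property is only read when the context is constructed,
        // so it must be set before new JavaSparkContext(...) is called.
        System.setProperty("spark.default.parallelism", "240");

        // "local[8]" and "sort-job" are placeholder values for this sketch.
        JavaSparkContext sc = new JavaSparkContext("local[8]", "sort-job");
        System.out.println(sc.defaultParallelism());
        sc.stop();
    }
}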
On Mon, Dec 9, 2013 at 10:45 PM, Matt Cheah <[email protected]> wrote:
Thanks for the prompt response. For the sort, the sequence file is 129GB in
size in HDFS. I have 10 EC2 m2.4xlarge nodes, 8 cores per node, with 65 GB of
RAM each. I don't really understand why it runs out of memory.
I've tried setting spark.default.parallelism to 80, 240, 400, and 800. None of
these configurations lets me sort the dataset without the cluster collapsing.
-Matt Cheah
________________________________
From: Matei Zaharia <[email protected]>
Sent: Monday, December 09, 2013 7:02 PM
To: [email protected]
Cc: Mingyu Kim
Subject: Re: Hadoop RDD incorrect data
Hi Matt,
The behavior for sequenceFile is there because we reuse the same Writable
object when reading elements from the file. This is definitely unintuitive, but
if you pass through each data item only once instead of caching it, it can be
more efficient (probably should be off by default, though). However, it means
that if you want to keep the objects, you do need to copy them. The sort
problem is probably due to the data becoming much bigger now that you have
distinct objects; you should use more reduce tasks for the sort to limit the
data per sort task. If you’re caching the dataset, also take a look at
http://spark.incubator.apache.org/docs/latest/tuning.html
for tips on how to lower memory usage (both in your Java data structures and
by serializing or compressing data).
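For example, something along these lines. This is only a rough sketch: the HDFS path,
the LongWritable/Text key and value classes, the Kryo serializer setting, and the
800-partition count are placeholder choices, and it assumes a Spark release whose
Java API accepts Java 8 lambdas.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SortWithMorePartitions {
    public static void main(String[] args) {
        // Kryo usually makes shuffled/cached data smaller than Java serialization.
        SparkConf conf = new SparkConf()
            .setAppName("sort-sketch")
            .setMaster("local[4]")   // placeholder; normally supplied by spark-submit
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<Long, String> keyed = sc
            .sequenceFile("hdfs:///path/to/data", LongWritable.class, Text.class)
            // Copy each record out of the re-used Writable instances while keying it.
            .mapToPair(p -> new Tuple2<Long, String>(p._1().get(), p._2().toString()));

        // An explicit partition count means more, smaller sort tasks,
        // so each task holds less data in memory.
        JavaPairRDD<Long, String> sorted = keyed.sortByKey(true, 800);

        System.out.println(sorted.take(10));
        sc.stop();
    }
}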
Anyway, thanks for pointing this out — this is a really old behavior that makes
sense to change later on. We can probably add a flag called reuseObjects that
will be false by default.
Matei
On Dec 9, 2013, at 6:57 PM, Matt Cheah <[email protected]> wrote:
Hi,
Assume my spark context is pointing to local[N]. If I have an RDD created with
sparkContext.sequenceFile(…), and I call .collect() on it immediately (assume
it's small), sometimes I get duplicate rows back. In addition, if I call
sparkContext.sequenceFile(…) and immediately call an operation on it, I get
incorrect results – debugging the code in Eclipse shows that some of the
objects in the RDD are duplicates even when there are no duplicates in the
original sequence file.
I know this is a problem related to how Hadoop Writable serialization re-uses
objects. I wrote a solution which immediately "copies" the data from the
sequence file into a new object. More specifically:
sparkContext.sequenceFile(…).map(x => new MyClass(x))
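Expanded into a minimal, self-contained sketch of this copy-before-use pattern, that
looks something like the code below. It assumes the Java API with Java 8 lambdas, and
the local[4] master, HDFS path, and LongWritable/Text classes are placeholders.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SequenceFileCopySketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "copy-sketch");

        // sequenceFile() re-uses the same Writable instances across records,
        // so convert each one to an immutable String before collecting.
        JavaRDD<String> copied = sc
            .sequenceFile("hdfs:///path/to/data", LongWritable.class, Text.class)
            .map(pair -> pair._2().toString());

        // Without the copy, collect() can return what look like duplicate rows,
        // because many elements end up pointing at the same mutated Writable.
        System.out.println(copied.collect());
        sc.stop();
    }
}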
Creating the RDD with that copy in place fixes the duplicate-object problem. However,
now I get out-of-memory errors trying to do something like sort this data set when I
run my code against a 10-node cluster.
My question is (assuming you've gotten past my vague explanation of my
situation): how does the Hadoop file system's optimization of re-using objects
affect the contents of RDDs when they are collected and/or transformed? And
does this have to be a concern when RDDs are retrieved while Spark is running
against a cluster, or will I only see these anomalies when running Spark on
local[N]?
Thanks! Hope that wasn't too confusing,
-Matt Cheah