Hi everyone

I'm really hoping to get some more help on an issue I've been stuck on for a 
couple of days now.

Basically, building the data manually from a text file (converting each line 
of text into the objects I'm sorting on) doesn't behave the same way as 
importing the objects directly from a sequence file. The latter frequently 
OOMs the cluster.

I have a class that wraps a Java ArrayList and can be serialized and written 
to a Hadoop SequenceFile (i.e., it implements the Writable interface). Let's 
call it WritableDataRow. Its constructor takes a Java List to wrap, and it 
also has a copy constructor.
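
For readers following along, here is a minimal sketch of what such a wrapper 
might look like. The real WritableDataRow is not shown in this thread, so 
everything below is an assumption: the class name WritableDataRowSketch, the 
string-only contents, the wire format, and the RowWritable trait, which only 
stands in for org.apache.hadoop.io.Writable (same two method signatures) so 
that the snippet runs without Hadoop on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInput, DataInputStream, DataOutput, DataOutputStream}
import java.util.{ArrayList => JArrayList, List => JList}

// Stand-in for org.apache.hadoop.io.Writable (assumption: same two methods).
trait RowWritable {
  def write(out: DataOutput): Unit
  def readFields(in: DataInput): Unit
}

// Hypothetical sketch; the real WritableDataRow is not shown in the thread.
class WritableDataRowSketch(initial: JList[String]) extends RowWritable {
  // Defensive copy, so the row never shares its backing list with the caller.
  private val contents = new JArrayList[String](initial)

  // No-arg constructor, so a reader can create an empty instance to fill.
  def this() = this(new JArrayList[String]())

  // Copy constructor: snapshots another row's current contents.
  def this(other: WritableDataRowSketch) = this(other.getContents)

  def getContents: JList[String] = contents

  // Assumed wire format: element count, then each element as a UTF string.
  override def write(out: DataOutput): Unit = {
    out.writeInt(contents.size)
    var i = 0
    while (i < contents.size) { out.writeUTF(contents.get(i)); i += 1 }
  }

  // Overwrites this instance in place, which is what allows a reader to
  // reuse one object for every record.
  override def readFields(in: DataInput): Unit = {
    contents.clear()
    val n = in.readInt()
    var i = 0
    while (i < n) { contents.add(in.readUTF()); i += 1 }
  }
}

object WritableDataRowSketch {
  // Serializes a row to bytes and reads it back into a fresh instance.
  def roundTrip(row: WritableDataRowSketch): WritableDataRowSketch = {
    val bytes = new ByteArrayOutputStream()
    row.write(new DataOutputStream(bytes))
    val fresh = new WritableDataRowSketch()
    fresh.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
    fresh
  }
}
```

Because readFields() mutates the instance, any code that keeps references to 
rows handed out by a reader needs the copy constructor to take a snapshot 
first.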

Setup: 10 slaves launched via EC2, 65.9GB RAM each. The dataset is 100GB as 
text and 120GB in sequence file format (no compression). CDH4.2.0-backed 
Hadoop cluster.

First, building the RDD from a CSV and then sorting on index 1 works fine:

scala> import scala.collection.JavaConversions._ // Other imports here as well
import scala.collection.JavaConversions._

scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19

scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x))
rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54

scala> orderedFunct.sortByKey(true).count() // Actually triggers the computation, as stated in a different e-mail thread
res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27

The above works without too many surprises. I then save it as a Sequence File 
(using JavaPairRDD as a way to more easily call saveAsHadoopFile(), and this is 
how it's done in our Java-based application):

scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)))
pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9

scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]])
…
2013-12-11 20:09:14,444 [main] INFO  org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s

And now I want to get the RDD from the sequence file and sort THAT, and this is 
when I monitor Ganglia and "ps aux" and notice the memory usage climbing 
ridiculously:

scala> // Invokes the copy constructor to get around re-use of writable objects
scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2))
rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6

scala> orderedFunct.sortByKey().count()

(On the necessity of copying Writables from Hadoop RDDs, see: 
https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3ccaf_kkpzrq4otyqvwcoc6plaz9x9_sfo33u4ysatki5ptqoy...@mail.gmail.com%3E)

My question is: why does sorting the objects read back from the sequence file 
use so much more memory than sorting the objects I build manually from the 
text file? It doesn't make sense to me. In theory, I'm performing the same 
operation on both datasets.

Thanks, I'd definitely appreciate the help!

-Matt Cheah

From: Andrew Winings <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, December 10, 2013 10:53 AM
To: "[email protected]" <[email protected]>
Subject: Re: Hadoop RDD incorrect data

It shouldn't be master only – the data is distributed in HDFS and I'm just 
invoking sequenceFile() to get the file, map() to copy the data so objects 
aren't re-used, keyBy() (JavaRDD) followed by sortByKey. In something like 
Java-scala-ish-pseudo-code:

System.setProperty("spark.default.parallelism", "240");
JavaSparkContext sc = new JavaSparkContext(…);
System.out.println(sc.sequenceFile(…).map(x => new MyX(x)).keyBy(…).sortByKey(…).takeSample(false, 10, 10));

I get the same problem when doing a groupBy() as well. My pains have been 
scattered across several e-mail threads, sorry about that.

From: Ashish Rangole <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, December 9, 2013 10:08 PM
To: "[email protected]" <[email protected]>
Subject: Re: Hadoop RDD incorrect data

That data size is sufficiently small for the cluster configuration that you 
mention. Are you doing the sort in local mode or on master only? Is the 
default parallelism system property being set prior to creating SparkContext?


On Mon, Dec 9, 2013 at 10:45 PM, Matt Cheah <[email protected]> wrote:
Thanks for the prompt response. For the sort, the sequence file is 129GB in 
size in HDFS. I have 10 EC2 m2.4xlarge nodes, 8 cores per node, with 65 GB of 
RAM each. I don't really understand why it runs out of memory.

I've tried setting spark.default.parallelism to 80, 240, 400, and 800. None of 
these configurations lets me sort the dataset without the cluster collapsing.
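
As a rough back-of-the-envelope check on the numbers above, the sketch below 
works out how much serialized input each sort task would see at each of those 
parallelism settings. It assumes the 129GB is split perfectly evenly across 
tasks and that one task runs per core; both are simplifications, and the 
object and method names are made up for illustration. Deserialized Java 
objects are usually noticeably larger than their serialized form, so these 
figures are best read as lower bounds.

```scala
object SortTaskEstimate {
  // Figures taken from the message above.
  val datasetGb = 129.0   // sequence file size in HDFS
  val nodes = 10
  val coresPerNode = 8

  // Tasks that can run at once, assuming one task per core.
  val concurrentTasks: Int = nodes * coresPerNode

  // Serialized input per task, assuming a perfectly even split.
  def gbPerTask(parallelism: Int): Double = datasetGb / parallelism

  // Serialized input being processed on one node at any moment.
  def gbInFlightPerNode(parallelism: Int): Double =
    gbPerTask(parallelism) * math.min(parallelism, concurrentTasks) / nodes

  def main(args: Array[String]): Unit =
    for (p <- Seq(80, 240, 400, 800))
      println(f"parallelism=$p%4d  per task=${gbPerTask(p)}%.3f GB  " +
        f"in flight per node=${gbInFlightPerNode(p)}%.2f GB")
}
```

Under these assumptions, even the coarsest setting (80) gives each task only 
about 1.6GB of serialized input, which suggests the blow-up comes from the 
in-memory representation or from skewed keys rather than from the raw data 
volume.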

-Matt Cheah

________________________________
From: Matei Zaharia [[email protected]]
Sent: Monday, December 09, 2013 7:02 PM
To: [email protected]
Cc: Mingyu Kim
Subject: Re: Hadoop RDD incorrect data

Hi Matt,

The behavior for sequenceFile is there because we reuse the same Writable 
object when reading elements from the file. This is definitely unintuitive, but 
if you pass through each data item only once instead of caching it, it can be 
more efficient (probably should be off by default though). However it means 
that if you want to keep the objects, you do need to copy them. The sort 
problem is probably due to the data becoming much bigger now that you have 
distinct objects; you should use more reduce tasks for the sort to limit the 
data per sort task. If you’re caching the dataset, also take a look at 
http://spark.incubator.apache.org/docs/latest/tuning.html
 for tips on how to lower memory usage (both in your Java data structures and 
by serializing or compressing data).
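
The reuse behavior described above can be reproduced without Spark or Hadoop. 
The following is only an illustration: MutableRow and ReuseDemo are 
hypothetical names, and the iterator merely mimics a record reader that 
refills one shared object for every record.

```scala
// Hypothetical mutable record, standing in for a Hadoop Writable.
final class MutableRow(var value: String) {
  // Copy constructor: snapshots the other row's current value.
  def this(other: MutableRow) = this(other.value)
}

object ReuseDemo {
  // Mimics a reader that refills ONE shared instance for every record.
  def reusingReader(records: Seq[String]): Iterator[MutableRow] = {
    val shared = new MutableRow("")
    records.iterator.map { r => shared.value = r; shared }
  }

  // Keeps the references as-is: every element is the same object,
  // so all of them show whichever record was read last.
  def collectWithoutCopy(records: Seq[String]): List[String] = {
    val kept = reusingReader(records).toList
    kept.map(_.value)
  }

  // Copies each record as it streams by: one distinct object per record.
  def collectWithCopy(records: Seq[String]): List[String] = {
    val kept = reusingReader(records).map(r => new MutableRow(r)).toList
    kept.map(_.value)
  }
}
```

With input Seq("a", "b", "c"), collectWithoutCopy returns List("c", "c", "c") 
while collectWithCopy returns List("a", "b", "c"). The copy is what makes the 
results correct, and it is also why memory use grows: there is now one live 
object per record instead of a single reused one.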

Anyway, thanks for pointing this out — this is a really old behavior that makes 
sense to change later on. We can probably add a flag called reuseObjects that 
will be false by default.

Matei

On Dec 9, 2013, at 6:57 PM, Matt Cheah <[email protected]> wrote:

Hi,

Assume my spark context is pointing to local[N]. If I have an RDD created with 
sparkContext.sequenceFile(…), and I call .collect() on it immediately (assume 
it's small), sometimes I get duplicate rows back. In addition, if I call 
sparkContext.sequenceFile(…) and immediately call an operation on it, I get 
incorrect results – debugging the code in Eclipse shows that some of the 
objects in the RDD are duplicates even when there are no duplicates in the 
original sequence file.

I know this is a problem related to how Hadoop Writable serialization re-uses 
objects. I wrote a solution which immediately "copies" the data from the 
sequence file into a new object. More specifically:

sparkContext.sequenceFile(…).map(x => new MyClass(x))

Creating the RDD with the above code fixes that problem. However, I now get 
out-of-memory errors when trying to do something like sort this data set when 
I run my code against a 10-node cluster.

My question is (assuming you've gotten past my very poor, vague explanation of 
my situation): how do the Hadoop file system and its optimization of re-using 
objects affect the contents of RDDs when they are collected and/or 
transformed? And does this have to be a concern when RDDs are retrieved while 
Spark is run against a cluster, or will I only see these anomalies when 
running Spark on local[N]?

Thanks! Hope that wasn't too confusing,

-Matt Cheah

