RE: RDD caching, memory & network input

2015-01-28 Thread Andrianasolo Fanilo
Each machine has 24 cores, but I assume each executor on a machine is 
assigned at most one core because I set the --executor-cores option to 1.

I’m going to try a higher memoryOverhead later and will post the results.

I’m caching the parsed version, something like

val matrix = PredictionReader.getFeatures(…).cache

where getFeatures() loads the file and then parses it.
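Caching the parsed RDD instead of the raw text means the parsing work is done once rather than once per model. The toy below is plain Scala with no Spark; `parse` and the comma-separated line format are hypothetical stand-ins for whatever getFeatures() does. It only counts parser invocations across the model passes:

```scala
// Toy illustration (plain Scala, no Spark) of caching raw text vs caching parsed rows.
object ParseOnceSketch {
  // Counts how many times the hypothetical parser has run.
  var parseCalls = 0

  // Hypothetical stand-in for the parsing inside getFeatures(): comma-separated doubles.
  def parse(line: String): Array[Double] = {
    parseCalls += 1
    line.split(",").map(_.toDouble)
  }

  // Raw text kept around and parsed again on every model pass
  // (analogous to caching only the text RDD, or not caching at all).
  def passesReparsing(lines: Seq[String], models: Int): Int = {
    parseCalls = 0
    (1 to models).foreach { _ =>
      lines.map(parse).foreach(_ => ()) // stand-in for scoring one model
    }
    parseCalls
  }

  // Parse once, keep the parsed rows, reuse them for every model
  // (analogous to caching the parsed RDD).
  def passesParsedOnce(lines: Seq[String], models: Int): Int = {
    parseCalls = 0
    val parsed = lines.map(parse).toVector // materialized once
    (1 to models).foreach { _ =>
      parsed.foreach(_ => ()) // stand-in for scoring one model
    }
    parseCalls
  }
}
```

With 3 lines and 12 models, the re-parsing variant calls parse 36 times and the parse-once variant 3 times. The same reasoning holds in Spark: if only the text RDD is cached, every action still re-runs the parsing transformation.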

From: Sandy Ryza
Sent: Wednesday, January 28, 2015 17:12
To: Andrianasolo Fanilo
Subject: Re: RDD caching, memory & network input

Hi Fanilo,

How many cores are you using per executor?  Are you aware that you can combat 
the "container is running beyond physical memory limits" error by bumping the 
spark.yarn.executor.memoryOverhead property?

Also, are you caching the parsed version or the text?


This e-mail and the documents attached are confidential and intended solely for 
the addressee; it may also be privileged. If you receive this e-mail in error, 
please notify the sender immediately and destroy it. As its integrity cannot be 
secured on the Internet, the Worldline liability cannot be triggered for the 
message content. Although the sender endeavours to maintain a computer 
virus-free network, the sender does not warrant that this transmission is 
virus-free and will not be liable for any damages resulting from any virus 
transmitted.

Re: RDD caching, memory & network input

2015-01-28 Thread Sandy Ryza
Hi Fanilo,

How many cores are you using per executor?  Are you aware that you can
combat the "container is running beyond physical memory limits" error by
bumping the spark.yarn.executor.memoryOverhead property?

Also, are you caching the parsed version or the text?
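YARN kills an executor container when the process's physical memory exceeds what the container was granted, which is roughly the executor heap plus spark.yarn.executor.memoryOverhead (given in MB in this Spark generation). A minimal sketch of that arithmetic with the thread's numbers; the overhead values are illustrative only, the default differs between Spark versions, and any rounding YARN applies to container sizes is ignored:

```scala
// Rough YARN container arithmetic (sketch). The overhead is whatever
// spark.yarn.executor.memoryOverhead resolves to, in MB; it is passed in
// explicitly because its default depends on the Spark version.
object ContainerSizing {
  // Memory YARN must grant for one executor container: JVM heap plus off-heap overhead.
  def containerMb(executorMemoryMb: Int, overheadMb: Int): Int =
    executorMemoryMb + overheadMb

  // Total memory requested on one machine when executors are spread evenly.
  def perMachineMb(executors: Int, machines: Int, executorMemoryMb: Int, overheadMb: Int): Int = {
    require(executors % machines == 0, "sketch assumes an even spread of executors")
    (executors / machines) * containerMb(executorMemoryMb, overheadMb)
  }

  def main(args: Array[String]): Unit = {
    // Thread's numbers: 18 executors on 3 machines, 4 GB heap each.
    // 384 MB and 1024 MB are illustrative overheads, not recommendations.
    for (overhead <- Seq(384, 1024))
      println(s"overhead=$overhead MB -> container=${containerMb(4096, overhead)} MB, " +
        s"per machine=${perMachineMb(18, 3, 4096, overhead)} MB")
  }
}
```

The property can be raised, for example, with SparkConf.set("spark.yarn.executor.memoryOverhead", "1024"); the per-machine total still has to fit within yarn.nodemanager.resource.memory-mb on each machine.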


On Wed, Jan 28, 2015 at 4:25 AM, Andrianasolo Fanilo wrote:

> Hello Spark fellows :),
> I think I need some help to understand how .cache and task input work
> within a job.
> I have a 7 GB input matrix in HDFS that I load using .textFile(). I also
> have a config file which contains an array of 12 Logistic Regression Model
> parameters, loaded as an Array[String]; let’s call it models.
> Then I basically apply each model to each line (as a LabeledPoint) of my
> matrix as follows:
> val matrix = sc.textFile(/* HDFS path to matrix */)… // parse matrix to make
> // RDD[(String, LabeledPoint)]
> models.map { model =>
>   // parse model, which is an Array[String], to a Vector to give to
>   // LogisticRegressionModel
>   val weights = …
>   val rl = new LogisticRegressionModel(weights, intercept)
>   rl.setThreshold(0.5)
>   matrix.flatMap(point =>
>     rl.predict(point._2.features) match {
>       case 1.0 => Seq("cool")
>       case 0.0 => Seq()
>     }
>   ).reduce(_ ++ _)
> }
> It seems natural to cache the matrix, since otherwise I would read it
> 12 times, once per model.
> So I launch my job on a 3-machine YARN cluster, using 18 executors
> with 4 GB of memory and 1 core each.
> When I don’t cache the matrix, the job runs in 12 minutes, and in the
> Spark UI I can see that each task has 128 MB of Hadoop input, which is
> normal.
> When I cache the matrix before going through the part, the
> first tasks process data from Hadoop input, and the matrix is stored
> completely in memory (verified in the Storage tab of the Spark UI).
> Unfortunately, the job then takes 48 minutes instead of 12, because very
> few tasks actually read directly from memory afterwards: most tasks have
> network input and NODE_LOCAL locality level, and those tasks take three
> times as long as tasks with Hadoop or memory input.
> Can you confirm my initial thoughts:
> · There are 18 executors on 3 machines, so 6 executors per machine
> · Each partition of the matrix RDD is stored in one executor
> · When a task needs to process a partition held in memory, it tries to
> get scheduled on the executor that stores that partition
> · If that executor is already busy with another task, the task goes to
> another executor on the same machine and then “downloads” the partition,
> hence the network input
> If that is the case, how would you deal with the problem:
> · Answer 1: more cores per executor? (Sadly, that got me a “Container
> [pid=55355,containerID=container_1422284274724_0066_01_10] is running
> beyond physical memory limits” error from YARN.)
> · Answer 2: a higher spark.locality.wait? Each task takes about 8
> seconds, and the wait is 3 seconds by default.
> · Answer 3: replicate the partitions?
> · Answer 4: something only you guys know that I am not aware of?
> · Bonus answer: don’t cache; it is not needed here.
> Regards,
> Fanilo

RDD caching, memory & network input

2015-01-28 Thread Andrianasolo Fanilo
Hello Spark fellows :),

I think I need some help to understand how .cache and task input work within a job.

I have a 7 GB input matrix in HDFS that I load using .textFile(). I also have 
a config file which contains an array of 12 Logistic Regression Model 
parameters, loaded as an Array[String]; let's call it models.

Then I basically apply each model to each line (as a LabeledPoint) of my matrix 
as follows:

val matrix = sc.textFile(/* HDFS path to matrix */)... // parse matrix to make RDD[(String, LabeledPoint)]

models.map { model =>
  // parse model, which is an Array[String], to a Vector to give to LogisticRegressionModel
  val weights = ...
  val rl = new LogisticRegressionModel(weights, intercept)
  rl.setThreshold(0.5)
  matrix.flatMap(point =>
    rl.predict(point._2.features) match {
      case 1.0 => Seq("cool")
      case 0.0 => Seq()
    }
  ).reduce(_ ++ _)
}
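What each model computes per point can be sketched in plain Scala without MLlib. This assumes the standard binary logistic-regression rule (score = sigmoid(w · x + b), predict 1.0 when the score is strictly above the threshold), which is, as far as we know, what LogisticRegressionModel applies once setThreshold is called; `Model` and `coolCount` are hypothetical names:

```scala
// Plain-Scala sketch of one model scoring points (no Spark, no MLlib).
// Assumed rule: score = sigmoid(w . x + b); predict 1.0 if score > threshold, else 0.0.
object ScoringSketch {
  final case class Model(weights: Array[Double], intercept: Double, threshold: Double = 0.5)

  def predict(m: Model, features: Array[Double]): Double = {
    require(m.weights.length == features.length, "dimension mismatch")
    val margin = m.weights.zip(features).map { case (w, x) => w * x }.sum + m.intercept
    val score = 1.0 / (1.0 + math.exp(-margin))
    if (score > m.threshold) 1.0 else 0.0
  }

  // Mirrors the flatMap in the message: emit "cool" once per positive prediction.
  def coolCount(m: Model, points: Seq[Array[Double]]): Int =
    points.flatMap(p => if (predict(m, p) == 1.0) Seq("cool") else Seq.empty[String]).size
}
```

A point whose margin is exactly 0 gets a score of exactly 0.5, so with the 0.5 threshold it is predicted 0.0.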

It seems natural to cache the matrix, since otherwise I would read it 12 
times, once per model.

So I launch my job on a 3-machine YARN cluster, using 18 executors with 
4 GB of memory and 1 core each.

When I don't cache the matrix, the job runs in 12 minutes, and in the 
Spark UI I can see that each task has 128 MB of Hadoop input, which is normal.
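With 128 MB of input per task, the numbers above imply roughly the following scheduling load. A back-of-the-envelope sketch assuming one partition per 128 MB HDFS block, 7 GB read as 7 × 1024 MB, and one task slot per executor core:

```scala
// Back-of-the-envelope task counts (sketch). Assumptions: one partition per
// 128 MB HDFS block, 7 GB = 7 * 1024 MB, one task slot per executor core.
object TaskWaves {
  // Ceiling division: number of blocks (and thus partitions) for the input.
  def partitions(inputMb: Long, blockMb: Long): Long = (inputMb + blockMb - 1) / blockMb

  // Ceiling division: how many rounds of tasks the available slots need.
  def waves(tasks: Long, slots: Long): Long = (tasks + slots - 1) / slots

  def main(args: Array[String]): Unit = {
    val parts = partitions(7L * 1024, 128) // partitions per pass over the matrix
    val slots = 18L * 1                    // 18 executors x 1 core
    println(s"partitions=$parts, tasks over 12 models=${parts * 12}, " +
      s"waves per model=${waves(parts, slots)}")
  }
}
```

That gives 56 partitions, 672 tasks across the 12 models, and 4 waves per model on 18 single-core executors (18 + 18 + 18 + 2 tasks), so the last wave of each model keeps only 2 executors busy.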

When I cache the matrix before going through the part, the first 
tasks process data from Hadoop input, and the matrix is stored completely 
in memory (verified in the Storage tab of the Spark UI). Unfortunately, the job 
then takes 48 minutes instead of 12, because very few tasks actually read 
directly from memory afterwards: most tasks have network input and NODE_LOCAL 
locality level, and those tasks take three times as long as tasks with Hadoop 
or memory input.
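The PROCESS_LOCAL vs NODE_LOCAL distinction reported in the Spark UI boils down to a small rule for a cached partition. A simplified sketch that ignores RACK_LOCAL and NO_PREF; the executor IDs and host names are hypothetical:

```scala
// Simplified locality classification for a task reading a cached partition (sketch).
// Ignores RACK_LOCAL and NO_PREF; executor IDs and host names are hypothetical.
object LocalitySketch {
  sealed trait Level
  case object ProcessLocal extends Level // runs in the executor that holds the cached block
  case object NodeLocal extends Level    // same machine, different executor: block must be fetched
  case object AnyLevel extends Level     // different machine (Spark's ANY)

  final case class Executor(id: String, host: String)

  def level(blockOwner: Executor, runsOn: Executor): Level =
    if (runsOn.id == blockOwner.id) ProcessLocal
    else if (runsOn.host == blockOwner.host) NodeLocal
    else AnyLevel
}
```

A NODE_LOCAL task still has to fetch the cached block from the other executor's block manager, which is where the network input in the UI comes from.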

Can you confirm my initial thoughts:

* There are 18 executors on 3 machines, so 6 executors per machine

* Each partition of the matrix RDD is stored in one executor

* When a task needs to process a partition held in memory, it tries to get 
scheduled on the executor that stores that partition

* If that executor is already busy with another task, the task goes to 
another executor on the same machine and then "downloads" the partition, hence 
the network input

If that is the case, how would you deal with the problem:

* Answer 1: more cores per executor? (Sadly, that got me a "Container 
[pid=55355,containerID=container_1422284274724_0066_01_10] is running beyond 
physical memory limits" error from YARN.)

* Answer 2: a higher spark.locality.wait? Each task takes about 8 seconds, 
and the wait is 3 seconds by default.

* Answer 3: replicate the partitions?

* Answer 4: something only you guys know that I am not aware of?

* Bonus answer: don't cache; it is not needed here.
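Answer 2 is about delay scheduling: the scheduler holds a task for up to spark.locality.wait (3000 ms by default in Spark 1.x) hoping a more local slot frees up, then relaxes to the next locality level. A toy cost model (not Spark's actual scheduler) using the figures above, about 8 s per local task and roughly 3 times that when the block is fetched over the network:

```scala
// Toy cost model for spark.locality.wait (sketch, not Spark's scheduler).
// Assumption: a local task takes localTaskSec; a NODE_LOCAL task that fetches
// its cached block over the network takes `slowdown` times as long.
object LocalityWaitSketch {
  // Longest wait for the block-holding executor that still beats running NODE_LOCAL now.
  def breakEvenWaitSec(localTaskSec: Double, slowdown: Double): Double =
    localTaskSec * slowdown - localTaskSec

  // True when waiting `waitSec` and then running locally is no slower than running remotely now.
  def worthWaiting(waitSec: Double, localTaskSec: Double, slowdown: Double): Boolean =
    waitSec + localTaskSec <= localTaskSec * slowdown

  def main(args: Array[String]): Unit = {
    val taskSec = 8.0  // "each task takes about 8 seconds"
    val slowdown = 3.0 // NODE_LOCAL tasks "take triple the time"
    println(s"break-even wait = ${breakEvenWaitSec(taskSec, slowdown)} s") // prints 16.0 s
    println(s"worth waiting 3 s? ${worthWaiting(3.0, taskSec, slowdown)}") // prints true
  }
}
```

Under these assumptions, waiting up to about 16 s for the executor that holds the block still beats running NODE_LOCAL immediately, which suggests the 3 s default gives up early for 8 s tasks. The value can be raised with SparkConf.set("spark.locality.wait", ...) (in milliseconds in Spark 1.x).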


