Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Tom Walwyn
Thanks Imran, I'll try your suggestions.

I eventually got this to run by 'checkpointing' the joined RDD (as per
Akhil's suggestion, persisting it at MEMORY_AND_DISK_SER) before performing
the reduceByKey, and then persisting the result again afterward, i.e.:

 val rdd2 = rdd.join(rdd, numPartitions = 1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .persist(StorageLevel.MEMORY_AND_DISK_SER)
 val rdd3 = rdd2.reduceByKey((x, y) => x + y).persist(StorageLevel.MEMORY_AND_DISK_SER)
 rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your
suggestions for further speed-up.
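
For reference, here is a self-contained sketch of the above (it assumes a
spark-shell SparkContext sc; the parallelize call is only a stand-in for the
real ~3.6-million-record RDD[(Int, Int)]):

 import org.apache.spark.storage.StorageLevel

 // Stand-in for the real input data set.
 val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))

 val rdd2 = rdd.join(rdd, numPartitions = 1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill the large joined RDD to disk
 val rdd3 = rdd2.reduceByKey((x, y) => x + y)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)
 rdd3.count()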

Thanks again for your help.


On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

 Hi Tom,

 There are a couple of things you can do here to make this more efficient.
 First, I think you can replace your self-join with a groupByKey. On your
 example data set, this would give you:

 (1, Iterable(2,3))
 (4, Iterable(3))

 This reduces the amount of data that needs to be shuffled, and that way
 you can produce all of your pairs just from the Iterable(2,3).

 Second, if you expect the same pairs to appear many times in your dataset,
 you might first want to replace them with a count. E.g., if you start with:

 (1,2)
 (1,2)
 (1,2)
 ...
 (1,2)
 (1,3)
 (1,3)
 (4,3)
 ...

 you might first want to convert that to get a count of each pair:

 val pairCounts = rdd.map{x => (x, 1)}.reduceByKey{_ + _}

 to give you something like:

 ((1,2), 145)
 ((1,3), 2)
 ((4,3), 982)
 ...

 And then, with a little more massaging, you can group by key and also keep
 the counts of each item:

 val groupedCounts: RDD[(Int, Iterable[(Int, Int)])] =
   pairCounts.map{ case ((key, value), counts) =>
     key -> (value, counts)
   }.groupByKey

 which would give you something like

 (1, Iterable((2,145), (3,2)))
 (4, Iterable((3,982)))


 hope this helps
 Imran

 On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second, I can
 count the number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) ---^ (combined into ((3,3),2) above)

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default serializer to Kryo
 (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

 - Enable RDD compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
  snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd has about 3.6 million records and a size in
 memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey
 stage) of around 460 million records, with a size in memory of about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Imran Rashid
Hi Tom,

There are a couple of things you can do here to make this more efficient.
First, I think you can replace your self-join with a groupByKey. On your
example data set, this would give you:

(1, Iterable(2,3))
(4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you
can produce all of your pairs just from the Iterable(2,3).
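
For concreteness, a rough sketch of that replacement (untested; it emits
every ordered pair of values that share a key, including (v,v) and both
orderings, so the counts match what the self-join + map would produce):

val cooccurrenceCounts = rdd.groupByKey(numPartitions = 1000)
  .flatMap { case (_, values) =>
    // all ordered pairs of values under one key, duplicates and
    // reflections included
    for (a <- values; b <- values) yield ((a, b), 1L)
  }
  .reduceByKey(_ + _)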

Second, if you expect the same pairs to appear many times in your dataset,
you might first want to replace them with a count. E.g., if you start with:

(1,2)
(1,2)
(1,2)
...
(1,2)
(1,3)
(1,3)
(4,3)
...

you might first want to convert that to get a count of each pair:

val pairCounts = rdd.map{x => (x, 1)}.reduceByKey{_ + _}

to give you something like:

((1,2), 145)
((1,3), 2)
((4,3), 982)
...

And then, with a little more massaging, you can group by key and also keep
the counts of each item:

val groupedCounts: RDD[(Int, Iterable[(Int, Int)])] =
  pairCounts.map{ case ((key, value), counts) =>
    key -> (value, counts)
  }.groupByKey

which would give you something like

(1, Iterable((2,145), (3,2)))
(4, Iterable((3,982)))
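
From there, one way (a sketch, untested) to recover the per-pair counts Tom
is after: within a key, a value seen cv times and a value seen cw times
co-occur cv*cw times, which matches what the self-join + reduceByKey
computes, including the (v,v) pairs and both orderings:

val pairCooccurrence = groupedCounts.flatMap { case (_, valueCounts) =>
  for ((v, cv) <- valueCounts; (w, cw) <- valueCounts)
    yield ((v, w), cv.toLong * cw)
}.reduceByKey(_ + _)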


hope this helps
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second, I can
 count the number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) ---^ (combined into ((3,3),2) above)

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default serializer to Kryo
 (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

 - Enable RDD compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd has about 3.6 million records and a size in
 memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey
 stage) of around 460 million records, with a size in memory of about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas






Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
of (Int, Int). I'm doing the self-join so I can count two things. First, I
can count the number of times a value appears in the data set. Second, I can
count the number of times values occur with the same key. For example, if I
have the following:

(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
(1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
(1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
(1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
(4,(3,3)) -> map -> ((3,3),1) ---^ (combined into ((3,3),2) above)

Note that I want to keep the duplicates (2,2) and reflections.
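
For clarity, here is a minimal spark-shell check of the toy example above (a
sketch; the real rdd is of course far larger):

val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))
rdd.join(rdd)
  .map { case (_, (a, b)) => ((a, b), 1) }
  .reduceByKey(_ + _)
  .collect()
// -> ((2,2),1), ((2,3),1), ((3,2),1), ((3,3),2)  (in some order)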

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default serializer to Kryo
 (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

 - Enable RDD compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd has about 3.6 million records and a size in
 memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey
 stage) of around 460 million records, with a size in memory of about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas





OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster
resources I am using are feasible for my use-case. The following is a
snippet of code that is causing an OOM exception in the executor after about
125/1000 tasks during the map stage.

 val rdd2 = rdd.join(rdd, numPartitions = 1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .reduceByKey((x, y) => x + y)
 rdd2.count()

Which errors with a stack trace like:

 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
stage 2.0 (TID 498)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
co-occurring values by key in the dataset, i.e. 'These two numbers occurred
with the same key n times'. I intentionally don't want to filter out
duplicates and reflections. rdd has about 3.6 million records and a size in
memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey
stage) of around 460 million records, with a size in memory of about 35GB.
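
(For reference, the joined size above can be estimated without running the
join, since each key contributes the square of its record count to a
self-join; a rough sketch, untested:)

// Estimated record count of rdd.join(rdd): sum over keys of (count per key)^2.
val estimatedJoinSize = rdd
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)            // records per key
  .map { case (_, c) => c * c }
  .reduce(_ + _)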

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores
and about 7.5GB of memory. I'm running Spark on YARN. The driver and
executors are allowed 1280m each and the job has 5 executors and 1 driver.
Additionally, I have set spark.storage.memoryFraction to 0.06, and
spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
the issue. I've also tried increasing the number of partitions after the
join dramatically (up to 15000). Nothing has been effective. Thus, I'm
beginning to suspect I don't have enough resources for the job.
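
(For reference, a sketch of how this configuration could be expressed as a
SparkConf, using Spark 1.x property names; on YARN the executor count and
memory are normally supplied via spark-submit rather than set here, so the
last two settings are shown only for illustration:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("self-join co-occurrence counts")  // placeholder name
  .set("spark.storage.memoryFraction", "0.06")
  .set("spark.shuffle.memoryFraction", "0.65")
  .set("spark.executor.memory", "1280m")         // normally a spark-submit flag
  .set("spark.driver.memory", "1280m")           // normally a spark-submit flag
val sc = new SparkContext(conf)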

Does anyone have a feeling about what the resource requirements would be
for a use-case like this? I could scale the cluster up if necessary, but
would like to avoid it. I'm willing to accept longer computation times if
that is an option.

Warm Regards,
Thomas


Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Akhil Das
Why are you joining the rdd with itself?

You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

- Set your default serializer to Kryo
(.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

- Enable RDD compression (.set("spark.rdd.compress", "true")); a combined
sketch follows below.
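
Putting those together, roughly (a sketch only; the app name is a
placeholder, and the StorageLevel is applied wherever the RDDs are created):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("self-join-counts")  // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

// e.g. yourRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)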


Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd has about 3.6 million records and a size in
 memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey
 stage) of around 460 million records, with a size in memory of about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas