Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks Imran, I'll try your suggestions.

I eventually got this to run by 'checkpointing' the joined RDD (following Akhil's suggestion) before performing the reduceByKey, and then checkpointing it again afterward, i.e.

    val rdd2 = rdd.join(rdd, numPartitions=1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .persist(MEMORY_AND_DISK_SER)
    val rdd3 = rdd2.reduceByKey((x,y) => x+y).persist(MEMORY_AND_DISK_SER)
    rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your suggestions for further speed-up.

Thanks again for your help.

On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

Hi Tom,

there are a couple of things you can do here to make this more efficient.

First, I think you can replace your self-join with a groupByKey. On your example data set, this would give you

    (1, Iterable(2,3))
    (4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2,3).

Second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. E.g., if you start with

    (1,2)
    (1,2)
    (1,2)
    ...
    (1,2)
    (1,3)
    (1,3)
    (4,3)
    ...

you might want to first convert that to get a count of each pair

    val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

to give you something like:

    ((1,2), 145)
    ((1,3), 2)
    ((4,3), 982)
    ...

and then with a little more massaging you can group by key and also keep the counts of each item:

    val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
      pairCounts.map{case((key, value), counts) => key -> (value,counts) }.groupByKey

which would give you something like

    (1, Iterable((2,145), (3,2)))
    (4, Iterable((3,982)))

hope this helps
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken. rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

    (1,2)
    (1,3)
    (4,3)

Then joining with itself I get:

    (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
    (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
    (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
    (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
    (4,(3,3)) -> map -> ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and reflections.

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

Why are you joining the rdd with itself?

You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
- Set your default Serializer to Kryo (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- Enable rdd compression (.set("spark.rdd.compress", "true"))

Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage.

    val rdd2 = rdd.join(rdd, numPartitions=1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x,y) => x+y)
    rdd2.count()

Which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'These two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06, and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Hi Tom,

there are a couple of things you can do here to make this more efficient.

First, I think you can replace your self-join with a groupByKey. On your example data set, this would give you

    (1, Iterable(2,3))
    (4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2,3).

Second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. E.g., if you start with

    (1,2)
    (1,2)
    (1,2)
    ...
    (1,2)
    (1,3)
    (1,3)
    (4,3)
    ...

you might want to first convert that to get a count of each pair

    val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

to give you something like:

    ((1,2), 145)
    ((1,3), 2)
    ((4,3), 982)
    ...

and then with a little more massaging you can group by key and also keep the counts of each item:

    val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
      pairCounts.map{case((key, value), counts) => key -> (value,counts) }.groupByKey

which would give you something like

    (1, Iterable((2,145), (3,2)))
    (4, Iterable((3,982)))

hope this helps
Imran
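For reference, here is a minimal local sketch of the two steps Imran describes (count duplicate pairs first, then group by key), written against plain Scala collections rather than Spark so it runs without a cluster. The object and method names are hypothetical. The final step, multiplying the two counts for each ordered pair of values under a key and summing across keys, is one reading of "produce all of your pairs just from the Iterable" and is not spelled out in the thread.

```scala
object GroupedPairCounts {
  // `records` stands in for the (Int, Int) pair RDD discussed in the thread.
  def coOccurrenceCounts(records: Seq[(Int, Int)]): Map[(Int, Int), Long] = {
    // Step 1: collapse duplicate (key, value) records into counts,
    // mirroring rdd.map(x => (x, 1)).reduceByKey(_ + _).
    val pairCounts: Map[(Int, Int), Long] =
      records.groupBy(identity).map { case (kv, occurrences) => kv -> occurrences.size.toLong }

    // Step 2: regroup by key, keeping (value, count) entries,
    // mirroring the map + groupByKey step.
    val groupedCounts: Map[Int, Seq[(Int, Long)]] =
      pairCounts.toSeq
        .map { case ((key, value), count) => (key, (value, count)) }
        .groupBy(_._1)
        .map { case (key, entries) => key -> entries.map(_._2) }

    // Step 3 (assumption): within each key, every ordered pair of values (a, b)
    // would have appeared countA * countB times in the self-join.
    val perKey: Seq[((Int, Int), Long)] = for {
      (_, values) <- groupedCounts.toSeq
      (a, countA) <- values
      (b, countB) <- values
    } yield ((a, b), countA * countB)

    // Step 4: sum the per-key contributions for each pair.
    perKey.groupBy(_._1).map { case (pair, entries) => pair -> entries.map(_._2).sum }
  }
}
```

On the three-record example from the thread, `GroupedPairCounts.coOccurrenceCounts(Seq((1, 2), (1, 3), (4, 3)))` yields the same counts as the self-join, with duplicates such as (2,2) and reflections such as (2,3) and (3,2) kept.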
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken. rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

    (1,2)
    (1,3)
    (4,3)

Then joining with itself I get:

    (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
    (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
    (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
    (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
    (4,(3,3)) -> map -> ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and reflections.

Rgds
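To make the self-join example concrete, the sketch below reproduces it on a local Scala collection. It is only a stand-in for `rdd.join(rdd)` followed by `map` and `reduceByKey`, not Spark code, and the object and method names are hypothetical. It also shows why the joined data grows so quickly: a key with n records contributes n * n joined records.

```scala
object SelfJoinExample {
  // Local stand-in for rdd.join(rdd): pair up every two records that share a key.
  def selfJoin(records: Seq[(Int, Int)]): Seq[(Int, (Int, Int))] =
    for {
      (k1, v1) <- records
      (k2, v2) <- records
      if k1 == k2
    } yield (k1, (v1, v2))

  // Local stand-in for .map(fp => ((fp._2._1, fp._2._2), 1)).reduceByKey(_ + _).
  def pairCounts(records: Seq[(Int, Int)]): Map[(Int, Int), Int] =
    selfJoin(records)
      .map { case (_, pair) => (pair, 1) }
      .groupBy(_._1)
      .map { case (pair, ones) => pair -> ones.map(_._2).sum }
}
```

For the input `Seq((1, 2), (1, 3), (4, 3))`, `selfJoin` produces the five joined records listed in the message, and `pairCounts` gives a count of 2 for (3,3) and 1 for each of (2,2), (2,3) and (3,2).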
OutOfMemory and GC limits (TODO) Error in map after self-join
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage.

    val rdd2 = rdd.join(rdd, numPartitions=1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x,y) => x+y)
    rdd2.count()

Which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'These two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06, and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job.

Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option.

Warm Regards,
Thomas
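As a rough back-of-the-envelope check, the figures quoted in the post can be put side by side. This is plain arithmetic on the numbers above, not a statement about how Spark allocates memory internally. It assumes 1 GB = 10^9 bytes and treats 1280m as 1280 * 10^6 bytes. The object and value names are hypothetical.

```scala
object ResourceEstimate {
  // Figures taken from the post.
  val inputRecords: Double = 3.6e6
  val joinedRecords: Double = 460e6
  val joinedBytes: Double = 35e9 // assumption: 1 GB = 10^9 bytes
  val executors: Int = 5
  val executorHeapMb: Int = 1280

  // Each input record expands to roughly 128 joined records on average.
  val amplification: Double = joinedRecords / inputRecords

  // Roughly 76 bytes per joined record in memory.
  val bytesPerJoinedRecord: Double = joinedBytes / joinedRecords

  // Total heap across all executors: 6400 MB.
  val totalExecutorHeapMb: Int = executors * executorHeapMb

  // The joined data is about 5.5 times the combined executor heap.
  val joinedToHeapRatio: Double = joinedBytes / (totalExecutorHeapMb * 1e6)
}
```

On these numbers the intermediate joined data cannot fit in executor memory at once, so any approach that works on this cluster has to spill to disk or avoid materialising the full join.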
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Why are you joining the rdd with itself?

You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
- Set your default Serializer to Kryo (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- Enable rdd compression (.set("spark.rdd.compress", "true"))

Thanks
Best Regards
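The three settings Akhil lists might be wired together roughly as follows. This is a configuration sketch only: it needs spark-core on the classpath, the application name is a placeholder, and the small `parallelize` input is a stand-in for the real data set, none of which come from the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("self-join-counts") // placeholder name, not from the thread
  // Use Kryo instead of Java serialization.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Compress serialized RDD partitions.
  .set("spark.rdd.compress", "true")

val sc = new SparkContext(conf)

// Stand-in input; serialized storage that can spill to disk when memory is short.
val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
```

Compression and serialized storage trade CPU time for a smaller memory footprint, which matches the stated willingness to accept longer computation times.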