Re: OOM with groupBy + saveAsTextFile
Yes, that's really the same thing. You're still writing a huge value as part of one single (key, value) record, and that value has to exist in memory in order to be written to storage. Although there aren't hard limits, keys and values generally aren't intended to be huge, i.e. on the order of hundreds of megabytes. You should probably design this differently, so that you don't try to collect a massive value per key. That's a good idea in general, not just for this reason. Certainly you don't need to be able to fit many (key, value) pairs in memory at once, but you do need to fit one.

On Mon, Nov 3, 2014 at 10:08 AM, Bharath Ravi Kumar wrote:
> The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the APIs a bit further for ways to save the iterable as chunks rather than as one large text/binary value. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through Spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the like. Thanks for all the help.

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
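A minimal sketch of the "design it differently" advice, in plain Java with no Spark dependency: instead of one (key, Iterable) record per key, write one short key<TAB>value line per input record, sorted by key so each key's values stay contiguous. The names `PerRecordWriter`, `entry` and `writeSortedPerRecord` are hypothetical, and the in-memory sort only stands in for what the framework would do (in Spark, roughly a `sortByKey` on a pair RDD followed by `saveAsTextFile`); treat it as an illustration under those assumptions, not as the fix the thread settled on.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one short "key<TAB>value" line per record, sorted by
// key, instead of one giant line per key.
class PerRecordWriter {

    static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Sorts a copy of the records by key (List.sort is stable, so values of
    // the same key keep their input order) and writes each record on its own line.
    // Assumption: records fit in memory here; in a real job they would be
    // streamed and the sort would be done by the framework.
    static int writeSortedPerRecord(List<Map.Entry<String, String>> records, PrintWriter out) {
        List<Map.Entry<String, String>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.comparingByKey());
        int lines = 0;
        for (Map.Entry<String, String> r : sorted) {
            // Each line holds exactly one value, so its length is bounded by
            // the largest single record, not by the size of the key's group.
            out.print(r.getKey());
            out.print('\t');
            out.print(r.getValue());
            out.print('\n');
            lines++;
        }
        out.flush();
        return lines;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records =
                Arrays.asList(entry("b", "v3"), entry("a", "v1"), entry("a", "v2"));
        StringWriter buffer = new StringWriter();
        int n = writeSortedPerRecord(records, new PrintWriter(buffer));
        System.out.print(buffer);
        System.out.println(n + " lines");
    }
}
```

A downstream reader can then process each key's values as a contiguous run of short lines rather than as one enormous line.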
Re: OOM with groupBy + saveAsTextFile
I also realized from your description of saveAsTextFile that the API is indeed behaving as expected, i.e. it is appropriate (though not optimal) for the API to construct a single string out of the value. If the value turns out to be large, the user of the API needs to reconsider the implementation approach. My bad.

On Mon, Nov 3, 2014 at 3:38 PM, Bharath Ravi Kumar wrote:
> The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the APIs a bit further for ways to save the iterable as chunks rather than as one large text/binary value. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through Spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the like. Thanks for all the help.
Re: OOM with groupBy + saveAsTextFile
The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the APIs a bit further for ways to save the iterable as chunks rather than as one large text/binary value. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through Spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the like. Thanks for all the help.

On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen wrote:
> saveAsTextFile means "save every element of the RDD as one line of text". It works like TextOutputFormat in Hadoop MapReduce, since that's what it uses. So this way you are causing it to create one big string out of each Iterable.
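One way to read "save the iterable as chunks" is sketched below in plain Java: walk a key's values once and start a new line every `maxPerChunk` values, tagging each line with the key and a chunk index. `ChunkedLines`, `emitChunks` and the tab/comma line format are hypothetical, and the Spark wiring (for example, a flatMap over the grouped pairs) is left out. This only bounds the length of each output line; the group itself is still produced by groupBy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: split one key's values into bounded lines of the form
// "key<TAB>chunkIndex<TAB>v1,v2,..." instead of a single line per key.
class ChunkedLines {

    // Iterates over 'values' once and hands each finished line to 'sink'.
    // Returns the number of lines emitted.
    static int emitChunks(String key, Iterable<String> values, int maxPerChunk, Consumer<String> sink) {
        if (maxPerChunk <= 0) {
            throw new IllegalArgumentException("maxPerChunk must be positive");
        }
        StringBuilder line = new StringBuilder();
        int inChunk = 0;
        int chunkIndex = 0;
        for (String v : values) {
            if (inChunk == maxPerChunk) {
                // Current line is full: emit it and start the next chunk.
                sink.accept(line.toString());
                line.setLength(0);
                inChunk = 0;
                chunkIndex++;
            }
            if (inChunk == 0) {
                line.append(key).append('\t').append(chunkIndex).append('\t');
            } else {
                line.append(',');
            }
            line.append(v);
            inChunk++;
        }
        if (inChunk > 0) {
            // Flush the last, possibly partial, chunk.
            sink.accept(line.toString());
            chunkIndex++;
        }
        return chunkIndex;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        int n = emitChunks("k", Arrays.asList("a", "b", "c", "d", "e"), 2, lines::add);
        System.out.println(n + " lines: " + lines);
    }
}
```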
Re: OOM with groupBy + saveAsTextFile
saveAsTextFile means "save every element of the RDD as one line of text". It works like TextOutputFormat in Hadoop MapReduce, since that's what it uses. So this way you are causing it to create one big string out of each Iterable.

On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar wrote:
> Thanks for responding. This is what I initially suspected, and hence I asked why the library needed to construct the entire value buffer on a single host before writing it out. The stack trace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsTextFile on the resulting grouped RDD. The value after grouping is an Iterable>. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsTextFile. Perhaps this shouldn't be the default behaviour of saveAsTextFile? Hence my original question about the behavior of saveAsTextFile. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
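To make the "one line per element" point concrete, here is a plain-Java sketch that renders a (key, Iterable) pair in the spirit of a tuple's toString, `"(" + key + "," + value + ")"`, which is the call visible in the stack trace (`scala.Tuple2.toString`). The class and method names are hypothetical, and the bracketed list format inside the value is an assumption for illustration; the exact text depends on the Iterable's own toString. The length estimate uses long arithmetic so it can be evaluated for groups far too large to actually render.

```java
import java.util.Arrays;

// Hypothetical model of turning one (key, Iterable) pair into one text line.
class GroupedLineLength {

    // Renders the whole group into a single String; the list format inside
    // the brackets is an illustrative assumption.
    static String renderPairAsOneLine(String key, Iterable<String> values) {
        StringBuilder sb = new StringBuilder("(").append(key).append(",[");
        boolean first = true;
        for (String v : values) {
            if (!first) {
                sb.append(", ");
            }
            sb.append(v);
            first = false;
        }
        return sb.append("])").toString();
    }

    // Length of that line for 'count' values of average length 'avgValueLength':
    // "(" + key + ",[" + values + separators + "])".
    static long renderedLength(String key, long count, long avgValueLength) {
        long separators = count > 0 ? (count - 1) * 2 : 0; // ", " between values
        return 1 + key.length() + 2 + count * avgValueLength + separators + 2;
    }

    public static void main(String[] args) {
        String line = renderPairAsOneLine("k", Arrays.asList("ab", "cd"));
        System.out.println(line + " has length " + line.length());
        // One line per key: its length grows linearly with the number of values.
        System.out.println(renderedLength("k", 10_000_000L, 80L));
    }
}
```

However the value is formatted, a grouped pair written as text becomes a single line whose length is roughly the sum of all its values.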
Re: OOM with groupBy + saveAsTextFile
Thanks for responding. This is what I initially suspected, and hence I asked why the library needed to construct the entire value buffer on a single host before writing it out. The stack trace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsTextFile on the resulting grouped RDD. The value after grouping is an Iterable>. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsTextFile. Perhaps this shouldn't be the default behaviour of saveAsTextFile? Hence my original question about the behavior of saveAsTextFile. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
Re: OOM with groupBy + saveAsTextFile
None of your tuning will help here, because the problem is actually the way you are saving the output. If you look at the stack trace, it is trying to build a single string that is too large for the VM to allocate. The VM is not actually running out of memory; rather, the JVM cannot support a single String that large. I suspect this is because the value in your (key, value) pair after groupBy is too long (maybe it concatenates every single record). Do you really want to save the (key, value) output this way, using a text file? Maybe you can write it out as multiple strings rather than as a single giant string.

On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com <arthur.hk.c...@gmail.com> wrote:
> Hi,
>
> FYI as follows. Could you post your heap size settings as well as your Spark app code?
>
> Regards
> Arthur
>
> 3.1.3 Detail Message: Requested array size exceeds VM limit
> The detail message Requested array size exceeds VM limit indicates that the application (or APIs used by that application) attempted to allocate an array that is larger than the heap size. For example, if an application attempts to allocate an array of 512MB but the maximum heap size is 256MB, then OutOfMemoryError will be thrown with the reason Requested array size exceeds VM limit. In most cases the problem is either a configuration issue (heap size too small), or a bug that results in an application attempting to create a huge array, for example, when the number of elements in the array is computed using an algorithm that computes an incorrect size.
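A small sketch of why 100G executors don't help here: on JDK 8 a String is backed by a `char[]`, and Java array lengths are `int`s, so a single line is capped by its element count rather than by heap size. The bound used below (`Integer.MAX_VALUE - 8`, the same conservative value JDK 8's `ArrayList` uses) is an assumption; the exact limit is VM-specific. `SingleStringLimit` is a hypothetical name, and the 30-million-values figure in `main` is made up for illustration, not taken from the thread.

```java
// Hypothetical helper illustrating the single-String limit on JDK 8.
class SingleStringLimit {

    // Assumption: conservative maximum array length. JDK 8's ArrayList uses
    // Integer.MAX_VALUE - 8 for the same reason; the real limit is VM-specific
    // and is not raised by a larger heap.
    static final long MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8L;

    // A JDK 8 String is backed by a char[], so its length (in chars) must
    // fit within the maximum array length.
    static boolean fitsInOneString(long chars) {
        return chars >= 0 && chars <= MAX_ARRAY_LENGTH;
    }

    // Approximate size of that char[] in bytes (2 bytes per char on JDK 8).
    static long charArrayBytes(long chars) {
        return chars * 2L;
    }

    public static void main(String[] args) {
        // Hypothetical group: 30 million values of ~80 chars each.
        long chars = 30_000_000L * 80L;
        System.out.println(chars + " chars, ~" + charArrayBytes(chars) / (1L << 30)
                + " GiB as char[], fits in one String: " + fitsInOneString(chars));
    }
}
```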
Re: OOM with groupBy + saveAsTextFile
Hi,

FYI as follows. Could you post your heap size settings as well as your Spark app code?

Regards
Arthur

3.1.3 Detail Message: Requested array size exceeds VM limit
The detail message Requested array size exceeds VM limit indicates that the application (or APIs used by that application) attempted to allocate an array that is larger than the heap size. For example, if an application attempts to allocate an array of 512MB but the maximum heap size is 256MB, then OutOfMemoryError will be thrown with the reason Requested array size exceeds VM limit. In most cases the problem is either a configuration issue (heap size too small), or a bug that results in an application attempting to create a huge array, for example, when the number of elements in the array is computed using an algorithm that computes an incorrect size.

On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar wrote:
> Resurfacing the thread. OOM shouldn't be the norm for a common groupBy / sort use case in a framework that is leading in sorting benchmarks? Or is there something fundamentally wrong in the usage?
Re: OOM with groupBy + saveAsTextFile
Resurfacing the thread. OOM shouldn't be the norm for a common groupBy / sort use case in a framework that is leading in sorting benchmarks? Or is there something fundamentally wrong in the usage?

On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar" wrote:
> Hi,
>
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of count ~100 million. The data size is 20GB and groupBy results in an RDD of 1061 keys with values being Iterable String>>. The job runs on 3 hosts in a standalone setup, with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries, with each attempt failing due to an out of memory error *[1]*. I understand that a few partitions may be overloaded as a result of the groupBy, and I've tried the following config combinations unsuccessfully:
>
> 1) Repartition the initial RDD (44 input partitions but 1061 keys) across 1061 partitions and have max cores = 3, so that each key is a "logical" partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>
> 2) Leave max cores unspecified, set the level of parallelism to 72, and leave the number of partitions unspecified (in which case the number of input partitions, 44, was used).
>
> Since I do not intend to cache RDDs, I have set spark.storage.memoryFraction=0.2 in both cases.
>
> My understanding is that if each host is processing a single logical partition to saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely that it would run out of memory. My interpretation of the Spark tuning guide is that the degree of parallelism has little impact in case (1) above, since max cores = number of hosts. Can someone explain why there are still OOMs with 100G available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into the memory of a single host for saveAsTextFile, since a single shuffle read from a remote host can be written to HDFS before the next remote read is carried out. This way, not all data needs to be collected at the same time.
>
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide and even as per DataStax's Spark introduction), there may need to be more documentation around the internals of Spark to help users make better-informed tuning decisions about parallelism, max cores, number of partitions and other tunables. Is there any ongoing effort on that front?
>
> Thanks,
> Bharath
>
> *[1]* OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         java.util.Arrays.copyOf(Arrays.java:3326)
>         java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>         java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>         java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>         java.lang.StringBuilder.append(StringBuilder.java:136)
>         scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
>         scala.Tuple2.toString(Tuple2.scala:22)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
Re: OOM with groupBy + saveAsTextFile
Minor clarification to my message of Sun, Nov 2, 2014 at 1:06 AM: I'm running Spark 1.1.0 on JDK 1.8, Linux 64-bit.
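The clarification mentions a 64-bit JVM, so one point is worth spelling out. Neither a 64-bit JVM nor a 100G heap lifts Java's per-array length limit. Arrays are indexed by int, so the char[] behind a single String or StringBuilder can never hold more than 2^31 - 1 chars. On JDK 8 such an array is only about 4 GB of UTF-16, well within a 100G heap. On HotSpot, "Requested array size exceeds VM limit" (the error in the original message's stack trace) typically means that an allocation asked for an array near that limit.

The back-of-the-envelope sketch below is an estimate, not a measurement of this job. It uses the post's figures (20 GB input, 1061 keys) plus one labeled assumption: a key's text output has about as many chars as its input has bytes.

```python
# Java arrays are int-indexed, so no single array (including the char[] backing a
# String or StringBuilder) can hold more than 2**31 - 1 elements, whatever the heap
# size and on 32- and 64-bit JVMs alike. HotSpot's real cap is a few elements lower.
JAVA_MAX_ARRAY_LEN = 2**31 - 1

# Figures taken from the original post.
INPUT_BYTES = 20 * 10**9   # "The data size is 20GB" (decimal GB assumed)
NUM_KEYS = 1061            # distinct keys after groupBy

# ASSUMPTION: a key's text output has about as many chars as its input has bytes.
avg_chars_per_key = INPUT_BYTES / NUM_KEYS

# Fraction of the whole input that one key would need to hold for its single
# output line to exceed the Java array limit.
share_needed_to_overflow = JAVA_MAX_ARRAY_LEN / INPUT_BYTES

print(f"average per key: {avg_chars_per_key / 1e6:.1f} M chars")
print(f"a single key's line overflows once it holds ~{share_needed_to_overflow:.1%} of the input")
```

Under that assumption, the average key is far below the limit. However, a single skewed key holding roughly a tenth of the input would already be too large to render as one string, regardless of how much heap the executor has.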
OOM with groupBy + saveAsTextFile
Hi,

I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD with a count of ~100 million. The data size is 20GB, and groupBy results in an RDD of 1061 keys whose values are Iterable<...>. The job runs on 3 hosts in a standalone setup, with each host's executor having 100G RAM and 24 cores dedicated to it. The groupBy stage completes successfully with ~24GB of shuffle write. However, saveAsTextFile fails after repeated retries, and each attempt fails with an out-of-memory error [1]. I understand that a few partitions may be overloaded as a result of the groupBy. I've tried the following configuration combinations, unsuccessfully:

1) Repartition the initial RDD (44 input partitions but 1061 keys) into 1061 partitions and set max cores = 3. The intent is that each key is a "logical" partition (though many partitions will end up on very few hosts). With max cores = 3 and 3 hosts in the cluster, each host then likely runs saveAsTextFile on a single key at a time. The level of parallelism is unspecified.

2) Leave max cores unspecified, set the level of parallelism to 72, and leave the number of partitions unspecified. In that case the number of input partitions, 44, was used.

Since I do not intend to cache RDDs, I have set spark.storage.memoryFraction=0.2 in both cases.

My understanding is that if each host is processing a single logical partition for saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely to run out of memory. My interpretation of the Spark tuning guide is that the degree of parallelism has little impact in case (1) above, since max cores = number of hosts. Can someone explain why there are still OOMs with 100G available?

On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into the memory of a single host for saveAsTextFile. A single shuffle read from a remote host could be written to HDFS before the next remote read is carried out. This way, not all data needs to be collected at the same time.

Lastly, if an OOM is a common occurrence (as per the tuning guide and even DataStax's Spark introduction), though it shouldn't be, there may need to be more documentation on Spark's internals. That would help users make better-informed tuning decisions about parallelism, max cores, number of partitions and other tunables. Is there any ongoing effort on that front?

Thanks,
Bharath

[1] OOM stack trace and logs

14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    java.util.Arrays.copyOf(Arrays.java:3326)
    java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
    java.lang.StringBuilder.append(StringBuilder.java:136)
    scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
    scala.Tuple2.toString(Tuple2.scala:22)
    org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
    org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133)
14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s
14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
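In stack trace [1] of the original message, saveAsTextFile calls Tuple2.toString. Each (key, Iterable) record is therefore turned into a single string before it is written, so a key's whole group is rendered at once rather than streamed in chunks.

The following is a pure-Python model, not Spark code. save_as_text, group_by and flatten are hypothetical stand-ins. It contrasts writing one grouped record per key with writing one (key, value) record per value, and measures the longest line each approach produces.

```python
import io
from collections import defaultdict


def save_as_text(records, out):
    """Model of saveAsTextFile: one output line per record, built with str().

    Each record's full line is materialized as one string before being written,
    mirroring the toString call visible in the stack trace.
    """
    for record in records:
        line = str(record)  # the whole record is rendered in one go
        out.write(line + "\n")


def group_by(values, key_fn):
    """Model of groupBy: one (key, list_of_values) record per distinct key."""
    groups = defaultdict(list)
    for v in values:
        groups[key_fn(v)].append(v)
    return list(groups.items())


def flatten(grouped):
    """Yield one (key, value) record per value instead of one record per key."""
    for key, values in grouped:
        for v in values:
            yield (key, v)


def longest_line(text):
    return max(len(line) for line in text.splitlines())


# Hypothetical data: 10,000 small strings spread over 3 keys.
data = [f"value-{i:05d}" for i in range(10_000)]
grouped = group_by(data, key_fn=lambda s: int(s[-5:]) % 3)

grouped_out = io.StringIO()
save_as_text(grouped, grouped_out)

flat_out = io.StringIO()
save_as_text(flatten(grouped), flat_out)

print("grouped:", len(grouped_out.getvalue().splitlines()), "lines, longest",
      longest_line(grouped_out.getvalue()))
print("flattened:", len(flat_out.getvalue().splitlines()), "lines, longest",
      longest_line(flat_out.getvalue()))
```

In the grouped case every value for a key lands on one line, so the longest line grows with the largest group. In the flattened case no line is larger than a single value.

In Spark terms, this roughly corresponds to writing (key, value) pairs instead of (key, Iterable) pairs, for example by not grouping at all or by using flatMapValues on the grouped RDD. If groupBy is kept, though, the values for one key may still have to be gathered in memory on the reduce side.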