Re: OOM with groupBy + saveAsTextFile

2014-11-03 Thread Bharath Ravi Kumar
The result was no different with saveAsHadoopFile. In both cases, I can see
that I've misinterpreted the API docs. I'll explore the APIs a bit further
for ways to save the iterable as chunks rather than one large text/binary
record. It might also help to clarify this aspect in the API docs. For those
(like me) whose first practical experience with data processing is through
Spark, having skipped the Hadoop MR ecosystem, it might help to clarify
interactions with HDFS and the like. Thanks for all the help.
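
One way to do that (a rough, untested sketch rather than the actual job code;
the variable name "grouped", the String key type, the class name and the
tab-separated layout are all assumptions) would be to flatten each group into
one short line per element before saving, so saveAsTextFile never has to
render a whole Iterable as a single String:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;
import scala.Tuple4;

public class ChunkedSave {
  // Emit one short line per element of each group instead of letting
  // saveAsTextFile render the whole (key, Iterable) pair via toString().
  // Note: this still assumes a single key's data fits in memory, which
  // groupBy requires anyway.
  static JavaRDD<String> toLines(
      JavaPairRDD<String, Iterable<Tuple4<String, Double, String, String>>> grouped) {
    return grouped.flatMap(
        new FlatMapFunction<Tuple2<String, Iterable<Tuple4<String, Double, String, String>>>, String>() {
          public Iterable<String> call(
              Tuple2<String, Iterable<Tuple4<String, Double, String, String>>> kv) {
            List<String> lines = new ArrayList<String>();
            for (Tuple4<String, Double, String, String> t : kv._2()) {
              lines.add(kv._1() + "\t" + t._1() + "\t" + t._2()
                  + "\t" + t._3() + "\t" + t._4());
            }
            return lines;
          }
        });
  }
}

// Usage (hypothetical output path):
//   toLines(grouped).saveAsTextFile("hdfs:///out/grouped-as-lines");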

On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote:

 saveAsText means save every element of the RDD as one line of text.
 It works like TextOutputFormat in Hadoop MapReduce since that's what
 it uses. So you are causing it to create one big string out of each
 Iterable this way.

 On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Thanks for responding. This is what I initially suspected, and hence
 asked
  why the library needed to construct the entire value buffer on a single
 host
  before writing it out. The stacktrace appeared to suggest that user code
 is
  not constructing the large buffer. I'm simply calling groupBy and
 saveAsText
  on the resulting grouped rdd. The value after grouping is an
  Iterable<Tuple4<String, Double, String, String>>. None of the strings are
  large. I also do not need a single large string created out of the
 Iterable
  for writing to disk. Instead, I expect the iterable to get written out in
  chunks in response to saveAsText. This shouldn't be the default
 behaviour of
  saveAsText perhaps? Hence my original question of the behavior of
  saveAsText. The tuning / partitioning attempts were aimed at reducing
 memory
  pressure so that multiple such buffers aren't constructed at the same
 time
  on a host. I'll take a second look at the data and code before updating
 this
  thread. Thanks.
 



Re: OOM with groupBy + saveAsTextFile

2014-11-03 Thread Bharath Ravi Kumar
I also realized from your description of saveAsText that the API is indeed
behaving as expected, i.e., it is appropriate (though not optimal) for the
API to construct a single string out of the value. If the value turns out
to be large, the user of the API needs to reconsider the implementation
approach. My bad.

On Mon, Nov 3, 2014 at 3:38 PM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 The result was no different with saveAsHadoopFile. In both cases, I can
 see that I've misinterpreted the API docs. I'll explore the APIs a bit
 further for ways to save the iterable as chunks rather than one large
 text/binary record. It might also help to clarify this aspect in the API
 docs. For those (like me) whose first practical experience with data
 processing is through Spark, having skipped the Hadoop MR ecosystem, it
 might help to clarify interactions with HDFS and the like. Thanks for all
 the help.

 On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote:

 saveAsText means save every element of the RDD as one line of text.
 It works like TextOutputFormat in Hadoop MapReduce since that's what
 it uses. So you are causing it to create one big string out of each
 Iterable this way.

 On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Thanks for responding. This is what I initially suspected, and hence
 asked
  why the library needed to construct the entire value buffer on a single
 host
  before writing it out. The stacktrace appeared to suggest that user
 code is
  not constructing the large buffer. I'm simply calling groupBy and
 saveAsText
  on the resulting grouped rdd. The value after grouping is an
  Iterable<Tuple4<String, Double, String, String>>. None of the strings are
  large. I also do not need a single large string created out of the
 Iterable
  for writing to disk. Instead, I expect the iterable to get written out
 in
  chunks in response to saveAsText. This shouldn't be the default
 behaviour of
  saveAsText perhaps? Hence my original question of the behavior of
  saveAsText. The tuning / partitioning attempts were aimed at reducing
 memory
  pressure so that multiple such buffers aren't constructed at the same
 time
  on a host. I'll take a second look at the data and code before updating
 this
  thread. Thanks.
 





Re: OOM with groupBy + saveAsTextFile

2014-11-03 Thread Sean Owen
Yes, that's the same thing really. You're still writing a huge value
as part of one single (key,value) record. The value exists in memory
in order to be written to storage. Although there aren't hard limits,
keys and values generally aren't intended to be huge, e.g., hundreds of
megabytes.

You should probably design this differently, to not try to collect a
massive value per key. That is a generally good idea, not just for
this reason.

Certainly, you don't have to be able to fit many (key,value) pairs in memory
at once, but you do need to be able to fit one.
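
For example (just a sketch, not your code; the String key type, the record
layout and the class name are placeholders), keeping one record per element
and carrying the key as a column avoids ever materializing a per-key
collection:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;
import scala.Tuple4;

public class PerRecordSave {
  // One output line per (key, record) pair; no per-key Iterable is built.
  static void save(JavaPairRDD<String, Tuple4<String, Double, String, String>> records,
                   String path) {
    records
        // Optional: sort so records with the same key land next to each other
        // in the output files; drop this if ordering doesn't matter.
        .sortByKey()
        .map(new Function<Tuple2<String, Tuple4<String, Double, String, String>>, String>() {
          public String call(Tuple2<String, Tuple4<String, Double, String, String>> kv) {
            Tuple4<String, Double, String, String> t = kv._2();
            return kv._1() + "\t" + t._1() + "\t" + t._2()
                + "\t" + t._3() + "\t" + t._4();
          }
        })
        .saveAsTextFile(path);
  }
}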

On Mon, Nov 3, 2014 at 10:08 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 The result was no different with saveAsHadoopFile. In both cases, I can see
 that I've misinterpreted the API docs. I'll explore the APIs a bit further
 for ways to save the iterable as chunks rather than one large text/binary
 record. It might also help to clarify this aspect in the API docs. For those
 (like me) whose first practical experience with data processing is through
 Spark, having skipped the Hadoop MR ecosystem, it might help to clarify
 interactions with HDFS and the like. Thanks for all the help.

 On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote:

 saveAsText means save every element of the RDD as one line of text.
 It works like TextOutputFormat in Hadoop MapReduce since that's what
 it uses. So you are causing it to create one big string out of each
 Iterable this way.

 On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Thanks for responding. This is what I initially suspected, and hence
  asked
  why the library needed to construct the entire value buffer on a single
  host
  before writing it out. The stacktrace appeared to suggest that user code
  is
  not constructing the large buffer. I'm simply calling groupBy and
  saveAsText
  on the resulting grouped rdd. The value after grouping is an
  Iterable<Tuple4<String, Double, String, String>>. None of the strings are
  large. I also do not need a single large string created out of the
  Iterable
  for writing to disk. Instead, I expect the iterable to get written out
  in
  chunks in response to saveAsText. This shouldn't be the default
  behaviour of
  saveAsText perhaps? Hence my original question of the behavior of
  saveAsText. The tuning / partitioning attempts were aimed at reducing
  memory
  pressure so that multiple such buffers aren't constructed at the same
  time
  on a host. I'll take a second look at the data and code before updating
  this
  thread. Thanks.
 






Re: OOM with groupBy + saveAsTextFile

2014-11-02 Thread Reynold Xin
None of your tuning will help here because the problem is actually the way
you are saving the output. If you take a look at the stacktrace, it is
trying to build a single string that is too large for the VM to allocate.
The VM is not really running out of memory; rather, the JVM cannot support a
single String that large (a Java array, and therefore a String, is capped at
roughly Integer.MAX_VALUE elements).

I suspect this is because the value in your (key, value) pair after the
group by is too long (maybe it concatenates every single record). Do
you really want to save the (key, value) output this way as a text file?
Maybe you can write them out as multiple strings rather than a single super
giant string.
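
One concrete way to do that (a sketch only; the class name, output path, and
the assumption that both the key and the already-formatted record are Strings
are mine, not from the original job) is to skip the groupBy entirely and let
Hadoop's MultipleTextOutputFormat route each record into a per-key location
via saveAsHadoopFile:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Routes each (key, line) record to a file under a directory named after the
// key, so no per-key String or collection is ever built in memory.
public class KeyedTextOutput extends MultipleTextOutputFormat<String, String> {
  @Override
  protected String generateFileNameForKeyValue(String key, String value, String name) {
    // "name" is the usual part-XXXXX suffix, so concurrent tasks don't collide.
    return key + "/" + name;
  }

  @Override
  protected String generateActualKey(String key, String value) {
    // The key is already encoded in the file path; don't repeat it per line.
    return null;
  }
}

// Usage sketch, on the ungrouped pairs (values pre-formatted as lines):
//   JavaPairRDD<String, String> lines = ...;
//   lines.saveAsHadoopFile("hdfs:///out/by-key",
//       String.class, String.class, KeyedTextOutput.class);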




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com wrote:


 Hi,

 FYI as follows.  Could you post your heap size settings as well as your Spark
 app code?

 Regards
 Arthur

 3.1.3 Detail Message: Requested array size exceeds VM limit

 The detail message "Requested array size exceeds VM limit" indicates that
 the application (or APIs used by that application) attempted to allocate an
 array that is larger than the heap size. For example, if an application
 attempts to allocate an array of 512MB but the maximum heap size is 256MB,
 then OutOfMemoryError will be thrown with the reason "Requested array size
 exceeds VM limit". In most cases the problem is either a configuration
 issue (heap size too small), or a bug that results in an application
 attempting to create a huge array, for example, when the number of elements
 in the array is computed using an algorithm that computes an incorrect
 size.




 On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Resurfacing the thread. An OOM shouldn't be the norm for a common groupBy /
 sort use case in a framework that is leading in sorting benchmarks, should
 it? Or is there something fundamentally wrong in the usage?
 On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 

Re: OOM with groupBy + saveAsTextFile

2014-11-02 Thread Bharath Ravi Kumar
Thanks for responding. This is what I initially suspected, and hence asked
why the library needed to construct the entire value buffer on a single
host before writing it out. The stacktrace appeared to suggest that user
code is not constructing the large buffer. I'm simply calling groupBy and
saveAsText on the resulting grouped rdd. The value after grouping is an
Iterable<Tuple4<String, Double, String, String>>. None of the strings are
large. I also do not need a single large string created out of the Iterable
for writing to disk. Instead, I expect the iterable to get written out in
chunks in response to saveAsText. This shouldn't be the default behaviour
of saveAsText perhaps? Hence my original question of the behavior of
saveAsText. The tuning / partitioning attempts were aimed at reducing
memory pressure so that multiple such buffers aren't constructed at the
same time on a host. I'll take a second look at the data and code before
updating this thread. Thanks.
None of your tuning will help here because the problem is actually the way
you are saving the output. If you take a look at the stacktrace, it is
trying to build a single string that is too large for the VM to allocate.
The VM is not really running out of memory; rather, the JVM cannot support a
single String that large (a Java array, and therefore a String, is capped at
roughly Integer.MAX_VALUE elements).

I suspect this is because the value in your (key, value) pair after the
group by is too long (maybe it concatenates every single record). Do
you really want to save the (key, value) output this way as a text file?
Maybe you can write them out as multiple strings rather than a single super
giant string.




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com wrote:


 Hi,

 FYI as follows.  Could you post your heap size settings as well as your Spark
 app code?

 Regards
 Arthur

 3.1.3 Detail Message: Requested array size exceeds VM limit

 The detail message "Requested array size exceeds VM limit" indicates that
 the application (or APIs used by that application) attempted to allocate an
 array that is larger than the heap size. For example, if an application
 attempts to allocate an array of 512MB but the maximum heap size is 256MB,
 then OutOfMemoryError will be thrown with the reason "Requested array size
 exceeds VM limit". In most cases the problem is either a configuration
 issue (heap size too small), or a bug that results in an application
 attempting to create a huge array, for example, when the number of elements
 in the array is computed using an algorithm that computes an incorrect
 size.




 On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Resurfacing the thread. An OOM shouldn't be the norm for a common groupBy /
 sort use case in a framework that is leading in sorting benchmarks, should
 it? Or is there something fundamentally wrong in the usage?
 On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

Re: OOM with groupBy + saveAsTextFile

2014-11-02 Thread Sean Owen
saveAsText means save every element of the RDD as one line of text.
It works like TextOutputFormat in Hadoop MapReduce since that's what
it uses. So you are causing it to create one big string out of each
Iterable this way.

On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Thanks for responding. This is what I initially suspected, and hence asked
 why the library needed to construct the entire value buffer on a single host
 before writing it out. The stacktrace appeared to suggest that user code is
 not constructing the large buffer. I'm simply calling groupBy and saveAsText
 on the resulting grouped rdd. The value after grouping is an
  Iterable<Tuple4<String, Double, String, String>>. None of the strings are
 large. I also do not need a single large string created out of the Iterable
 for writing to disk. Instead, I expect the iterable to get written out in
 chunks in response to saveAsText. This shouldn't be the default behaviour of
 saveAsText perhaps? Hence my original question of the behavior of
 saveAsText. The tuning / partitioning attempts were aimed at reducing memory
 pressure so that multiple such buffers aren't constructed at the same time
 on a host. I'll take a second look at the data and code before updating this
 thread. Thanks.





Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Minor clarification: I'm running spark 1.1.0 on JDK 1.8, Linux 64 bit.

On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar reachb...@gmail.com
wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.
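
 As a rough sketch of how these were expressed (the property names are the
 standard Spark 1.x ones; the app name and code skeleton are illustrative,
 not the exact job code):

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;

 public class ConfSketch {
   public static void main(String[] args) {
     SparkConf conf = new SparkConf()
         .setAppName("groupBy-saveAsTextFile")
         // Attempt (1): repartition the input to 1061 partitions (not shown)
         // and cap total cores at 3 so each host works one key at a time.
         .set("spark.cores.max", "3")
         // Attempt (2) instead leaves max cores alone and raises parallelism:
         // .set("spark.default.parallelism", "72")
         // No RDDs are cached, so shrink the storage fraction.
         .set("spark.storage.memoryFraction", "0.2");
     JavaSparkContext sc = new JavaSparkContext(conf);
     // ... load input, groupBy, saveAsTextFile ...
     sc.stop();
   }
 }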

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 java.lang.StringBuilder.append(StringBuilder.java:136)

 scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
 scala.Tuple2.toString(Tuple2.scala:22)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
 43704, 0), shuffleId=0, mapId=13, reduceId=92)
 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
 (groupBy at ModelTrainer.java:133)
 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) failed in 55.259 s
 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
 ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) due to fetch failure







Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Resurfacing the thread. An OOM shouldn't be the norm for a common groupBy /
sort use case in a framework that is leading in sorting benchmarks, should
it? Or is there something fundamentally wrong in the usage?
On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
 of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 partitions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 java.lang.StringBuilder.append(StringBuilder.java:136)

 scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
 scala.Tuple2.toString(Tuple2.scala:22)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
 43704, 0), shuffleId=0, mapId=13, reduceId=92)
 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
 (groupBy at ModelTrainer.java:133)
 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) failed in 55.259 s
 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
 ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
 ModelTrainer.java:141) due to fetch failure







Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread arthur.hk.c...@gmail.com

Hi,

FYI as follows.  Could you post your heap size settings as well as your Spark app 
code?

Regards
Arthur

3.1.3 Detail Message: Requested array size exceeds VM limit

The detail message "Requested array size exceeds VM limit" indicates that the 
application (or APIs used by that application) attempted to allocate an array 
that is larger than the heap size. For example, if an application attempts to 
allocate an array of 512MB but the maximum heap size is 256MB, then 
OutOfMemoryError will be thrown with the reason "Requested array size exceeds VM 
limit". In most cases the problem is either a configuration issue (heap size too 
small), or a bug that results in an application attempting to create a huge 
array, for example, when the number of elements in the array is computed using 
an algorithm that computes an incorrect size.




On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Resurfacing the thread. An OOM shouldn't be the norm for a common groupBy /
 sort use case in a framework that is leading in sorting benchmarks, should
 it? Or is there something fundamentally wrong in the usage?
 
 On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Hi,
 
 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of 
 count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 
 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
 String>>. The job runs on 3 hosts in a standalone setup with each host's 
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy 
 stage completes successfully with ~24GB of shuffle write, the saveAsTextFile 
 fails after repeated retries with each attempt failing due to an out of 
 memory error [1]. I understand that a few partitions may be overloaded as a 
 result of the groupBy and I've tried the following config combinations 
 unsuccessfully:
 
 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 
 1061 partitions and have max cores = 3 so that each key is a logical 
 partition (though many partitions will end up on very few hosts), and each 
 host likely runs saveAsTextFile on a single key at a time due to max cores = 
 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
 
 2) Leave max cores unspecified, set the level of parallelism to 72, and leave 
 number of partitions unspecified (in which case the # input partitions was 
 used, which is 44)
 Since I do not intend to cache RDD's, I have set 
 spark.storage.memoryFraction=0.2 in both cases.
 
 My understanding is that if each host is processing a single logical 
 partition to saveAsTextFile and is reading from other hosts to write out the 
 RDD, it is unlikely that it would run out of memory. My interpretation of the 
 spark tuning guide is that the degree of parallelism has little impact in 
 case (1) above since max cores = number of hosts. Can someone explain why 
 there are still OOM's with 100G being available? On a related note, 
 intuitively (though I haven't read the source), it appears that an entire 
 key-value pair needn't fit into memory of a single host for saveAsTextFile 
 since a single shuffle read from a remote can be written to HDFS before the 
 next remote read is carried out. This way, not all data needs to be collected 
 at the same time. 
 
 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the 
 tuning guide and even as per Datastax's spark introduction), there may need 
 to be more documentation around the internals of spark to help users take 
 better informed tuning decisions with parallelism, max cores, number 
 partitions and other tunables. Is there any ongoing effort on that front?
 
 Thanks,
 Bharath
 
 
 [1] OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size 
 exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)
 
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
 
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 java.lang.StringBuilder.append(StringBuilder.java:136)
 scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
 scala.Tuple2.toString(Tuple2.scala:22)
 
 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 
 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
 
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)