Re: OutofMemoryError when generating output
Hi,

Thanks for the response. I tried countByKey, but I am not able to write the output to the console or to a file: neither collect() nor saveAsTextFile() works on the Map object that countByKey() returns.

    val x = sc.textFile(baseFile).map { line =>
      val fields = line.split("\t")
      (fields(11), fields(6)) // extract (month, user_id)
    }.distinct().countByKey()

    x.saveAsTextFile(...) // does not work: saveAsTextFile is not defined for a Map object

Is there a way to convert the Map object into something that I can print to the console and write to a file?

thanks
Re: OutofMemoryError when generating output
Yeah, saveAsTextFile is an RDD-specific method. If you really want to use that method, just turn the map into an RDD:

    sc.parallelize(x.toSeq).saveAsTextFile(...)

Reading through the API docs will show you many more alternatives!

Best,
Burak

- Original Message -
From: SK skrishna...@gmail.com
To: u...@spark.incubator.apache.org
Sent: Thursday, August 28, 2014 12:45:22 PM
Subject: Re: OutofMemoryError when generating output
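To make that suggestion concrete, here is a minimal sketch; it assumes x is the scala.collection.Map[String, Long] that countByKey() returns, and "counts_output" is a placeholder path:

    // Print the per-month counts to the console.
    x.foreach { case (month, count) => println(s"$month\t$count") }

    // Turn the local Map into an RDD so saveAsTextFile applies,
    // writing one "month<TAB>count" line per entry.
    sc.parallelize(x.toSeq)
      .map { case (month, count) => s"$month\t$count" }
      .saveAsTextFile("counts_output")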
OutofMemoryError when generating output
Hi,

I have the following piece of code that I am running on a cluster of 10 nodes with 2 GB of memory per node. The tasks seem to complete, but at the point where the program generates output (saveAsTextFile), it freezes after some time and reports an out-of-memory error (error transcript attached below). I also tried using collect() and printing the output to the console instead of a file, but got the same error. The program reads a month of logs and extracts the number of unique users during the month. The reduced output is not very large, so I am not sure why the memory error occurs. I would appreciate any help in fixing this memory error so I can get the output. Thanks.

    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("App")
      val sc = new SparkContext(conf)

      // get the number of users per month
      val user_time = sc.union(sc.textFile(baseFile))
        .map(line => {
          val fields = line.split("\t")
          (fields(11), fields(6))          // extract (month, user_id)
        })
        .groupByKey                        // group by month as the key
        .map(g => (g._1, g._2.toSet.size)) // get the unique id count per month

      // .collect()
      // user_time.foreach(f => println(f))
      user_time.map(f => "%s, %s".format(f._1, f._2)).saveAsTextFile("app_output")
      sc.stop()
    }

    14/08/26 15:21:15 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:121)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Re: OutofMemoryError when generating output
Hi,

As far as I can tell, the error doesn't occur during saveAsTextFile but rather during the groupByKey. We strongly urge users not to use groupByKey unless they really have to. I would suggest the following work-around instead:

    sc.textFile(baseFile).map { line =>
      val fields = line.split("\t")
      (fields(11), fields(6)) // extract (month, user_id)
    }.distinct().countByKey()

Best,
Burak

- Original Message -
From: SK skrishna...@gmail.com
To: u...@spark.incubator.apache.org
Sent: Tuesday, August 26, 2014 12:38:00 PM
Subject: OutofMemoryError when generating output
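For completeness, a sketch of an alternative that stays distributed end to end, avoiding both groupByKey and collecting a Map onto the driver. It assumes the same input layout as the original post (tab-separated fields, month in field 11, user_id in field 6); "app_output" is a placeholder path:

    sc.textFile(baseFile)
      .map { line =>
        val fields = line.split("\t")
        (fields(11), fields(6))            // (month, user_id)
      }
      .distinct()                          // one record per (month, user_id) pair
      .map { case (month, _) => (month, 1L) }
      .reduceByKey(_ + _)                  // unique-user count per month
      .map { case (month, count) => s"$month\t$count" }
      .saveAsTextFile("app_output")        // the result is an RDD, so this works directly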