Hey Jim,

In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but its default may be too high. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024), or even batchSize=1 to match earlier versions.
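Something like this, for example (a quick sketch -- the master URL and app name are placeholders for your own, but batchSize is a real keyword argument on the constructor):

    from pyspark import SparkContext

    # Ship only 10 objects per batch between the JVM and the Python
    # workers instead of the default 1024, so each batch buffered on
    # the Java side stays small.
    sc = SparkContext("spark://master:7077", "count-sessions", batchSize=10)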
Matei

On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:

> Hi all, I'm wondering if there's any settings I can use to reduce the
> memory needed by the PythonRDD when computing simple stats. I am
> getting OutOfMemoryError exceptions while calculating count() on big,
> but not absurd, records. It seems like PythonRDD is trying to keep
> too many of these records in memory, when all that is needed is to
> stream through them and count. Any tips for getting through this
> workload?
>
> Code:
>
> session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data
>
> # the biggest individual text line is ~3MB
> parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y, s): (loads(y), loads(s)))
> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>
> parsed.count()
> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>
> Incidentally the whole app appears to be killed, but this error is not
> propagated to the shell.
>
> Cluster:
> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>
> Exception:
>
> java.lang.OutOfMemoryError: Java heap space
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>         at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)