Hey Jim,

In Spark 0.9 we added a “batchSize” parameter to PySpark that makes it group 
multiple objects together before passing them between Java and Python, but this 
may be too high by default. Try passing batchSize=10 to your SparkContext 
constructor to lower it (the default is 1024). Or even batchSize=1 to match 
earlier versions.

Matei

On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:

> Hi all, I'm wondering if there's any settings I can use to reduce the
> memory needed by the PythonRDD when computing simple stats.  I am
> getting OutOfMemoryError exceptions while calculating count() on big,
> but not absurd, records.  It seems like PythonRDD is trying to keep
> too many of these records in memory, when all that is needed is to
> stream through them and count.  Any tips for getting through this
> workload?
> 
> 
> Code:
> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
> 
> # the biggest individual text line is ~3MB
> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
> (loads(y), loads(s)))
> parsed.persist(StorageLevel.MEMORY_AND_DISK)
> 
> parsed.count()
> # will never finish: executor.Executor: Uncaught exception will FAIL
> all executors
> 
> Incidentally the whole app appears to be killed, but this error is not
> propagated to the shell.
> 
> Cluster:
> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
> 
> Exception:
> java.lang.OutOfMemoryError: Java heap space
>        at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>        at 
> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>        at 
> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at 
> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>        at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>        at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>        at 
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

Reply via email to