I've only tried 0.9, in which I ran into the `stdin writer to Python
finished early` error so frequently that I wasn't able to load even a 1GB file.
Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll
> try to look into these; it seems like a serious error.
>
> Matei
>
> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>
>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>> 1.0.4" from GitHub on 2014-03-18.
>>
>> I tried batchSizes of 512, 10, and 1; each got me further, but none
>> succeeded.
>>
>> I can get this to work -- with manual intervention -- if I omit
>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. Even
>> then, 5 of the 175 executors hung, and I had to kill their Python
>> processes to get things going again. The only indication of this in the
>> logs was `INFO python.PythonRDD: stdin writer to Python finished early`.
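>>
>> Concretely, the version that limps along looks roughly like this (a
>> sketch, assuming `loads` is json.loads and that sc was created with
>> batchSize=1):
>>
>> session = sc.textFile('s3://...json.gz')
>> parsed = session.map(lambda l: l.split("\t", 1)) \
>>                 .map(lambda (y, s): (loads(y), loads(s)))
>> # no persist() here -- just stream through and count
>> parsed.count()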
>>
>> With batchSize=1 and persist enabled, a new memory error came up in
>> several tasks before the app failed:
>>
>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>> thread Thread[stdin writer for python,5,main]
>> java.lang.OutOfMemoryError: Java heap space
>>        at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>        at java.lang.String.<init>(String.java:203)
>>        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>        at org.apache.hadoop.io.Text.decode(Text.java:350)
>>        at org.apache.hadoop.io.Text.decode(Text.java:327)
>>        at org.apache.hadoop.io.Text.toString(Text.java:254)
>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>
>> There are other exceptions, but I think they all stem from the above,
>> e.g. org.apache.spark.SparkException: Error sending message to
>> BlockManagerMaster
>>
>> Let me know if there are other settings I should try, or if I should
>> try a newer snapshot.
>>
>> Thanks again!
>>
>>
>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>> Hey Jim,
>>>
>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it
>>> group multiple objects together before passing them between Java and
>>> Python, but the default may be too high for your records. Try passing
>>> batchSize=10 to your SparkContext constructor to lower it (the default is
>>> 1024), or even batchSize=1 to match earlier versions.
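>>>
>>> Roughly like this (a sketch; the master URL and app name below are just
>>> placeholders):
>>>
>>> from pyspark import SparkContext
>>> sc = SparkContext("spark://master:7077", "MyApp", batchSize=10)
>>> # or batchSize=1 to match the pre-0.9 behavior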
>>>
>>> Matei
>>>
>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>
>>>> Hi all, I'm wondering if there are any settings I can use to reduce the
>>>> memory needed by PythonRDD when computing simple stats.  I am getting
>>>> OutOfMemoryError exceptions while calculating count() on big, but not
>>>> absurd, records.  It seems like PythonRDD is trying to keep too many of
>>>> these records in memory, when all that is needed is to stream through
>>>> them and count.  Any tips for getting through this workload?
>>>>
>>>>
>>>> Code:
>>>> from json import loads            # assuming loads here is json.loads
>>>> from pyspark import StorageLevel
>>>>
>>>> session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data
>>>>
>>>> # the biggest individual text line is ~3MB
>>>> parsed = session.map(lambda l: l.split("\t", 1)) \
>>>>                 .map(lambda (y, s): (loads(y), loads(s)))
>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>
>>>> parsed.count()
>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>> # all executors
>>>>
>>>> Incidentally the whole app appears to be killed, but this error is not
>>>> propagated to the shell.
>>>>
>>>> Cluster:
>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
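>>>>
>>>> (A sketch of how spark.executor.memory can be set via SparkConf; the
>>>> master URL and app name below are placeholders.)
>>>>
>>>> from pyspark import SparkConf, SparkContext
>>>> conf = SparkConf().setMaster("spark://master:7077") \
>>>>                   .setAppName("CountJson") \
>>>>                   .set("spark.executor.memory", "10g")
>>>> sc = SparkContext(conf=conf)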
>>>>
>>>> Exception:
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>
>
