I think the problem I ran into in 0.9 is covered in https://issues.apache.org/jira/browse/SPARK-1323
When I kill the python process, the stacktrace I gets indicates that this happens at initialization. It looks like the initial write to the Python process does not go through, and then the iterator hangs waiting for output. I haven't had luck turning on debugging for the executor process. Still trying to learn the lgo4j properties that need to be set. No luck yet on tracking down the memory leak. 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11 org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231) at org.apache.spark.rdd.RDD.iterator(RDD.scala:222) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote: > I've only tried 0.9, in which I ran into the `stdin writer to Python > finished early` so frequently I wasn't able to load even a 1GB file. > Let me know if I can provide any other info! > > On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> > wrote: >> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? >> We'll try to look into these, seems like a serious error. >> >> Matei >> >> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote: >> >>> Thanks, Matei. I am running "Spark 1.0.0-SNAPSHOT built for Hadoop >>> 1.0.4" from GitHub on 2014-03-18. >>> >>> I tried batchSizes of 512, 10, and 1 and each got me further but none >>> have succeeded. >>> >>> I can get this to work -- with manual interventions -- if I omit >>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5 >>> of the 175 executors hung, and I had to kill the python process to get >>> things going again. The only indication of this in the logs was `INFO >>> python.PythonRDD: stdin writer to Python finished early`. >>> >>> With batchSize=1 and persist, a new memory error came up in several >>> tasks, before the app was failed: >>> >>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in >>> thread Thread[stdin writer for python,5,main] >>> java.lang.OutOfMemoryError: Java heap space >>> at java.util.Arrays.copyOfRange(Arrays.java:2694) >>> at java.lang.String.<init>(String.java:203) >>> at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) >>> at java.nio.CharBuffer.toString(CharBuffer.java:1201) >>> at org.apache.hadoop.io.Text.decode(Text.java:350) >>> at org.apache.hadoop.io.Text.decode(Text.java:327) >>> at org.apache.hadoop.io.Text.toString(Text.java:254) >>> at >>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349) >>> at >>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349) >>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) >>> at scala.collection.Iterator$$anon$12.next(Iterator.scala:357) >>> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>> at >>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242) >>> at >>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85) >>> >>> There are other exceptions, but I think they all stem from the above, >>> eg. org.apache.spark.SparkException: Error sending message to >>> BlockManagerMaster >>> >>> Let me know if there are other settings I should try, or if I should >>> try a newer snapshot. >>> >>> Thanks again! >>> >>> >>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> >>> wrote: >>>> Hey Jim, >>>> >>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it >>>> group multiple objects together before passing them between Java and >>>> Python, but this may be too high by default. Try passing batchSize=10 to >>>> your SparkContext constructor to lower it (the default is 1024). Or even >>>> batchSize=1 to match earlier versions. >>>> >>>> Matei >>>> >>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote: >>>> >>>>> Hi all, I'm wondering if there's any settings I can use to reduce the >>>>> memory needed by the PythonRDD when computing simple stats. I am >>>>> getting OutOfMemoryError exceptions while calculating count() on big, >>>>> but not absurd, records. It seems like PythonRDD is trying to keep >>>>> too many of these records in memory, when all that is needed is to >>>>> stream through them and count. Any tips for getting through this >>>>> workload? >>>>> >>>>> >>>>> Code: >>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data >>>>> >>>>> # the biggest individual text line is ~3MB >>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): >>>>> (loads(y), loads(s))) >>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK) >>>>> >>>>> parsed.count() >>>>> # will never finish: executor.Executor: Uncaught exception will FAIL >>>>> all executors >>>>> >>>>> Incidentally the whole app appears to be killed, but this error is not >>>>> propagated to the shell. >>>>> >>>>> Cluster: >>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB) >>>>> >>>>> Exception: >>>>> java.lang.OutOfMemoryError: Java heap space >>>>> at >>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132) >>>>> at >>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120) >>>>> at >>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113) >>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >>>>> at >>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113) >>>>> at >>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >>>>> at >>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >>>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:220) >>>>> at >>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85) >>>> >>