Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.

I noticed that the S3 exceptions seem to occur more frequently when the
box is swapping.  Why is the box swapping?  combineByKey seems to
assume that it can fit an entire partition in memory when doing the
combineLocally step.  I'm going to try to break this apart, but will
need some sort of heuristic.  Options include looking at memory usage
via the resource module and trying to keep below
'spark.executor.memory', or using batchSize to limit the number of
entries in the dictionary.  Let me know if you have any opinions.
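
A minimal sketch of the second option (capping the number of entries in
the dictionary), assuming the local combine step is a plain generator
over (key, value) pairs.  The names combine_locally_bounded and max_keys
are hypothetical and are not part of the PySpark API; this only
illustrates the idea, not how PySpark implements combineByKey.

```python
def combine_locally_bounded(iterator, create_combiner, merge_value, max_keys):
    """Yield (key, combiner) pairs, never holding more than max_keys keys.

    Hypothetical helper: when the dictionary is full and a new key
    arrives, the partial results are emitted and the dictionary is reset.
    """
    combiners = {}
    for key, value in iterator:
        if key in combiners:
            # Existing key: fold the value into its combiner.
            combiners[key] = merge_value(combiners[key], value)
        else:
            if len(combiners) >= max_keys:
                # Dictionary is full: flush partial results downstream.
                for item in combiners.items():
                    yield item
                combiners = {}
            combiners[key] = create_combiner(value)
    # Emit whatever is left at the end of the partition.
    for item in combiners.items():
        yield item


if __name__ == "__main__":
    pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("a", 5)]
    for key, partial_sum in combine_locally_bounded(
            iter(pairs), lambda v: v, lambda c, v: c + v, max_keys=2):
        print(key, partial_sum)
```

The trade-off is that a key can be emitted more than once per partition,
so this relies on the later merge of combiners to fold the partial
results together.  Less is combined before the shuffle, but the memory
held by the dictionary stays bounded by max_keys entries.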

On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilike...@gmail.com> wrote:
> I'd just like to update this thread by pointing to the PR based on our
> initial design: https://github.com/apache/spark/pull/640
>
> This solution is a little more general and avoids catching IOException
> altogether. Long live exception propagation!
>
>
> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>> Hey Jim,
>>
>> This IOException thing is a general issue that we need to fix and your
>> observation is spot-on. There is actually a JIRA for it here that I created a few
>> days ago:
>> https://issues.apache.org/jira/browse/SPARK-1579
>>
>> Aaron is assigned on that one but not actively working on it, so we'd
>> welcome a PR from you on this if you are interested.
>>
>> The first thought we had was to set a volatile flag when the reader sees
>> an exception (indicating there was a failure in the task) and avoid
>> swallowing the IOException in the writer if this happens. But I think there
>> is a race here where the writer sees the error first before the reader knows
>> what is going on.
>>
>> Anyways maybe if you have a simpler solution you could sketch it out in
>> the JIRA and we could talk over there. The current proposal in the JIRA is
>> somewhat complicated...
>>
>> - Patrick
>>
>>
>>
>>
>>
>>
>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>
>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>> caused by a break in the connection to S3, from which the data was being
>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>> catching can potentially mask an exception for the data source, and that is
>>> indeed what I see happening.  The underlying libraries (jets3t and
>>> httpclient) do have retry capabilities, but I don't see a great way of
>>> setting them through Spark code.  Instead I added the patch below which
>>> kills the worker on the exception.  This allows me to completely load the
>>> data source after a few worker retries.
>>>
>>> Unfortunately, java.net.SocketException is the same error that is
>>> sometimes expected from the client when using methods like take().  One
>>> approach around this conflation is to create a new locally scoped exception
>>> class, eg. WriterException, catch java.net.SocketException during output
>>> writing, then re-throw the new exception.  The worker thread could then
>>> distinguish between the reasons java.net.SocketException might be thrown.
>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>
>>> Let me know if I should open a ticket or discuss this on the developers
>>> list instead.  Best,
>>>
>>> Jim
>>>
>>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> index 0d71fdb..f31158c 100644
>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>              readerException = e
>>>              Try(worker.shutdownOutput()) // kill Python worker process
>>>
>>> +          case e: java.net.SocketException =>
>>> +           // This can happen if a connection to the datasource, eg S3, resets
>>> +           // or is otherwise broken
>>> +            readerException = e
>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>> +
>>>            case e: IOException =>
>>>              // This can happen for legitimate reasons if the Python code stops returning data
>>>              // before we are done passing elements through, e.g., for take(). Just log a message to
>>>
>>>
>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>
>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>> 343)
>>>>
>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com>
>>>> wrote:
>>>> > Okay, thanks. Do you have any info on how large your records and data
>>>> > file are? I'd like to reproduce and fix this.
>>>> >
>>>> > Matei
>>>> >
>>>> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>> >
>>>> >> Hi Matei, thanks for working with me to find these issues.
>>>> >>
>>>> >> To summarize, the issues I've seen are:
>>>> >> 0.9.0:
>>>> >> - https://issues.apache.org/jira/browse/SPARK-1323
>>>> >>
>>>> >> SNAPSHOT 2014-03-18:
>>>> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>> >> Java heap space.  To me this indicates a memory leak since Spark
>>>> >> should simply be counting records of size < 3MB
>>>> >> - Without persist(), "stdin writer to Python finished early" hangs
>>>> >> the
>>>> >> application, unknown root cause
>>>> >>
>>>> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>> >> problem:
>>>> >>
>>>> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>> >> early
>>>> >> java.net.SocketException: Connection reset
>>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>> >>        at
>>>> >> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>> >>        at
>>>> >> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>> >>        at
>>>> >> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>> >>        at
>>>> >> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>> >>        at
>>>> >> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>> >>        at
>>>> >> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>> >>        at
>>>> >> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>> >>        at
>>>> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
>>>> >>        at
>>>> >> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>> >>        at
>>>> >> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>> >>        at
>>>> >> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>> >>        at
>>>> >> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>> >>        at
>>>> >> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>> >>        at
>>>> >> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> >>        at
>>>> >> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>> >>        at
>>>> >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >>        at
>>>> >> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>        at
>>>> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >>        at
>>>> >> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>> >>        at
>>>> >> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>
>>>> >>
>>>> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>> >> <matei.zaha...@gmail.com> wrote:
>>>> >>> Cool, thanks for the update. Have you tried running a branch with
>>>> >>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>> >>> memory leak issue are you referring to, is it separate from this? 
>>>> >>> (Couldn't
>>>> >>> find it earlier in the thread.)
>>>> >>>
>>>> >>> To turn on debug logging, copy conf/log4j.properties.template to
>>>> >>> conf/log4j.properties and change the line log4j.rootCategory=INFO, 
>>>> >>> console
>>>> >>> to log4j.rootCategory=DEBUG, console. Then make sure this file is 
>>>> >>> present in
>>>> >>> "conf" on all workers.
>>>> >>>
>>>> >>> BTW I've managed to run PySpark with this fix on some reasonably
>>>> >>> large S3 data (multiple GB) and it was fine. It might happen only if 
>>>> >>> records
>>>> >>> are large, or something like that. How much heap are you giving to your
>>>> >>> executors, and does it show that much in the web UI?
>>>> >>>
>>>> >>> Matei
>>>> >>>
>>>> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>> >>>
>>>> >>>> I think the problem I ran into in 0.9 is covered in
>>>> >>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>> >>>>
>>>> >>>> When I kill the python process, the stacktrace I get indicates
>>>> >>>> that
>>>> >>>> this happens at initialization.  It looks like the initial write to
>>>> >>>> the Python process does not go through, and then the iterator hangs
>>>> >>>> waiting for output.  I haven't had luck turning on debugging for
>>>> >>>> the
>>>> >>>> executor process.  Still trying to learn the log4j properties that
>>>> >>>> need to be set.
>>>> >>>>
>>>> >>>> No luck yet on tracking down the memory leak.
>>>> >>>>
>>>> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>> >>>> (crashed)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>> >>>>       at
>>>> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>> >>>>       at
>>>> >>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>> >>>>       at
>>>> >>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>> >>>>       at java.security.AccessController.doPrivileged(Native Method)
>>>> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>>> >>>>       at
>>>> >>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>> >>>>       at
>>>> >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>> >>>>       at
>>>> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> >>>>      at
>>>> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> >>>>       at java.lang.Thread.run(Thread.java:724)
>>>> >>>>
>>>> >>>>
>>>> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com>
>>>> >>>> wrote:
>>>> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>> >>>>> Python
>>>> >>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>> >>>>> file.
>>>> >>>>> Let me know if I can provide any other info!
>>>> >>>>>
>>>> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>> >>>>> <matei.zaha...@gmail.com> wrote:
>>>> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>> >>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>> >>>>>>
>>>> >>>>>> Matei
>>>> >>>>>>
>>>> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com>
>>>> >>>>>> wrote:
>>>> >>>>>>
>>>> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>> >>>>>>> Hadoop
>>>> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>> >>>>>>>
>>>> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>> >>>>>>> none
>>>> >>>>>>> have succeeded.
>>>> >>>>>>>
>>>> >>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>> >>>>>>> batchSize=1.  5
>>>> >>>>>>> of the 175 executors hung, and I had to kill the python process
>>>> >>>>>>> to get
>>>> >>>>>>> things going again.  The only indication of this in the logs was
>>>> >>>>>>> `INFO
>>>> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>> >>>>>>>
>>>> >>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>> >>>>>>> several
>>>> >>>>>>> tasks, before the app was failed:
>>>> >>>>>>>
>>>> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>> >>>>>>> thread Thread[stdin writer for python,5,main]
>>>> >>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>> >>>>>>>      at java.lang.String.<init>(String.java:203)
>>>> >>>>>>>      at
>>>> >>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>>>>>>
>>>> >>>>>>> There are other exceptions, but I think they all stem from the
>>>> >>>>>>> above,
>>>> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>> >>>>>>> BlockManagerMaster
>>>> >>>>>>>
>>>> >>>>>>> Let me know if there are other settings I should try, or if I
>>>> >>>>>>> should
>>>> >>>>>>> try a newer snapshot.
>>>> >>>>>>>
>>>> >>>>>>> Thanks again!
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>> >>>>>>> <matei.zaha...@gmail.com> wrote:
>>>> >>>>>>>> Hey Jim,
>>>> >>>>>>>>
>>>> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>> >>>>>>>> makes it group multiple objects together before passing them 
>>>> >>>>>>>> between Java
>>>> >>>>>>>> and Python, but this may be too high by default. Try passing 
>>>> >>>>>>>> batchSize=10 to
>>>> >>>>>>>> your SparkContext constructor to lower it (the default is 1024). 
>>>> >>>>>>>> Or even
>>>> >>>>>>>> batchSize=1 to match earlier versions.
>>>> >>>>>>>>
>>>> >>>>>>>> Matei
>>>> >>>>>>>>
>>>> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com>
>>>> >>>>>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> Hi all, I'm wondering if there are any settings I can use to
>>>> >>>>>>>>> reduce the
>>>> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>> >>>>>>>>> am
>>>> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>> >>>>>>>>> on big,
>>>> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>> >>>>>>>>> keep
>>>> >>>>>>>>> too many of these records in memory, when all that is needed
>>>> >>>>>>>>> is to
>>>> >>>>>>>>> stream through them and count.  Any tips for getting through
>>>> >>>>>>>>> this
>>>> >>>>>>>>> workload?
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Code:
>>>> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>> >>>>>>>>>
>>>> >>>>>>>>> # the biggest individual text line is ~3MB
>>>> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>>> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>> >>>>>>>>>
>>>> >>>>>>>>> parsed.count()
>>>> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>> >>>>>>>>>
>>>> >>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>> >>>>>>>>> error is not
>>>> >>>>>>>>> propagated to the shell.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Cluster:
>>>> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>> >>>>>>>>> spark.executor.memory=10GB)
>>>> >>>>>>>>>
>>>> >>>>>>>>> Exception:
>>>> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>>>>>>>
>>>> >>>>>>
>>>> >>>
>>>> >
>>>
>>>
>>
>
