Just wanted to point out: raising the memory overhead (as I saw in the logs)
was the fix for this issue, and I have not seen dying executors since this
value was increased.
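
As a rough sketch (not the exact command used), the settings discussed in
this thread could be passed to spark-submit as shown below. This assumes the
overhead in question is the Spark 1.x on YARN property
spark.yarn.executor.memoryOverhead (a value in MB); build_submit_args and
my-job.jar are hypothetical names used only for illustration.

```python
# Sketch only: assemble spark-submit arguments from the settings in this thread.
# The helper name and the jar name are hypothetical; the values come from the
# thread (1G overhead, ~700 second timeouts).

def build_submit_args(app_jar, conf):
    """Return a spark-submit argument list with one --conf per setting."""
    args = ["spark-submit"]
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    args.append(app_jar)
    return args


conf = {
    # Assumed property name for the YARN memory overhead, in MB.
    "spark.yarn.executor.memoryOverhead": 1024,
    # Timeouts (in seconds) that Anders reported raising to about 700.
    "spark.akka.askTimeout": 700,
    "spark.core.connection.ack.wait.timeout": 700,
}

print(" ".join(build_submit_args("my-job.jar", conf)))
```

The right overhead value depends on the job; 1024 is simply the figure that
was tried here.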

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg <arp...@spotify.com> wrote:

> If you're thinking of the yarn memory overhead, then yes, I have increased
> that as well. However, I'm glad to say that my job finished successfully
> finally. Besides the timeout and memory settings, performing repartitioning
> (with shuffling) at the right time seems to be the key to make this large
> job succeed. With all the transformations in the job, the partition
> distribution was becoming increasingly skewed. It is not easy to figure out
> when to repartition and how many partitions to use, and it takes forever to
> tweak these settings since everything works perfectly for small datasets and
> you have to experiment with large, time-consuming jobs. Imagine if there was an
> automatic partition reconfiguration function that automagically did that...
>
>
> On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I *think* this may have been related to the default memory overhead
>> setting being too low. I raised the value to 1G and tried my job again,
>> but I had to leave the office before it finished. It did get further, but
>> I'm not exactly sure if that's just because I raised the memory. I'll see
>> tomorrow, but I have a suspicion this may have been the cause of the
>> executors being killed by the application master.
>> On Feb 23, 2015 5:25 PM, "Corey Nolet" <cjno...@gmail.com> wrote:
>>
>>> I've got the opposite problem with regards to partitioning. I've got
>>> over 6000 partitions for some of these RDDs which immediately blows the
>>> heap somehow- I'm still not exactly sure how. If I coalesce them down to
>>> about 600-800 partitions, I get the problems where the executors are dying
>>> without any other error messages (other than telling me the executor was
>>> lost in the UI). If I don't coalesce, I pretty immediately get Java heap
>>> space exceptions that kill the job altogether.
>>>
>>> Putting in the timeouts didn't seem to help the case where I am
>>> coalescing. Also, I don't see any differences between 'disk only' and
>>> 'memory and disk' storage levels; both of them are having the same
>>> problems. I notice large shuffle files (30-40 GB) that only seem to spill a
>>> few hundred MB.
>>>
>>> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg <arp...@spotify.com>
>>> wrote:
>>>
>>>> Sounds very similar to what I experienced, Corey. Something that seems
>>>> to at least help with my problems is to have more partitions. I'm already
>>>> fighting between ending up with too many partitions in the end and having
>>>> too few in the beginning. By coalescing as late as possible and avoiding
>>>> too few in the beginning, the problems seem to decrease. Also, after
>>>> increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
>>>> significantly (~700 secs), the problems seem to almost disappear. Don't
>>>> want to celebrate yet, still a long way left before the job completes, but
>>>> it's looking better...
>>>>
>>>> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I'm looking @ my yarn container logs for some of the executors which
>>>>> appear to be failing (with the missing shuffle files). I see exceptions
>>>>> that say "client.TransportClientFactory: Found inactive connection to
>>>>> host/ip:port, closing it."
>>>>>
>>>>> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
>>>>> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
>>>>> connect to host/ip:port"
>>>>>
>>>>> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>>>>>
>>>>> Finally, following the SIGTERM, I see "FileNotFoundException:
>>>>> /hdfs/01/yarn/nm/usercache....../spark-local-uuid/shuffle_5_09_0.data (No
>>>>> such file or directory)"
>>>>>
>>>>> I'm looking @ the nodemanager and application master logs and I see no
>>>>> indications whatsoever that there were any memory issues during this
>>>>> period of time. The Spark UI is telling me none of the executors are
>>>>> really using too much memory when this happens. It is a big job that's
>>>>> caching several hundred GB, but each node manager on the cluster has
>>>>> 64 GB of RAM just for yarn containers (physical nodes have 128 GB). On
>>>>> this cluster, we have 128 nodes. I've also tried using the DISK_ONLY
>>>>> storage level but to no avail.
>>>>>
>>>>> Any further ideas on how to track this down? Again, we're able to run
>>>>> this same job on about 1/5th of the data just fine. The only thing that's
>>>>> pointing me towards a memory issue is that it seems to be happening in the
>>>>> same stages each time, and when I lower the memory that each executor has
>>>>> allocated, it happens in earlier stages. But I can't seem to find anything
>>>>> that says an executor (or container for that matter) has run low on
>>>>> memory.
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg <arp...@spotify.com>
>>>>> wrote:
>>>>>
>>>>>> No, unfortunately we're not making use of dynamic allocation or the
>>>>>> external shuffle service. Hoping that we could reconfigure our cluster to
>>>>>> make use of it, but since it requires changes to the cluster itself (and
>>>>>> not just the Spark app), it could take some time.
>>>>>>
>>>>>> Unsure if task 450 was acting as a reducer or not, but it seems
>>>>>> possible. Probably due to a crashed executor, as you say. Seems like I
>>>>>> need to do some more advanced partition tuning to make this job work, as
>>>>>> it currently has a rather high number of partitions.
>>>>>>
>>>>>> Thanks for the help so far! It's certainly a frustrating task to
>>>>>> debug when everything's working perfectly on sample data locally and
>>>>>> crashes hard when running on the full dataset on the cluster...
>>>>>>
>>>>>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui <same...@databricks.com> wrote:
>>>>>>
>>>>>>> Do you guys have dynamic allocation turned on for YARN?
>>>>>>>
>>>>>>> Anders, was Task 450 in your job acting like a Reducer and fetching
>>>>>>> the Map spill output data from a different node?
>>>>>>>
>>>>>>> If a Reducer task can't read the remote data it needs, that could
>>>>>>> cause the stage to fail. Sometimes this forces the previous stage to
>>>>>>> also be re-computed if it's a wide dependency.
>>>>>>>
>>>>>>> But like Petar said, if you turn the external shuffle service on,
>>>>>>> the YARN NodeManager process on the slave machines will serve out the
>>>>>>> map spill data instead of the Executor JVMs (by default, unless you turn
>>>>>>> external shuffle on, the Executor JVM itself serves out the shuffle
>>>>>>> data, which causes problems if an Executor dies).
>>>>>>>
>>>>>>> Corey, how often are Executors crashing in your app? How many
>>>>>>> Executors do you have in total? And what is the memory size for each?
>>>>>>> You can change what fraction of the Executor heap will be used for your
>>>>>>> user code vs the shuffle vs RDD caching with the
>>>>>>> spark.storage.memoryFraction setting.
>>>>>>>
>>>>>>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic <petar.zece...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Could you try to turn on the external shuffle service?
>>>>>>>>
>>>>>>>> spark.shuffle.service.enabled = true
>>>>>>>>
>>>>>>>>
>>>>>>>> On 21.2.2015. 17:50, Corey Nolet wrote:
>>>>>>>>
>>>>>>>> I'm experiencing the same issue. Upon closer inspection I'm
>>>>>>>> noticing that executors are being lost as well. Thing is, I can't
>>>>>>>> figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've
>>>>>>>> got over 1.3TB of memory allocated for the application. I was thinking
>>>>>>>> perhaps it was possible that a single executor was getting one or a
>>>>>>>> couple of large partitions, but shouldn't the disk persistence kick in
>>>>>>>> at that point?
>>>>>>>>
>>>>>>>> On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>>>>>>>
>>>>>>>>> For large jobs, the following error message is shown, which seems to
>>>>>>>>> indicate that shuffle files are missing for some reason. It's a rather
>>>>>>>>> large job with many partitions. If the data size is reduced, the
>>>>>>>>> problem disappears. I'm running a build from Spark master post 1.2
>>>>>>>>> (built on 2015-01-16) and running on Yarn 2.2. Any idea of how to
>>>>>>>>> resolve this problem?
>>>>>>>>>
>>>>>>>>>  User class threw exception: Job aborted due to stage failure:
>>>>>>>>> Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task
>>>>>>>>> 450.3 in stage 450.1 (TID 167370,
>>>>>>>>> lon4-hadoopslave-b77.lon4.spotify.net):
>>>>>>>>> java.io.FileNotFoundException:
>>>>>>>>> /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
>>>>>>>>> (No such file or directory)
>>>>>>>>>  at java.io.FileOutputStream.open(Native Method)
>>>>>>>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>>>>>>>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
>>>>>>>>>  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
>>>>>>>>>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
>>>>>>>>>  at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>>>>>>>>  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
>>>>>>>>>  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
>>>>>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>>>>>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>>>>>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>>>>>>>>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>>  at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
>>>>>>>>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>  at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>>  TIA,
>>>>>>>>> Anders
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

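On the partition skew mentioned earlier in the thread: one rough way to
quantify it is to compare the largest partition with the mean partition
size. The sketch below is plain Python over a list of per-partition record
counts. The counts are made-up numbers and skew_ratio is a hypothetical
helper, not part of Spark. In PySpark such counts could be collected with
rdd.glom().map(len).collect(), though that materializes every partition as a
list and is itself expensive on large data.

```python
# Sketch only: measure how uneven a set of partition sizes is.
# The sample sizes below are invented for illustration.

def skew_ratio(partition_sizes):
    """Return the largest partition size divided by the mean size.

    A value of 1.0 means perfectly even partitions; larger values mean
    that at least one partition is much bigger than average.
    """
    if not partition_sizes:
        raise ValueError("need at least one partition")
    mean = sum(partition_sizes) / float(len(partition_sizes))
    if mean == 0:
        return 1.0  # all partitions are empty, so there is no skew
    return max(partition_sizes) / mean


even = [100, 100, 100, 100]
skewed = [10, 10, 10, 370]

print(skew_ratio(even))
print(skew_ratio(skewed))
```

A ratio well above 1 after a chain of transformations may be a hint that a
repartition (with shuffle) is worth trying before the next wide stage.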