Ah, interesting, thanks for reporting that. Do you mind opening a JIRA issue 
for it? I think the right way would be to wait at least X seconds after start 
before deciding that some blocks don’t have preferred locations available.
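A minimal sketch of that idea, with a hypothetical property name (not a real Spark setting) standing in for the "X seconds" above: the scheduler would only conclude that a block has no preferred location available once a startup grace period has elapsed.

```scala
// Sketch of the proposed behavior. "spark.scheduler.localityStartupGraceMs"
// is a hypothetical property name used here for illustration only.
object LocalityGrace {
  val graceMs: Long =
    System.getProperty("spark.scheduler.localityStartupGraceMs", "5000").toLong

  // Only treat "no preferred location available" as authoritative once the
  // scheduler has been up for at least graceMs milliseconds; before that,
  // executors may simply not have registered yet.
  def mayGiveUpOnLocality(schedulerStartMs: Long, nowMs: Long): Boolean =
    nowMs - schedulerStartMs >= graceMs
}
```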

Matei

On Dec 1, 2013, at 9:08 AM, Erik Freed <[email protected]> wrote:

> I did check the DNS scenario when I first got started on this problem - I 
> have been bitten by that more than once setting up Spark on various clusters 
> with inconsistent DNS setups. That wasn't it, though.
> 
> It turns out that there was a race condition between when the executors were 
> registering and when the TaskSetManager thought they were fair game. If I 
> used the Spark context in that period, the RDD was cached in an awkward state.
> 
> By putting in a wait after creating the Spark context for all executors to 
> establish themselves, the node-local affinity was consistent. 
> 
> I tried to find a way to cleanly wait for the Spark context to settle, but 
> could not find a hook for that. I would say that is a feature worth putting 
> in. There is a period after you create a Spark context where you actually 
> can't use it - that seems awkward.
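The workaround described above can be sketched as a small polling helper. The executor-count supplier is an assumption: in 0.8-era code it might be backed by something like `sc.getExecutorStorageStatus.length`, but treat that as illustrative rather than a supported hook.

```scala
// Poll until `expected` executors have registered or `timeoutMs` elapses.
// `currentCount` abstracts over however the application reads the live
// executor count. Returns true if the cluster settled in time.
def waitForExecutors(currentCount: () => Int,
                     expected: Int,
                     timeoutMs: Long = 30000L,
                     pollMs: Long = 200L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (currentCount() < expected && System.currentTimeMillis() < deadline) {
    Thread.sleep(pollMs)
  }
  currentCount() >= expected
}
```

Calling this right after creating the context, before submitting the first job, approximates the missing "wait for the context to settle" hook.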
> 
> Thanks for the help Andrew - I owe you one!
> 
> cheers,
> Erik
> 
> 
> On Tue, Nov 26, 2013 at 3:59 PM, Andrew Ash <[email protected]> wrote:
> I've seen issues where no task is node local because the hostnames in Spark 
> and Hadoop are close but not quite the same -- e.g. myhost.andrewash.com vs 
> myhost: one is an FQDN, the other is not.  Can you confirm that the hostname 
> shown in the master's webui (:8080), in the address column of the workers 
> section, matches what's listed in the namenode's webui (:50070)?
> 
> Example:
> 13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID 75 
> on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242 
> (non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)
> 
> The line you pasted is PROCESS_LOCAL, which means that the output of one 
> transformation is the input of the next one in the same JVM.  If you were 
> reading out of HBase, I would expect you could only achieve NODE_LOCAL on 
> the initial data read, since the data isn't yet loaded into the Spark process. 
> Can you run a simple query that doesn't involve any transformations?  I would 
> imagine a simple "select * from table limit 10" or equivalent would do it.
> 
> Andrew
> 
> 
> 
> On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <[email protected]> wrote:
> Hmmm - I see 'spark.deploy.spreadOut', which defaults to true - but that 
> does not seem to be helping in my case...
> 
> 
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <[email protected]> wrote:
> Thanks - I have pondered that piece of code long and hard, trying different 
> combinations of each of those - e.g. setting spark.locality.wait and 
> spark.locality.wait.node very high and the others very low -- nothing. It 
> worked on 0.7, but at that point we were using lots of HBase regions/Spark 
> partitions/tasks and spark.spreadout = true (I think that is no longer 
> supported).
> 
> The one clue is that it does sporadically use node-local tasks. I would be 
> more than happy to provide tons of logs, but as far as I can see there isn't 
> any logging of this part of the code other than many lines all saying 
> something like:
> 
> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4] Starting 
> task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
> 
> and the fact that the UI shows the RDD not partitioning across the 
> appropriate HBase region nodes. I was thinking this was some sort of DNS 
> short-name vs. full-name issue, but changing that didn't seem to do anything.
> 
> 
> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:
> Do you also set any of spark.locality.wait.{process,node,rack} ?  Those 
> override spark.locality.wait for specific locality levels.
> 
>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>     level match {
>       case TaskLocality.PROCESS_LOCAL =>
>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>       case TaskLocality.NODE_LOCAL =>
>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>       case TaskLocality.RACK_LOCAL =>
>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>       case TaskLocality.ANY =>
>         0L
>     }
>   }
> 
> The other possibility I'm considering is that these tasks are jumping 
> straight to TaskLocality.ANY with no locality preference.  Do you have any 
> logs you can share that include this fallback to less-preferred localities?
> 
> Did you have this working properly on 0.7.x ?
> 
> 
> 
> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
> Hi Andrew - thanks, that's a good thought - unfortunately, I have those set 
> in the same pre-context-creation place as all the other variables that I 
> have been using for months quite happily and that do seem to affect Spark. 
> I have it set to Int.MaxValue.toString, which I am guessing is large enough.
> 
> It will very occasionally use all data-local nodes, and sometimes a mix, but 
> mostly it's all process-local...
> 
> 
> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
> Hi Erik,
> 
> I would guess that if you set spark.locality.wait to an absurdly large value 
> then you would have essentially that effect.
> 
> Maybe you aren't setting the system property before creating your Spark 
> context?
> 
> http://spark.incubator.apache.org/docs/latest/configuration.html
> 
> Andrew
> 
> 
> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
> Hi All,
> After switching to 0.8 and reducing the number of partitions/tasks for a 
> large-scale computation, I have been unable to force Spark to use only 
> executors on nodes where the HBase data is local. I have not been able to 
> find a setting for spark.locality.wait that makes any difference. It is not 
> an option for us to let Spark choose non-data-local nodes. Is there some 
> example code showing how to get this to work the way we want? We have our 
> own input RDD that mimics NewHadoopRDD, and it seems to be doing the correct 
> thing in all regards with respect to preferred locations.
> 
> Do I have to write my own compute Tasks and schedule them myself?
> 
> Anyone have any suggestions? I am stumped.
> 
> cheers,
> Erik
> 
> 
> 
> 
> 
> 
> -- 
> Erik James Freed
> CoDecision Software
> 510.859.3360
> [email protected]
> 
> 1480 Olympus Avenue
> Berkeley, CA
> 94708
> 
> 179 Maria Lane
> Orcas, WA
> 98245
> 
> 
