Ah, interesting, thanks for reporting that. Do you mind opening a JIRA issue for it? I think the right way would be to wait at least X seconds after start before deciding that some blocks don’t have preferred locations available.
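[Editor's note: the "wait after start" idea above, and Erik's interim workaround further down the thread, both amount to polling after SparkContext creation until the expected executors have registered. A minimal sketch of such a wait loop follows; the readiness predicate you would plug in is an assumption, since Spark 0.8 exposes no official hook for "all executors registered".]

```scala
// Generic poll-until-ready helper: re-evaluates a readiness check until it
// passes or the timeout expires. In a Spark driver you would call this right
// after creating the SparkContext, with a predicate that counts registered
// executors -- that predicate is hypothetical, as Spark 0.8 has no public
// API for it.
def waitUntil(timeoutMs: Long, pollMs: Long = 200)(ready: => Boolean): Boolean = {
  val deadline = System.currentTimeMillis + timeoutMs
  while (!ready) {
    if (System.currentTimeMillis >= deadline) return false
    Thread.sleep(pollMs)
  }
  true
}
```

Usage would look like `waitUntil(30000) { registeredExecutors() >= expectedExecutors }`, where `registeredExecutors` and `expectedExecutors` are placeholders for whatever executor count the application can observe.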
Matei

On Dec 1, 2013, at 9:08 AM, Erik Freed <[email protected]> wrote:

> I did check the DNS scenario when I first got started on this problem -- I have
> been bitten by that more than once setting up Spark on various clusters that
> had inconsistent DNS setups. That wasn't it, though.
>
> It turns out that there was a race condition between when the executors were
> registering and when the TaskSetManager thought they were fair game. If I
> used the SparkContext in that period, the RDD was cached in an awkward state.
>
> By putting in a wait after creating the SparkContext for all executors to
> establish themselves, the node-local affinity was consistent.
>
> I tried to find a way to cleanly wait for the SparkContext to settle, but
> could not find a hook for that. I would say that is a feature worth putting
> in. There is a period after you create a SparkContext where you actually
> can't use it -- that seems awkward.
>
> Thanks for the help, Andrew -- I owe you one!
>
> cheers,
> Erik
>
>
> On Tue, Nov 26, 2013 at 3:59 PM, Andrew Ash <[email protected]> wrote:
> I've seen issues where no task is node-local because the hostname on Spark
> and Hadoop is close but not quite the same -- e.g. myhost.andrewash.com vs.
> myhost: one is the FQDN, the other is not. Can you confirm that the hostname
> shown in the master's web UI (:8080), in the Address column of the Workers
> section, matches what's listed in the namenode's web UI (:50070)?
>
> Example:
>
> 13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID 75
> on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242
> (non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)
>
> The line you pasted is PROCESS_LOCAL, which means that the output of one
> transformation is the input of the next one in the same JVM. If you were
> reading out of HBase, I would expect you could only achieve NODE_LOCAL on
> the initial data read, since the data isn't yet loaded into the Spark process.
> Can you run a simple query that doesn't involve any transformation? I would
> imagine a simple "select * from table limit 10" or equivalent would do it.
>
> Andrew
>
>
> On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <[email protected]> wrote:
> Hmmm - I see 'spark.deploy.spreadOut', which defaults to true, but that does
> not seem to be affecting me positively...
>
>
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <[email protected]> wrote:
> Thanks - I have pondered that piece of code long and hard, trying different
> combinations of each of those -- e.g. setting spark.locality.wait and
> spark.locality.wait.node very high and the others very low -- nothing. It
> worked on 0.7, but at that point we were using lots of HBase regions / Spark
> partitions / tasks and spark.spreadOut = true (I think that is no longer
> supported).
>
> The one clue is that it sometimes uses node-local sporadically. I would be
> more than happy to provide tons of logs, but as far as I can see there isn't
> any logging of this part of the code other than many lines all saying
> something like:
>
> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4] Starting
> task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
>
> and the fact that the UI shows the RDD not partitioning across the
> appropriate HBase region nodes. I was thinking this was some sort of DNS
> short- vs. full-name issue, but changing that didn't seem to do anything.
>
>
> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:
> Do you also set any of spark.locality.wait.{process,node,rack}? Those
> override spark.locality.wait for their specific locality levels.
>
> private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>   val defaultWait = System.getProperty("spark.locality.wait", "3000")
>   level match {
>     case TaskLocality.PROCESS_LOCAL =>
>       System.getProperty("spark.locality.wait.process", defaultWait).toLong
>     case TaskLocality.NODE_LOCAL =>
>       System.getProperty("spark.locality.wait.node", defaultWait).toLong
>     case TaskLocality.RACK_LOCAL =>
>       System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>     case TaskLocality.ANY =>
>       0L
>   }
> }
>
> The other option I'm thinking of is that maybe these tasks are jumping
> straight to TaskLocality.ANY with no locality preference. Do you have any
> logs you can share that include this fallback to less-preferred localities?
>
> Did you have this working properly on 0.7.x?
>
>
> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
> Hi Andrew - thanks - that's a good thought. Unfortunately, I have those set
> in the same pre-context-creation place as all the other variables that I have
> been using for months quite happily and that seem to affect Spark nicely. I
> have it set to Int.MaxValue.toString, which I am guessing is large enough.
>
> It very occasionally will use all data-local nodes, and sometimes a mix, but
> mostly all process-local...
>
>
> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
> Hi Erik,
>
> I would guess that if you set spark.locality.wait to an absurdly large value
> then you would get essentially that effect.
>
> Maybe you aren't setting the system property before creating your Spark
> context?
>
> http://spark.incubator.apache.org/docs/latest/configuration.html
>
> Andrew
>
>
> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
> Hi All,
>
> After switching to 0.8 and reducing the number of partitions/tasks for a
> large-scale computation, I have been unable to force Spark to use only
> executors on nodes where the HBase data is local.
> I have not been able to find a
> setting for spark.locality.wait that makes any difference. It is not an
> option for us to let Spark choose non-data-local nodes. Is there some example
> code showing how to get this to work the way we want? We have our own input
> RDD that mimics the NewHadoopRDD, and it seems to be doing the correct thing
> in all regards with respect to preferred locations.
>
> Do I have to write my own compute tasks and schedule them myself?
>
> Anyone have any suggestions? I am stumped.
>
> cheers,
> Erik
>
>
> --
> Erik James Freed
> CoDecision Software
> 510.859.3360
> [email protected]
>
> 1480 Olympus Avenue
> Berkeley, CA
> 94708
>
> 179 Maria Lane
> Orcas, WA
> 98245
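[Editor's note: the scheduler snippet Andrew quoted can be reproduced outside Spark to sanity-check which wait value a given locality level would get. A standalone sketch follows; the property names are the real Spark 0.8 ones, but the helper function itself is an illustration, not Spark code.]

```scala
// Mirrors getLocalityWait from the TaskSetManager code quoted in the thread:
// each level-specific property (process/node/rack) overrides the generic
// spark.locality.wait, which itself defaults to 3000 ms; ANY always waits 0.
// As Andrew notes, these system properties must be set before the
// SparkContext is created, because the scheduler reads them at construction.
System.setProperty("spark.locality.wait", "3000")
System.setProperty("spark.locality.wait.node", Int.MaxValue.toString)

def localityWait(level: String): Long = {
  val defaultWait = System.getProperty("spark.locality.wait", "3000")
  if (level == "any") 0L
  else System.getProperty(s"spark.locality.wait.$level", defaultWait).toLong
}
```

With the properties above, `localityWait("node")` returns Int.MaxValue while `localityWait("rack")` falls back to the 3000 ms default, which matches the "set node very high, the others low" experiment Erik describes.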
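[Editor's note: the contract Erik's custom RDD must satisfy is the same one NewHadoopRDD uses: report, per partition, the exact hostnames that hold that partition's data. A minimal standalone model of that mapping follows; the hostnames and partition data are invented for illustration and carry no Spark dependency.]

```scala
// Models the preferred-locations lookup of an HBase-backed RDD: each
// partition maps to the hostname(s) of the region server(s) holding its
// region. The returned names must exactly match the hostnames the Spark
// workers registered under (an FQDN vs. short-name mismatch silently
// defeats node locality, per Andrew's earlier point).
// regionHosts is hypothetical sample data.
val regionHosts: Map[Int, Seq[String]] = Map(
  0 -> Seq("node-a.example.com"),
  1 -> Seq("node-b.example.com"))

def preferredLocations(partitionIndex: Int): Seq[String] =
  regionHosts.getOrElse(partitionIndex, Nil)
```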
