I did check the DNS scenario when I first got started on this problem - I have been bitten by that more than once setting up Spark on various clusters with inconsistent DNS setups. That wasn't it though.
It turns out that there was a race condition between when the executors were registering and when the TaskSetManager thought they were fair game. If I used the Spark context in that period, the RDD was cached in an awkward state. By putting in a wait after creating the Spark context for all executors to establish themselves, the node-local affinity was consistent. I tried to find a way to cleanly wait for the Spark context to settle, but could not find a hook for that. I would say that is a feature worth putting in. There is a period after you create a Spark context where you actually can't use it - that seems awkward.

Thanks for the help Andrew - I owe you one!

cheers,
Erik

On Tue, Nov 26, 2013 at 3:59 PM, Andrew Ash <[email protected]> wrote:

> I've seen issues where no task is node local because the hostname on Spark
> and Hadoop is close but not quite the same -- e.g. myhost.andrewash.com vs
> myhost. One is the FQDN, the other is not. Can you confirm that the
> hostname present in the master's web UI (:8080) in the address column of
> the workers section matches what's listed in the namenode's web UI (:50070)?
>
> Example:
> 13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID
> 75 on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242
> (non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)
>
> The line you pasted is PROCESS_LOCAL, which means that the output of one
> transformation is the input of the next one in the same JVM. If you were
> reading out of HBase, I would expect you could only achieve NODE_LOCAL
> on the initial data read, since the data isn't yet loaded into the Spark
> process. Can you run a simple query that doesn't involve any
> transformation? I would imagine a simple "select * from table limit 10" or
> equivalent would do it.
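For reference, a rough sketch of the wait-for-executors workaround described above. Since 0.8 exposes no official hook for "context is settled", the polling helper below is a hypothetical stand-in, and the `sc.getExecutorMemoryStatus` predicate in the usage comment is an assumption about the SparkContext API rather than something confirmed in this thread:

```scala
// Block until a condition (e.g. "all expected executors have registered")
// holds, or give up after a timeout. Plain polling; nothing Spark-specific.
def waitUntil(timeoutMs: Long, pollMs: Long = 500L)(condition: => Boolean): Boolean = {
  val deadline = System.currentTimeMillis + timeoutMs
  var ok = condition
  while (!ok && System.currentTimeMillis < deadline) {
    Thread.sleep(pollMs)
    ok = condition
  }
  ok
}

// Hypothetical usage after creating the context, assuming `expectedExecutors`
// is known from the cluster configuration:
//   val sc = new SparkContext(...)
//   val ready = waitUntil(timeoutMs = 60000L) {
//     sc.getExecutorMemoryStatus.size >= expectedExecutors
//   }
//   if (!ready) sys.error("executors did not register in time")
```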
>
> Andrew
>
>
> On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <[email protected]> wrote:
>
>> hmmm - I see 'spark.deploy.spreadOut', which defaults to true - but that
>> does not seem to be impacting me positively...
>>
>>
>> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <[email protected]> wrote:
>>
>>> Thanks - I have pondered that piece of code long and hard, trying
>>> different combinations of each of those - e.g. setting
>>> spark.locality.wait and spark.locality.wait.node very high and the
>>> others very low -- nothing. It worked on 0.7, but at that point we were
>>> using lots of HBase regions/Spark partitions/tasks and spark.spreadout =
>>> true (I think that is no longer supported).
>>>
>>> The one clue is that it sometimes uses node local sporadically. I would
>>> be more than happy to provide tons of logs, but as far as I can see
>>> there isn't any logging of this part of the code other than many lines
>>> all saying something like:
>>>
>>> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4]
>>> Starting task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
>>>
>>> and the fact that the UI shows the RDD not partitioning across the
>>> appropriate HBase region nodes. I was thinking this was some sort of DNS
>>> short vs. full name issue, but changing that didn't seem to do anything.
>>>
>>>
>>> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:
>>>
>>>> Do you also set any of spark.locality.wait.{process,node,rack}? Those
>>>> override spark.locality.wait for specific locality levels.
>>>>
>>>>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>>>>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>>>>     level match {
>>>>       case TaskLocality.PROCESS_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>>>>       case TaskLocality.NODE_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>>>>       case TaskLocality.RACK_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>>>>       case TaskLocality.ANY =>
>>>>         0L
>>>>     }
>>>>   }
>>>>
>>>> The other option I'm thinking is maybe these tasks are jumping straight
>>>> to TaskLocality.ANY with no locality preference. Do you have any logs
>>>> you can share that include this fallback to less-preferred localities?
>>>>
>>>> Did you have this working properly on 0.7.x?
>>>>
>>>>
>>>> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
>>>>
>>>>> Hi Andrew - thanks - that's a good thought - unfortunately, I have
>>>>> those set in the same pre-context-creation place as all the other
>>>>> variables that I have been using for months quite happily and that
>>>>> seem to impact Spark nicely. I have it set to Int.MaxValue.toString,
>>>>> which I am guessing is large enough.
>>>>>
>>>>> It very occasionally will use all data-local nodes, and sometimes a
>>>>> mix, but mostly all process-local...
>>>>>
>>>>>
>>>>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
>>>>>
>>>>>> Hi Erik,
>>>>>>
>>>>>> I would guess that if you set spark.locality.wait to an absurdly
>>>>>> large value then you would have essentially that effect.
>>>>>>
>>>>>> Maybe you aren't setting the system property before creating your
>>>>>> Spark context?
>>>>>>
>>>>>> http://spark.incubator.apache.org/docs/latest/configuration.html
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> After switching to 0.8, and reducing the number of partitions/tasks
>>>>>>> for a large-scale computation, I have been unable to force Spark to
>>>>>>> use only executors on nodes where HBase data is local. I have not
>>>>>>> been able to find a setting for spark.locality.wait that makes any
>>>>>>> difference. It is not an option for us to let Spark choose
>>>>>>> non-data-local nodes. Is there some example code of how to get this
>>>>>>> to work the way we want? We have our own input RDD that mimics the
>>>>>>> NewHadoopRDD, and it seems to be doing the correct thing in all
>>>>>>> regards with respect to preferred locations.
>>>>>>>
>>>>>>> Do I have to write my own compute Tasks and schedule them myself?
>>>>>>>
>>>>>>> Anyone have any suggestions? I am stumped.
>>>>>>>
>>>>>>> cheers,
>>>>>>> Erik

--
Erik James Freed
CoDecision Software
510.859.3360
[email protected]

1480 Olympus Avenue
Berkeley, CA
94708

179 Maria Lane
Orcas, WA
98245
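The getLocalityWait snippet quoted earlier in the thread can be exercised without a running cluster. A minimal sketch, assuming the 0.8-era property names from that snippet: the per-level properties fall back to spark.locality.wait, so pinning tasks node-local means setting the node-level wait very high *before* the SparkContext is created. The effectiveWait helper below is hypothetical - it just re-implements the quoted lookup so the values can be checked outside Spark:

```scala
// Re-implementation of the fallback logic from the quoted getLocalityWait,
// for inspection only; the real scheduler reads these same properties.
def effectiveWait(level: String): Long = {
  val defaultWait = System.getProperty("spark.locality.wait", "3000")
  System.getProperty(s"spark.locality.wait.$level", defaultWait).toLong
}

// Set before creating the SparkContext, as suggested in the thread:
System.setProperty("spark.locality.wait", "3000")
System.setProperty("spark.locality.wait.node", Int.MaxValue.toString)

// Node-local now waits effectively forever; unset levels fall back to 3000 ms.
// println(effectiveWait("node"))     // 2147483647
// println(effectiveWait("process"))  // 3000
```

Note that, per Erik's findings above, even a correct wait setting does not help if the job is submitted before the executors have registered.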
