I've seen issues where no task is node-local because the hostnames on Spark and Hadoop are close but not quite the same -- e.g. myhost.andrewash.com vs myhost: one is an FQDN, the other is not. Can you confirm that the hostname shown in the master's web UI (:8080), in the address column of the workers section, matches what's listed in the namenode's web UI (:50070)?
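One quick way to spot that kind of mismatch is to normalize both names down to their short (pre-dot) form before comparing. A minimal sketch, purely as a diagnostic aid (the object and helper names, and the sample hostnames, are my own -- Spark itself matches locations by exact string, so the real fix is making both sides report the same name):

```scala
// Compare two hostnames by their short form, since a worker registered
// as "myhost" will never match a block location reported as
// "myhost.andrewash.com" under exact string comparison.
object HostnameCheck {
  // Strip the domain suffix, if any, and lowercase for comparison.
  def shortName(host: String): String =
    host.takeWhile(_ != '.').toLowerCase

  // True when the two names agree up to FQDN-vs-short differences.
  def sameHost(a: String, b: String): Boolean =
    shortName(a) == shortName(b)

  def main(args: Array[String]): Unit = {
    println(sameHost("myhost.andrewash.com", "myhost"))    // true
    println(sameHost("myhost.andrewash.com", "otherhost")) // false
  }
}
```

If `sameHost` returns true but Spark still schedules nothing node-local, the two UIs are advertising names that differ only by domain, which is exactly the failure mode above.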
Example:

13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID 75 on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242 (non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)

The line you pasted is PROCESS_LOCAL, which means the output of one transformation is the input of the next one in the same JVM. If you were reading out of HBase, I would expect you could only achieve node locality on the initial data read, since the data isn't yet loaded into the Spark process. Can you run a simple query that doesn't involve any transformation? I would imagine a simple "select * from table limit 10" or equivalent would do it.

Andrew

On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <[email protected]> wrote:

> hmmm - I see 'spark.deploy.spreadOut', which defaults to true - but that
> does not seem to be impacting me positively...
>
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <[email protected]> wrote:
>
>> Thanks - I have pondered that piece of code long and hard, trying
>> different combinations of each of those - e.g. setting
>> spark.locality.wait and spark.locality.wait.node very high and the others
>> very low -- nothing. It worked on 0.7, but at that point we were using
>> lots of hbase regions/spark partitions/tasks and spark.spreadout = true
>> (I think that is no longer supported).
>>
>> The one clue is that it sometimes uses node-local sporadically. I would
>> be more than happy to provide tons of logs, but as far as I can see there
>> isn't any logging of this part of the code other than many lines all
>> saying something like:
>>
>> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4]
>> Starting task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
>>
>> and the fact that the UI shows the RDD not partitioning across the
>> appropriate hbase region nodes. I was thinking this was some sort of DNS
>> short-vs-full-name issue, but changing that didn't seem to do anything.
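[Editor's note: when sifting through scheduler output like the two log lines above, it can help to pull out just the parenthesized locality tag per task and count them. A small sketch, assuming the 0.8-era TaskSetManager log format shown in this thread; the object name and regex are my own:]

```scala
// Extract the locality level ("PROCESS_LOCAL", "NODE_LOCAL", ...) that
// the TaskSetManager appends in parentheses to each "Starting task" line.
object LocalityGrep {
  private val Tag = """\((PROCESS_LOCAL|NODE_LOCAL|RACK_LOCAL|ANY)\)""".r

  // Return the locality level embedded in a scheduler log line, if any.
  def localityOf(line: String): Option[String] =
    Tag.findFirstMatchIn(line).map(_.group(1))

  def main(args: Array[String]): Unit = {
    val line =
      "Starting task 2.0:0 as TID 104 on executor 0: host1 (PROCESS_LOCAL)"
    println(localityOf(line)) // Some(PROCESS_LOCAL)
  }
}
```

[Feeding a whole driver log through `localityOf` and tallying the results makes the "mostly process-local, occasionally node-local" pattern Erik describes immediately visible.]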
>>
>> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:
>>
>>> Do you also set any of spark.locality.wait.{process,node,rack}? Those
>>> override spark.locality.wait for specific locality levels.
>>>
>>>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>>>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>>>     level match {
>>>       case TaskLocality.PROCESS_LOCAL =>
>>>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>>>       case TaskLocality.NODE_LOCAL =>
>>>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>>>       case TaskLocality.RACK_LOCAL =>
>>>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>>>       case TaskLocality.ANY =>
>>>         0L
>>>     }
>>>   }
>>>
>>> The other option I'm thinking is maybe these tasks are jumping straight
>>> to TaskLocality.ANY with no locality preference. Do you have any logs
>>> you can share that include this fallback to less-preferred localities?
>>>
>>> Did you have this working properly on 0.7.x?
>>>
>>> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
>>>
>>>> Hi Andrew - thanks, that's a good thought. Unfortunately, I have those
>>>> set in the same pre-context-creation place as all the other variables
>>>> that I have been using for months quite happily and that seem to impact
>>>> Spark nicely. I have it set to Int.MaxValue.toString, which I am
>>>> guessing is large enough.
>>>>
>>>> It very occasionally will use all data-local nodes, and sometimes a
>>>> mix, but mostly all process-local...
>>>>
>>>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
>>>>
>>>>> Hi Erik,
>>>>>
>>>>> I would guess that if you set spark.locality.wait to an absurdly large
>>>>> value then you would have essentially that effect.
>>>>>
>>>>> Maybe you aren't setting the system property before creating your
>>>>> Spark context?
>>>>>
>>>>> http://spark.incubator.apache.org/docs/latest/configuration.html
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> After switching to 0.8 and reducing the number of partitions/tasks
>>>>>> for a large-scale computation, I have been unable to force Spark to
>>>>>> use only executors on nodes where hbase data is local. I have not
>>>>>> been able to find a setting for spark.locality.wait that makes any
>>>>>> difference. It is not an option for us to let Spark choose
>>>>>> non-data-local nodes. Is there some example code of how to get this
>>>>>> to work the way we want? We have our own input RDD that mimics the
>>>>>> NewHadoopRDD, and it seems to be doing the correct thing in all
>>>>>> regards with respect to preferred locations.
>>>>>>
>>>>>> Do I have to write my own compute Tasks and schedule them myself?
>>>>>>
>>>>>> Anyone have any suggestions? I am stumped.
>>>>>>
>>>>>> cheers,
>>>>>> Erik
>>>>
>>>> --
>>>> Erik James Freed
>>>> CoDecision Software
>>>> 510.859.3360
>>>> [email protected]
>>>>
>>>> 1480 Olympus Avenue
>>>> Berkeley, CA
>>>> 94708
>>>>
>>>> 179 Maria Lane
>>>> Orcas, WA
>>>> 98245
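[Editor's note: the pattern Andrew keeps pointing at -- on the 0.8-era standalone scheduler, the locality waits in `getLocalityWait` above were read from JVM system properties, so they had to be set before the SparkContext was constructed. A hedged sketch of that ordering; the values are illustrative and the SparkContext line is commented out since it needs a live cluster:]

```scala
object LocalityConfig {
  def main(args: Array[String]): Unit = {
    // These must be set BEFORE "new SparkContext(...)": the scheduler
    // reads them via System.getProperty when it starts up, so setting
    // them afterwards has no effect.
    System.setProperty("spark.locality.wait", "30000")      // base wait, ms
    System.setProperty("spark.locality.wait.node", "30000") // node-local wait, ms

    // val sc = new SparkContext("spark://master:7077", "myApp")  // needs a cluster

    println(System.getProperty("spark.locality.wait"))
  }
}
```

[Note that per the quoted code, even a huge wait only delays the fallback to less-preferred levels; if the tasks report no locality preference at all, they jump straight to TaskLocality.ANY and no wait setting helps -- which is why checking the preferred locations (and hostname spelling) matters first.]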
