I've seen issues where no task is node-local because the hostnames on Spark
and Hadoop are close but not quite the same -- e.g. myhost.andrewash.com vs
myhost, where one is an FQDN and the other is not.  Can you confirm that the
hostname shown in the master's web UI (:8080), in the Address column of the
Workers section, matches what's listed in the namenode's web UI (:50070)?

Example:
13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID
75 on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242
(non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)
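
To my understanding, that match is a plain string comparison between the
host the executor registered with and the preferred locations reported for
the split, so two spellings of the same machine never count as node-local.
A minimal sketch, using the hostnames from the log line above:

```scala
// The executor registered under its EC2-internal hostname, while the
// split reports raw IPs -- a string comparison says they never match,
// even though one of them is the same machine.
val executorHost   = "ip-10-32-47-242"
val preferredHosts = Seq("10.32.47.242", "10.32.46.243", "10.32.31.202")
val nodeLocal = preferredHosts.contains(executorHost)
println(nodeLocal)  // false
```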

The line you pasted is PROCESS_LOCAL, which means the output of one
transformation is the input of the next one in the same JVM.  If you were
reading out of HBase, I would expect you could only achieve NODE_LOCAL
on the initial data read, since the data isn't yet loaded into the Spark
process.  Can you run a simple query that doesn't involve any
transformations?  I would imagine a simple "select * from table limit 10"
or equivalent would do it.
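
In RDD terms, the equivalent of that query is just take(10) on the input
RDD.  The sketch below uses a local Seq as a stand-in for the HBase-backed
RDD (an assumption -- it only shows the shape of the call); with the real
input, the task-start log lines for this one stage would show the locality
level achieved on the raw read:

```scala
// Stand-in for the HBase-backed input RDD; with the real thing,
// take(10) runs only the initial read stage -- no shuffle, no second
// stage -- so NODE_LOCAL should be achievable if the hostnames line up.
val tableRdd = Seq("row-1", "row-2", "row-3")   // hypothetical rows
val first10 = tableRdd.take(10)                 // "select * from table limit 10"
first10.foreach(println)
```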

Andrew



On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <[email protected]> wrote:

> hmmm - I see 'spark.deploy.spreadOut', which defaults to true - but that
> does not seem to be helping me...
>
>
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <[email protected]> wrote:
>
>> Thanks - I have pondered that piece of code long and hard, trying
>> different combinations of each of those - e.g. setting
>> spark.locality.wait and spark.locality.wait.node very high and the
>> others very low -- nothing.  It worked on 0.7, but at that point we were
>> using lots of HBase regions/Spark partitions/tasks and
>> spark.deploy.spreadOut = true (I think that is no longer supported).
>>
>> The one clue is that it sporadically uses node-local.  I would be more
>> than happy to provide tons of logs, but as far as I can see there is no
>> logging of this part of the code other than many lines all saying
>> something like:
>>
>> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4]
>> Starting task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
>>
>> and the fact that the UI shows the RDD not partitioning across the
>> appropriate HBase region nodes.  I was thinking this was some sort of
>> DNS short-name vs. full-name issue, but changing that didn't seem to do
>> anything.
>>
>>
>> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:
>>
>>> Do you also set any of spark.locality.wait.{process,node,rack}?  Those
>>> override spark.locality.wait for specific locality levels.
>>>
>>>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>>>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>>>     level match {
>>>       case TaskLocality.PROCESS_LOCAL =>
>>>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>>>       case TaskLocality.NODE_LOCAL =>
>>>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>>>       case TaskLocality.RACK_LOCAL =>
>>>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>>>       case TaskLocality.ANY =>
>>>         0L
>>>     }
>>>   }
>>>
>>> The other option I'm thinking of is that these tasks are jumping
>>> straight to TaskLocality.ANY with no locality preference.  Do you have
>>> any logs you can share that include this fallback to less-preferred
>>> localities?
>>>
>>> Did you have this working properly on 0.7.x?
>>>
>>>
>>>
>>> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
>>>
>>>> Hi Andrew - thanks - that's a good thought.  Unfortunately, I have
>>>> those set in the same pre-context-creation place as all the other
>>>> variables that I have been using for months quite happily and that
>>>> seem to affect Spark nicely.  I have it set to Int.MaxValue.toString,
>>>> which I am guessing is large enough.
>>>>
>>>> It very occasionally uses all data-local nodes, and sometimes a mix,
>>>> but mostly all process-local...
>>>>
>>>>
>>>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
>>>>
>>>>> Hi Erik,
>>>>>
>>>>> I would guess that if you set spark.locality.wait to an absurdly large
>>>>> value then you would have essentially that effect.
>>>>>
>>>>> Maybe you aren't setting the system property before creating your
>>>>> Spark context?
>>>>>
>>>>> http://spark.incubator.apache.org/docs/latest/configuration.html
>>>>>
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>> After switching to 0.8 and reducing the number of partitions/tasks
>>>>>> for a large-scale computation, I have been unable to force Spark to
>>>>>> use only executors on nodes where HBase data is local.  I have not
>>>>>> been able to find a setting for spark.locality.wait that makes any
>>>>>> difference.  It is not an option for us to let Spark choose
>>>>>> non-data-local nodes.  Is there some example code showing how to get
>>>>>> this to work the way we want?  We have our own input RDD that mimics
>>>>>> NewHadoopRDD, and it seems to be doing the correct thing in all
>>>>>> regards with respect to preferred locations.
>>>>>>
>>>>>> Do I have to write my own compute Tasks and schedule them myself?
>>>>>>
>>>>>> Anyone have any suggestions? I am stumped.
>>>>>>
>>>>>> cheers,
>>>>>> Erik
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Erik James Freed
>>>> CoDecision Software
>>>> 510.859.3360
>>>> [email protected]
>>>>
>>>> 1480 Olympus Avenue
>>>> Berkeley, CA
>>>> 94708
>>>>
>>>> 179 Maria Lane
>>>> Orcas, WA
>>>> 98245
>>>>
>>>
>>>
>>
>>
>
>
>
