Thanks - I have pondered that piece of code long and hard, trying different combinations of each of those - e.g. setting spark.locality.wait and spark.locality.wait.node very high and the others very low - nothing. It worked on 0.7, but at that point we were using lots of hbase regions/spark partitions/tasks and spark.spreadout = true (I think that is no longer supported).
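For reference, this is roughly the shape of our setup (illustrative values, not our exact production settings; the commented-out master URL and app name are placeholders):

```scala
// On 0.8 these are plain Java system properties and must be set
// *before* the SparkContext is constructed.
object LocalitySettings {
  def configure(): Unit = {
    System.setProperty("spark.locality.wait", "3000")
    System.setProperty("spark.locality.wait.node", Int.MaxValue.toString) // wait "forever" for node-local
    System.setProperty("spark.locality.wait.process", "0")                // don't insist on process-local
    // val sc = new SparkContext("spark://<master>:7077", "hbase-job")    // created only after the above
  }
}
```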
The one clue is that it sometimes, sporadically, uses node-local. I would be more than happy to provide tons of logs, but as far as I can see there isn't any logging of this part of the code other than many lines all saying something like:

2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4] Starting task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)

and the fact that the UI shows the RDD not partitioning across the appropriate hbase region nodes. I was thinking this was some sort of DNS short-vs-full-name issue, but changing that didn't seem to do anything.

On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <[email protected]> wrote:

> Do you also set any of spark.locality.wait.{process,node,rack}? Those
> override spark.locality.wait for specific locality levels.
>
> private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>   val defaultWait = System.getProperty("spark.locality.wait", "3000")
>   level match {
>     case TaskLocality.PROCESS_LOCAL =>
>       System.getProperty("spark.locality.wait.process", defaultWait).toLong
>     case TaskLocality.NODE_LOCAL =>
>       System.getProperty("spark.locality.wait.node", defaultWait).toLong
>     case TaskLocality.RACK_LOCAL =>
>       System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>     case TaskLocality.ANY =>
>       0L
>   }
> }
>
> The other option I'm thinking is maybe these tasks are jumping straight to
> TaskLocality.ANY with no locality preference. Do you have any logs you can
> share that include this fallback to less-preferred localities?
>
> Did you have this working properly on 0.7.x?
>
> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <[email protected]> wrote:
>
>> Hi Andrew - thanks - that's a good thought - unfortunately, I have those
>> set in the same pre-context-creation place as all the other variables that
>> I have been using for months quite happily and that seem to impact Spark
>> nicely. I have it set to Int.MaxValue.toString, which I am guessing is
>> large enough.
>> It very occasionally will use all data-local nodes, and sometimes a mix,
>> but mostly all process-local...
>>
>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <[email protected]> wrote:
>>
>>> Hi Erik,
>>>
>>> I would guess that if you set spark.locality.wait to an absurdly large
>>> value then you would have essentially that effect.
>>>
>>> Maybe you aren't setting the system property before creating your Spark
>>> context?
>>>
>>> http://spark.incubator.apache.org/docs/latest/configuration.html
>>>
>>> Andrew
>>>
>>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <[email protected]> wrote:
>>>
>>>> Hi All,
>>>> After switching to 0.8 and reducing the number of partitions/tasks for
>>>> a large-scale computation, I have been unable to force Spark to use only
>>>> executors on nodes where hbase data is local. I have not been able to
>>>> find a setting for spark.locality.wait that makes any difference. It is
>>>> not an option for us to let Spark choose non-data-local nodes. Is there
>>>> some example code of how to get this to work the way we want? We have our
>>>> own input RDD that mimics the NewHadoopRdd, and it seems to be doing the
>>>> correct thing in all regards wrt preferred locations.
>>>>
>>>> Do I have to write my own compute Tasks and schedule them myself?
>>>>
>>>> Anyone have any suggestions? I am stumped.
>>>>
>>>> cheers,
>>>> Erik
>>
>> --
>> Erik James Freed
>> CoDecision Software
>> 510.859.3360
>> [email protected]
>>
>> 1480 Olympus Avenue
>> Berkeley, CA
>> 94708
>>
>> 179 Maria Lane
>> Orcas, WA
>> 98245
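[Editor's note] For anyone else hitting this, here is a minimal standalone sketch (no Spark dependency; property names and the 3000 ms default are taken from the scheduler code quoted above, with string level names standing in for the TaskLocality enum) showing how the per-level properties override the global spark.locality.wait:

```scala
object LocalityWaitDemo {
  // Mirrors the getLocalityWait logic quoted in the thread: a per-level
  // property wins over spark.locality.wait, and ANY never waits at all.
  def localityWait(level: String): Long = {
    val defaultWait = System.getProperty("spark.locality.wait", "3000")
    level match {
      case "PROCESS_LOCAL" => System.getProperty("spark.locality.wait.process", defaultWait).toLong
      case "NODE_LOCAL"    => System.getProperty("spark.locality.wait.node", defaultWait).toLong
      case "RACK_LOCAL"    => System.getProperty("spark.locality.wait.rack", defaultWait).toLong
      case "ANY"           => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.locality.wait", "60000")     // generous global wait
    System.setProperty("spark.locality.wait.process", "0") // ...but a low per-level value wins
    println(localityWait("PROCESS_LOCAL")) // 0
    println(localityWait("NODE_LOCAL"))    // 60000 (falls back to the global value)
    println(localityWait("ANY"))           // 0, regardless of any property
  }
}
```

The practical upshot for the thread's problem: a huge spark.locality.wait can still be silently overridden to something tiny by a stray per-level property, so it is worth logging all four values right before the context is created.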
