But the worker has to be on a node that has local access to the file.
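If the file only exists on one machine, a common workaround is to read it on the driver and distribute the contents yourself. A minimal sketch (an illustration, not from the thread), assuming a spark-shell where sc is already defined and the file is small enough to hold in driver memory:

    // Read the file driver-side, then parallelize it so the workers
    // never need local access to the path.
    import scala.io.Source

    val lines = Source.fromFile("/root/ue/ue_env.sh").getLines().toList
    val rdd = sc.parallelize(lines)  // contents ship to executors with the tasks
    rdd.map(_.split(" ")).collect()  // any worker can now process the data

For anything larger, put the file on a shared or distributed filesystem (e.g. HDFS, or an identical mount path on every node) instead.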
On Thu, Oct 3, 2013 at 12:30 PM, Shay Seng <[email protected]> wrote:

> Ok, even if my understanding of allowLocal is incorrect, nevertheless:
> (1) I'm loading a local file.
> (2) The tasks seem to be getting executed on a slave node
> (ip-10-129-25-28), which is not my master node.
> ??
>
> On Thu, Oct 3, 2013 at 12:22 PM, Mark Hamstra <[email protected]> wrote:
>
>> No, that is not what allowLocal means. For a very few actions, the
>> DAGScheduler will run the job locally (in a separate thread on the
>> master node) if the RDD in the action has a single partition and no
>> dependencies in its lineage. If allowLocal is false, that doesn't mean
>> that SparkContext.textFile called on a local file will magically turn
>> that local file into a distributed file and allow more than just the
>> node where the file is local to process it.
>>
>> On Thu, Oct 3, 2013 at 11:05 AM, Shay Seng <[email protected]> wrote:
>>
>>> Inlined.
>>>
>>> On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia <[email protected]> wrote:
>>>
>>>> Hi Shangyu,
>>>>
>>>> (1) When we read in a local file with SparkContext.textFile and run a
>>>> map/reduce job on it, how will Spark decide which worker nodes to
>>>> send the data to? Will the data be partitioned equally according to
>>>> the number of worker nodes, with each worker getting one piece?
>>>>
>>>> You actually can't run distributed jobs on local files. The local
>>>> file URL only works on the same machine, or if the file is in a
>>>> filesystem that's mounted on the same path on all worker nodes.
>>>
>>> Is this really true?
>>>
>>> scala> val f = sc.textFile("/root/ue/ue_env.sh")
>>> 13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870)
>>> called with curMem=34870, maxMem=4081511301
>>> 13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored
>>> as values to memory (estimated size 34.1 KB, free 3.8 GB)
>>> f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12
>>>
>>> scala> f.map(l=>l.split(" ")).collect
>>> 13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to
>>> process : 1
>>> 13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at
>>> <console>:15
>>> 13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at
>>> <console>:15) with 2 output partitions (*allowLocal=false*)
>>> ...
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as
>>> TID 4 on executor 0: ip-10-129-25-28 (preferred)
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0
>>> as 1517 bytes in 3 ms
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as
>>> TID 5 on executor 0: ip-10-129-25-28 (preferred)
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1
>>> as 1517 bytes in 0 ms
>>>
>>> Doesn't allowLocal=false mean the job is getting distributed to
>>> workers rather than computed locally?
>>>
>>> tks
>>>
>>>> Matei
>>>>
>>>> Any help will be appreciated.
>>>> Thanks!
>>>>
>>>> --
>>>> Shangyu Luo
>>>> Department of Computer Science
>>>> Rice University
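A rough sketch of the distinction Mark describes above (assuming a 0.8-era
spark-shell; exactly which actions pass allowLocal=true may vary by version):

    val small = sc.parallelize(1 to 100, 1)  // one partition, no dependencies
    small.take(1)    // eligible to run locally in a driver-side thread (allowLocal=true)
    small.collect()  // always scheduled as a regular distributed job (allowLocal=false)

That is consistent with the log in the thread: collect reports
allowLocal=false and ships its tasks to the executor on ip-10-129-25-28.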
