The Spark code is in my /home directory, which is shared via NFS to all nodes, so all workers should be able to access the same file.
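Since a `file://` path only works where it actually resolves, one way to sanity-check an NFS-shared path before calling `sc.textFile` on it is to confirm it is readable on the node. This is a minimal sketch in plain Scala (no Spark dependency); the helper name `pathVisible` is mine, not from the thread, and on a real cluster you would need to run the check on each worker, not just the driver:

```scala
import java.nio.file.{Files, Paths}

// Sketch: verify a path is readable on this node before handing it
// to sc.textFile. Tasks scheduled on a worker where the path does
// not resolve will fail to read their split.
def pathVisible(path: String): Boolean =
  Files.isReadable(Paths.get(path))

// A path shared over NFS (e.g. under /home) should return true on
// every node; a path that exists only on the driver will not.
```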
On Thu, Oct 3, 2013 at 2:34 PM, Mark Hamstra <[email protected]> wrote:

> But the worker has to be on a node that has local access to the file.
>
> On Thu, Oct 3, 2013 at 12:30 PM, Shay Seng <[email protected]> wrote:
>
>> Ok, even if my understanding of allowLocal is incorrect, nevertheless:
>> (1) I'm loading a local file.
>> (2) The tasks seem to be getting executed on a slave node
>> (ip-10-129-25-28), which is not my master node.
>> ??
>>
>> On Thu, Oct 3, 2013 at 12:22 PM, Mark Hamstra <[email protected]> wrote:
>>
>>> No, that is not what allowLocal means. For a very few actions, the
>>> DAGScheduler will run the job locally (in a separate thread on the
>>> master node) if the RDD in the action has a single partition and no
>>> dependencies in its lineage. If allowLocal is false, that doesn't mean
>>> that SparkContext.textFile called on a local file will magically turn
>>> that local file into a distributed file and allow more than just the
>>> node where the file is local to process that file.
>>>
>>> On Thu, Oct 3, 2013 at 11:05 AM, Shay Seng <[email protected]> wrote:
>>>
>>>> Inlined.
>>>>
>>>> On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia <[email protected]> wrote:
>>>>
>>>>> Hi Shangyu,
>>>>>
>>>>> (1) When we read in a local file with SparkContext.textFile and run
>>>>> some map/reduce job on it, how does Spark decide which worker node to
>>>>> send the data to? Will the data be partitioned equally according to
>>>>> the number of worker nodes, with each worker getting one piece?
>>>>>
>>>>> You actually can't run distributed jobs on local files. The local
>>>>> file URL only works on the same machine, or if the file is in a
>>>>> filesystem that's mounted on the same path on all worker nodes.
>>>>
>>>> Is this really true?
>>>> scala> val f = sc.textFile("/root/ue/ue_env.sh")
>>>> 13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870) called with curMem=34870, maxMem=4081511301
>>>> 13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored as values to memory (estimated size 34.1 KB, free 3.8 GB)
>>>> f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12
>>>>
>>>> scala> f.map(l=>l.split(" ")).collect
>>>> 13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to process : 1
>>>> 13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at <console>:15
>>>> 13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:15) with 2 output partitions (*allowLocal=false*)
>>>> ...
>>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as TID 4 on executor 0: ip-10-129-25-28 (preferred)
>>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0 as 1517 bytes in 3 ms
>>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: ip-10-129-25-28 (preferred)
>>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1 as 1517 bytes in 0 ms
>>>>
>>>> Doesn't allowLocal=false mean the job is getting distributed to workers rather than computed locally?
>>>>
>>>> tks
>>>>
>>>>> Matei
>>>>>
>>>>> Any help will be appreciated.
>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Shangyu Luo
>>>>> Department of Computer Science
>>>>> Rice University
