But the worker has to be on a node that has local access to the file.

On Thu, Oct 3, 2013 at 12:30 PM, Shay Seng <[email protected]> wrote:

> Ok, even if my understanding of allowLocal is incorrect, nevertheless
> (1) I'm loading a local file
> (2) The tasks seem as if they are getting executed on a slave node
> (ip-10-129-25-28) is not my master node
> ??
>
>
>
>
> On Thu, Oct 3, 2013 at 12:22 PM, Mark Hamstra <[email protected]>wrote:
>
>> No, that is not what allowLocal means.  For a very few actions, the
>> DAGScheduler will run the job locally (in a separate thread on the master
>> node) if the RDD in the action has a single partition and no dependencies
>> in its lineage.  If allowLocal is false, that doesn't mean that
>> SparkContext.textFile called on a local file will magically turn that local
>> file into a distributed file and allow more than just the node where the
>> file is local to process that file.
>>
>>
>> On Thu, Oct 3, 2013 at 11:05 AM, Shay Seng <[email protected]> wrote:
>>
>>> Inlined.
>>>
>>> On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia 
>>> <[email protected]>wrote:
>>>
>>>> Hi Shangyu,
>>>>
>>>> (1) When we read in a local file by SparkContext.textFile and do some
>>>> map/reduce job on it, how will spark decide to send data to which worker
>>>> node? Will the data be divided/partitioned equally according to the number
>>>> of worker node and each worker node get one piece of data?
>>>>
>>>> You actually can't run distributed jobs on local files. The local file
>>>> URL only works on the same machine, or if the file is in a filesystem
>>>> that's mounted on the same path on all worker nodes.
>>>>
>>>> Is this really true?
>>>
>>> scala> val f = sc.textFile("/root/ue/ue_env.sh")
>>> 13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870)
>>> called with curMem=34870, maxMem=4081511301
>>> 13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored as
>>> values to memory (estimated size 34.1 KB, free 3.8 GB)
>>> f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12
>>>
>>> scala> f.map(l=>l.split(" ")).collect
>>> 13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to
>>> process : 1
>>> 13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at
>>> <console>:15
>>> 13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at
>>> <console>:15) with 2 output partitions (*allowLocal=false*)
>>>  ...
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as
>>> TID 4 on executor 0: ip-10-129-25-28 (preferred)
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0 as
>>> 1517 bytes in 3 ms
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as
>>> TID 5 on executor 0: ip-10-129-25-28 (preferred)
>>> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1 as
>>> 1517 bytes in 0 ms
>>>
>>> Doesn't allowLocal=false mean the job is getting distributed to workers
>>> rather than computed locally?
>>>
>>>
>>> tks
>>>
>>>
>>>>
>>>> Matei
>>>>
>>>>
>>>> Any help will be appreciated.
>>>> Thanks!
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> --
>>>>
>>>> Shangyu, Luo
>>>> Department of Computer Science
>>>> Rice University
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to