On Tue, Jan 7, 2014 at 6:04 PM, Andrew Ash <[email protected]> wrote:

> I think that would do what you want.  I'm guessing in "..." you have an
> rdd and then call .collect on it -- normally this would be a bad idea
> because of large data sizes, but if you KNOW that it's small then you can
> force it through just that one machine.
>

This is what I'm doing:

val smallInput = sc.textFile("input")
val smallInputBroadcast = sc.broadcast(smallInput.collect())
sc.parallelize(0 until smallInputBroadcast.value.length,
  System.getProperty("spark.cores.max").toInt)

Each worker needs access to the full smallInput in later stages, so I'm
sending the whole small dataset by broadcast to avoid network shuffling
later on. Then parallelize() assigns each worker a part of smallInput, from
which it generates the bigger dataset.
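
To make the pattern above concrete, here is a minimal sketch of broadcasting
the collected input and parallelizing over its indices. It assumes a Spark
version that provides SparkConf (0.9 or later) and runs in local mode. The
object name BroadcastExpand, the expand function, and the sample lines are
hypothetical stand-ins for the real expansion step, not part of the actual job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExpand {
  // Hypothetical expansion step: one input line is combined with every line
  // of the full dataset, so n input lines produce n * n derived records.
  def expand(line: String, all: Array[String]): Seq[Int] =
    all.map(other => line.length + other.length).toSeq

  // Returns (number of partitions used, sum of all derived records).
  def run(sc: SparkContext, lines: Array[String], numSlices: Int): (Int, Long) = {
    // Ship the whole small dataset to every worker once.
    val smallInputBroadcast = sc.broadcast(lines)
    // Distribute only the indices, in an explicit number of slices.
    val indices = sc.parallelize(0 until lines.length, numSlices)
    // Each task grows its share of the input into the bigger dataset.
    val big = indices.flatMap { i =>
      val all = smallInputBroadcast.value
      expand(all(i), all)
    }
    // Reduce the big dataset back down to a small result.
    (indices.partitions.length, big.map(_.toLong).reduce(_ + _))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with 4 threads instead of a real cluster.
    val conf = new SparkConf().setAppName("broadcast-expand").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      val (parts, total) = run(sc, Array("a", "bb", "ccc", "dddd"), numSlices = 4)
      println(s"partitions=$parts total=$total")
    } finally {
      sc.stop()
    }
  }
}
```

Passing numSlices explicitly is what decouples the parallelism from the
number of HDFS blocks behind the original file.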

Is this an idiomatic approach in Spark, or should it be done another way?
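
One alternative that may be worth trying, sketched here under the assumption
that the repartition method mentioned in this thread is available in the
Spark version in use: reshuffle the small RDD into as many partitions as
there are cores, instead of parallelizing over indices. The object name, the
spread helper, and the sample data are hypothetical, and sc.parallelize
stands in for sc.textFile so the sketch is self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RepartitionSmallInput {
  // Reshuffle a small RDD into numPartitions partitions so that
  // downstream transformations can run on more cores.
  def spread(small: RDD[String], numPartitions: Int): RDD[String] =
    small.repartition(numPartitions)

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with 4 threads instead of a real cluster.
    val conf = new SparkConf().setAppName("repartition-small-input").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for sc.textFile("input"): a tiny dataset in a single partition.
      val small = sc.parallelize(Seq("a", "bb", "ccc", "dddd"), 1)
      val wide = spread(small, 8)
      println(s"before=${small.partitions.length} after=${wide.partitions.length}")
    } finally {
      sc.stop()
    }
  }
}
```

This only spreads the records across partitions. If later stages need the
whole of smallInput on every worker, the broadcast is still required.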


>
>
> On Tue, Jan 7, 2014 at 9:20 AM, Aureliano Buendia <[email protected]> wrote:
>
>>
>>
>>
>> On Tue, Jan 7, 2014 at 5:13 PM, Andrew Ash <[email protected]> wrote:
>>
>>> If small-file is hosted in HDFS, I think the default is one partition per
>>> HDFS block. If it fits in a single block (blocks are 64 MB each by
>>> default), that might be one partition.
>>>
>> So if I want to parallelize the processing of that small file (which fits
>> in just one block) over 100 machines, instead of calling:
>>
>> sc.parallelize(..., smallInput.partitions.length)
>>
>> should I call?:
>>
>> sc.parallelize(..., System.getProperty("spark.cores.max").toInt)
>>
>>
>>> On Jan 7, 2014 8:46 AM, "Aureliano Buendia" <[email protected]>
>>> wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jan 2, 2014 at 5:52 PM, Andrew Ash <[email protected]> wrote:
>>>>
>>>>> That sounds right Mayur.
>>>>>
>>>>> Also in 0.8.1 I hear there's a new repartition method that you might
>>>>> be able to use to further distribute the data.  But if your data is so
>>>>> small that it fits in just a couple blocks, why are you using 20 machines
>>>>> just to process a quarter GB of data?
>>>>>
>>>>
>>>> Here is a use case: We could start from an extremely small file which
>>>> could be transformed into a huge in-memory dataset, then reduced to a very
>>>> small dataset.
>>>>
>>>> In a more concrete form, assume we have 100 worker machines and start
>>>> from a small input file:
>>>>
>>>> val smallInput = sc.textFile("small-input")
>>>>
>>>> In this case, would smallInput.partitions.length be a small number, or
>>>> would it be 100?
>>>>
>>>> If we expect the next transformation to make the data significantly
>>>> bigger, how can we force it to be processed across all 100 machines?
>>>>
>>>>
>>>>> Is the computation on each bit extremely intensive?
>>>>>
>>>>>
>>>>> On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> I have experienced a similar issue. The easiest fix I found was to
>>>>>> increase the replication of the data being used to the number of
>>>>>> workers you want to use for processing. The RDD seems to be created on
>>>>>> all the machines where the blocks are replicated. Please correct me if
>>>>>> I am wrong.
>>>>>>
>>>>>> Regards
>>>>>> Mayur
>>>>>>
>>>>>> Mayur Rustagi
>>>>>> Ph: +919632149971
>>>>>> http://www.sigmoidanalytics.com
>>>>>> https://twitter.com/mayur_rustagi
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash <[email protected]> wrote:
>>>>>>
>>>>>>> Hi lihu,
>>>>>>>
>>>>>>> Maybe the data you're accessing is in HDFS and only resides on 4
>>>>>>> of your 20 machines because it's only about 4 blocks (at the default
>>>>>>> 64 MB per block, that's around a quarter GB).  Where is your source
>>>>>>> data located and how is it stored?
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 2, 2014 at 7:53 AM, lihu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>    I run Spark on a cluster of 20 machines, but when I start an
>>>>>>>> application from the spark-shell, only 4 machines do any work. The
>>>>>>>> others sit idle, using no memory or CPU; I can see this in the
>>>>>>>> web UI.
>>>>>>>>
>>>>>>>>    I wondered whether the other machines were busy with something
>>>>>>>> else, so I checked them with the "top" and "free" commands, but they
>>>>>>>> are not.
>>>>>>>>
>>>>>>>>   *So why doesn't Spark assign work to all 20 machines? This is
>>>>>>>> not a good use of resources.*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
