On Thu, Jan 16, 2014 at 11:39 AM, Archit Thakur
<[email protected]>wrote:

> The command
>
> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
> val rdd2 = rdd1.cartesian(rdd1)
>
>
> will create a "CartesianRDD" which has no. of partitions = N^2 (N defined
> by your code).
>

N^2 partitions does not seem to be correct. With N = 1,000, Spark creates
about 30,000 partitions rather than 1,000,000. Other parameters, such as the
number of available cores, also seem to affect this.
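
This is how I am checking the counts (a minimal sketch, assuming a live
SparkContext sc):

val rdd1 = sc.parallelize(Range(0, 1000, 1)) // partition count defaults to spark.default.parallelism
val rdd2 = rdd1.cartesian(rdd1)
println(rdd1.partitions.length)
println(rdd2.partitions.length) // appears to be the product of the two parents' partition counts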


>
> Q: Is the number of part-* files determined by rdd2.partitions.length?
> Yes.
>

Just to confirm, is the number of tasks also the same as the number of
partitions?

I can see that the number of part-* files is exactly the same as the number of
tasks.


>
> Q: Is there a way to keep the size of each part-* file a constant (e.g. 64
> MB) regardless of other parameters, including the number of available cores
> and scheduled tasks?
> Let's say your RDD has N MB of data.
> You can create your own CustomRDD and, by overriding getPartitions and
> the partitioner, create no. of partitions = N/64 and distribute the data
> equally (64 MB each), then perform rdd.operationToTransformItInto_CustomRDD.
>
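
For reference, here is a rough, untested sketch of that idea using coalesce
instead of a full custom RDD (the total size and output path below are only
placeholders):

val totalSizeMB = 2048L // estimated size of rdd2's data; a placeholder
val numPartitions = math.max(1, (totalSizeMB / 64L).toInt)
// coalesce with shuffle = true redistributes the records across numPartitions,
// so saveAsSequenceFile should write roughly numPartitions part-* files
rdd2.coalesce(numPartitions, shuffle = true).saveAsSequenceFile("hdfs:///tmp/output")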

I tried to control the partition size by passing a second argument to
sc.parallelize(), but when I decrease the number of partitions to the point
where each partition grows slightly over 3 MB, I get an Akka timeout, even
though the timeout is already set to 100 seconds.
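
Roughly what I tried looks like this (the slice count and output path are
illustrative):

val slices = 32 // the second argument to parallelize sets the partition count
val rdd1 = sc.parallelize(Range(0, 1000, 1), slices)
val rdd2 = rdd1.cartesian(rdd1) // slices * slices partitions
rdd2.saveAsSequenceFile("hdfs:///tmp/output") // one part-* file per partition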


>
> PS: There might be an operation/RDD that already does the same; I am not
> aware of one as of now. Please let me know if you are able to figure it out.
>
> Thanks and Regards,
> Archit Thakur.
>
>
> On Tue, Jan 14, 2014 at 11:51 PM, Aureliano Buendia 
> <[email protected]>wrote:
>
>>
>>
>>
>> On Tue, Jan 14, 2014 at 5:00 PM, Archit Thakur <[email protected]> wrote:
>>
>>> By "Hadoop block size decreased", do you mean the HDFS block size? That is
>>> not possible.
>>>
>>
>> Sorry for the terminology mix-up. In my question, 'hadoop block size' should
>> probably be read as 'number of RDD partitions'.
>>
>> I'm getting a large number of small files (named part-*), and I'd like to
>> get a smaller number of larger files.
>>
>> I used something like:
>>
>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>> val rdd2 = rdd1.cartesian(rdd1)
>>
>> Is the number of part-* files determined by rdd2.partitions.length?
>>
>> Is there a way to keep the size of each part-* file a constant (e.g. 64 MB)
>> regardless of other parameters, including the number of available cores and
>> scheduled tasks?
>>
>>
>>> The HDFS block size is never affected by your Spark jobs.
>>>
>>> "For a big number of tasks, I get a very high number of 1 MB files
>>> generated by saveAsSequenceFile()."
>>>
>>> What do you mean by "big number of tasks"?
>>>
>>> The number of files generated by saveAsSequenceFile() increases as the
>>> number of RDD partitions increases.
>>>
>>> Are you using your own custom RDD? If yes, you would have overridden the
>>> getPartitions method - check that.
>>> If not, you might have used an operation where you specify a
>>> partitioner or the number of output partitions, e.g. groupByKey() - check that.
>>>
>>> "How is it possible to control the block size by spark?" Do you mean
>>> "How is it possible to control the output partitions of an RDD?"
>>>
>>>
>>> On Tue, Jan 14, 2014 at 7:59 AM, Aureliano Buendia <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does the output Hadoop block size depend on the number of Spark tasks?
>>>>
>>>> In my application, when the number of tasks increases, the Hadoop block
>>>> size decreases. For a big number of tasks, I get a very high number of 1 MB
>>>> files generated by saveAsSequenceFile().
>>>>
>>>> How is it possible to control the block size from Spark?
>>>>
>>>
>>>
>>
>
