On Thu, Jan 16, 2014 at 11:39 AM, Archit Thakur <[email protected]> wrote:
> The command
>
> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
> val rdd2 = rdd1.cartesian(rdd1)
>
> will create a "CartesianRDD" which has no. of partitions = N^2 (N defined
> by your code).

N^2 partitions does not seem to be correct. With N = 1,000, Spark creates
about 30,000 partitions instead of 1,000,000. It seems other parameters,
such as the number of cores, also affect this.

> Q: Is the number of part-* files determined by rdd2.partitions.length?
>
> Yes.

Just to confirm, is the number of tasks also the same as the number of
partitions? I can see that the number of part-* files is exactly the same
as the number of tasks.

> Q: Is there a way to keep the size of each part-* file a constant (e.g.
> 64 MB) regardless of other parameters, including number of available
> cores and scheduled tasks?
>
> Let's say your RDD has N MB of data. You can create your own CustmRDD
> and, by overriding getPartitions and partitioner, create no. of
> partitions = N/64 and distribute the data equally (64 MB each), then
> perform rdd.operationToTransformItInto_CustmRDD.

I tried to control the partition size by passing a second argument to
sc.parallelize(), but when I decrease the number of partitions so that
each partition goes slightly over 3 MB in size, I get an Akka timeout,
which is already set to 100 seconds.
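
To make what I'm after concrete, here is roughly the shape of the job with
the partition count derived from an estimated output size. This is only an
untested sketch: estimatedTotalMB, the 64 MB target and the output path are
placeholders, and I'm assuming plain coalesce() is the right knob.

    // Untested sketch (spark-shell, so sc is already defined).
    val N = 1000                                 // as above, N ~ 1e3
    val estimatedTotalMB = 4000L                 // placeholder: rough size of rdd2's data
    val numParts = math.max(1, (estimatedTotalMB / 64).toInt)

    val rdd1 = sc.parallelize(Range(0, N, 1))
    val rdd2 = rdd1.cartesian(rdd1)              // RDD[(Int, Int)]

    // coalesce() without a shuffle only merges existing partitions, so it can
    // shrink the partition count (and hence the number of part-* files)
    // without moving data across the cluster; it cannot increase the count
    // (coalesce(n, shuffle = true) would be needed for that).
    rdd2.coalesce(numParts).saveAsSequenceFile("hdfs:///tmp/cartesian-output")

Is coalesce() a reasonable way to get ~64 MB part-* files here, or is the
custom RDD route you describe the only reliable option?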

> PS: There might be an operation/RDD that already does the same; I am not
> aware of it as of now. Please let me know if you are able to figure it out.
>
> Thanks and Regards,
> Archit Thakur.
>
> On Tue, Jan 14, 2014 at 11:51 PM, Aureliano Buendia <[email protected]> wrote:
>
>> On Tue, Jan 14, 2014 at 5:00 PM, Archit Thakur <[email protected]> wrote:
>>
>>> Hadoop block size decreased, do you mean HDFS block size? That is not
>>> possible.
>>
>> Sorry for the terminology mix-up. In my question 'hadoop block size'
>> should probably be replaced by 'RDD partitions number'.
>>
>> I'm getting a large number of small files (named part-*), and I'd like
>> to get a smaller number of larger files.
>>
>> I used something like:
>>
>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>> val rdd2 = rdd1.cartesian(rdd1)
>>
>> Is the number of part-* files determined by rdd2.partitions.length?
>>
>> Is there a way to keep the size of each part-* file a constant (e.g.
>> 64 MB) regardless of other parameters, including number of available
>> cores and scheduled tasks?
>>
>>> Block size of HDFS is never affected by your spark jobs.
>>>
>>> "For a big number of tasks, I get a very high number of 1 MB files
>>> generated by saveAsSequenceFile()."
>>>
>>> What do you mean by "big number of tasks"?
>>>
>>> No. of files generated by saveAsSequenceFile() increases if the
>>> partitions of your RDD are increased.
>>>
>>> Are you using your own custom RDD? If yes, you would have overridden
>>> the method getPartitions - check that.
>>> If not, you might have used an operation where you specify your
>>> partitioner or no. of output partitions, e.g. groupByKey() - check that.
>>>
>>> "How is it possible to control the block size by spark?" Do you mean
>>> "How is it possible to control the output partitions of an RDD?"
>>>
>>> On Tue, Jan 14, 2014 at 7:59 AM, Aureliano Buendia <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does the output hadoop block size depend on spark tasks number?
>>>>
>>>> In my application, when the number of tasks increases, hadoop block
>>>> size decreases. For a big number of tasks, I get a very high number
>>>> of 1 MB files generated by saveAsSequenceFile().
>>>>
>>>> How is it possible to control the block size by spark?
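
Also, in case it's useful for the part-* question above: a quick check
along these lines should show whether the output file count really tracks
rdd2.partitions.length (again only a sketch; the output path is a
placeholder and I'm assuming the spark-shell sc):

    // Sketch: the save stage should run one task per partition, and
    // saveAsSequenceFile() should write one part-* file per task.
    val rdd1 = sc.parallelize(Range(0, 1000, 1))
    val rdd2 = rdd1.cartesian(rdd1)
    println("partitions: " + rdd2.partitions.length)

    rdd2.saveAsSequenceFile("hdfs:///tmp/partition-check")

    // Then compare with the number of output files, e.g.:
    //   hadoop fs -ls /tmp/partition-check | grep -c 'part-'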
