On Thu, Jan 16, 2014 at 11:40 PM, Aureliano Buendia <[email protected]> wrote:
> On Thu, Jan 16, 2014 at 11:39 AM, Archit Thakur <[email protected]> wrote:
>
>> The command
>>
>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>> val rdd2 = rdd1.cartesian(rdd1)
>>
>> will create a "CartesianRDD" which has no. of partitions = N^2 (N defined
>> by your code).
>
> N^2 partitions does not seem to be correct. With N = 1,000, Spark creates
> about 30,000 partitions instead of 1,000,000. It seems other parameters,
> such as the number of cores, also affect this.

Now this seems interesting, because I can find this in the code:

  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

so the array size is the product of the two parents' partition counts,
rdd1.partitions.size * rdd2.partitions.size. I am not sure what other factors
(maybe the number of cores) control it, but still, why 30,000 in your case?
Does your cores_per_machine * number_of_machines come to 30,000?
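For reference, here is a quick way to check the counts directly from the
Spark shell (a minimal sketch; it assumes the same N as above and that sc is
the shell's SparkContext). One thing worth noting, unless I misremember the
API of that era: parallelize defaults its slice count to
spark.default.parallelism rather than to the size of the range, so rdd1 need
not have N partitions at all.

  val N = 1000
  // Without an explicit second argument, parallelize falls back to
  // spark.default.parallelism slices, not N slices.
  val rdd1 = sc.parallelize(Range(0, N, 1))
  val rdd2 = rdd1.cartesian(rdd1)

  println("rdd1 partitions: " + rdd1.partitions.length)
  println("rdd2 partitions: " + rdd2.partitions.length)
  // Per CartesianRDD.getPartitions above, this product should hold exactly.
  assert(rdd2.partitions.length == rdd1.partitions.length * rdd1.partitions.length)

If rdd1 reports something on the order of 170-180 partitions rather than
1,000, that product alone would account for roughly 30,000 partitions in the
cartesian result.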
>> Q: Is the number of part-* files determined by rdd2.partitions.length?
>> Yes.
>
> Just to confirm, is the number of tasks also the same as the number of
> partitions?
>
> I can see that the number of part-* files is exactly the same as the number
> of tasks.

Now, since your operation is a shuffle one, it will have two stages, and the
number of tasks should equal the number of partitions for both stages. Can you
confirm whether it launches that many tasks twice?

>> Q: Is there a way to keep the size of each part-* file a constant (e.g. 64
>> MB) regardless of other parameters, including the number of available cores
>> and scheduled tasks?
>> Let's say your RDD has N MB of data. You can create your own CustomRDD and,
>> by overriding getPartitions and the partitioner, create no. of partitions =
>> N/64 and distribute the data equally (64 MB each), then perform
>> rdd.operationToTransformItInto_CustomRDD.
>
> I tried to control the partition size by passing a second argument to
> sc.parallelize(), but when I decrease the number of partitions so that each
> partition size goes slightly over 3 MB, I get an Akka timeout, which is
> already set to 100 seconds.

Try increasing the timeout, maybe.

>> PS: There might be an operation/RDD that already does the same; I am not
>> aware of it as of now. Please let me know if you are able to figure it out.
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>> On Tue, Jan 14, 2014 at 11:51 PM, Aureliano Buendia <[email protected]> wrote:
>>
>>> On Tue, Jan 14, 2014 at 5:00 PM, Archit Thakur <[email protected]> wrote:
>>>
>>>> Hadoop block size decreased - do you mean HDFS block size? That is not
>>>> possible.
>>>
>>> Sorry for the terminology mix-up. In my question, 'hadoop block size'
>>> should probably be replaced by 'number of RDD partitions'.
>>>
>>> I'm getting a large number of small files (named part-*), and I'd like
>>> to get a smaller number of larger files.
>>>
>>> I used something like:
>>>
>>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>>> val rdd2 = rdd1.cartesian(rdd1)
>>>
>>> Is the number of part-* files determined by rdd2.partitions.length?
>>>
>>> Is there a way to keep the size of each part-* file a constant (e.g. 64
>>> MB) regardless of other parameters, including the number of available
>>> cores and scheduled tasks?
>>>
>>>> Block size of HDFS is never affected by your Spark jobs.
>>>>
>>>> "For a big number of tasks, I get a very high number of 1 MB files
>>>> generated by saveAsSequenceFile()."
>>>>
>>>> What do you mean by "big number of tasks"?
>>>>
>>>> The number of files generated by saveAsSequenceFile() increases if the
>>>> number of partitions of your RDD increases.
>>>>
>>>> Are you using your own custom RDD? If yes, you would have overridden the
>>>> method getPartitions - check that.
>>>> If not, you might have used an operation where you specify your
>>>> partitioner or the number of output partitions, e.g. groupByKey() - check
>>>> that.
>>>>
>>>> "How is it possible to control the block size by spark?" Do you mean
>>>> "How is it possible to control the output partitions of an RDD?"
>>>>
>>>> On Tue, Jan 14, 2014 at 7:59 AM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does the output Hadoop block size depend on the number of Spark tasks?
>>>>>
>>>>> In my application, when the number of tasks increases, the Hadoop block
>>>>> size decreases. For a big number of tasks, I get a very high number of
>>>>> 1 MB files generated by saveAsSequenceFile().
>>>>>
>>>>> How is it possible to control the block size by Spark?
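On the 64 MB question above: besides writing a custom RDD, the coalesce()
operation can also bring the number of output partitions (and hence part-*
files) down before saving. This is only a rough sketch; it assumes a Spark
release recent enough to have the shuffle flag on coalesce, and that the
total data volume can be estimated up front. The totalSizeMB value and the
output path are illustrative placeholders, not taken from the thread.

  // Choose the number of output partitions from an estimated total size,
  // so that each part-* file comes out near the target size.
  val targetMB = 64
  val totalSizeMB = 2048 // illustrative estimate of the RDD's data volume
  val numOutputPartitions = math.max(1, totalSizeMB / targetMB)

  // coalesce(n) merges partitions; shuffle = true also rebalances the data
  // evenly across the new partitions, at the cost of a shuffle.
  rdd2.coalesce(numOutputPartitions, shuffle = true)
      .saveAsSequenceFile("hdfs:///path/to/output")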

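And on the Akka timeout mentioned above: in the pre-SparkConf releases these
settings were plain Java system properties set before the SparkContext is
created, so raising the timeout would look roughly like the sketch below.
The master URL, application name, and the 300-second value are placeholders,
not taken from the thread.

  import org.apache.spark.SparkContext

  // Assumption: Spark 0.8.x-style configuration via system properties.
  // spark.akka.timeout is in seconds; the setting reported above was 100.
  System.setProperty("spark.akka.timeout", "300")
  val sc = new SparkContext("spark://master:7077", "partition-size-test")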