Thank you for your reply. Your tips on refactoring the code are helpful; after
a second look at the code, the casts and null checks really are unnecessary.
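
A minimal sketch of what that refactoring might look like, for the 35-field
layout only. The names ParseSketch, Record and parse35 are made up for
illustration and are not from the original job. The idea is to return an
Option and use flatMap, so that malformed lines are dropped without a null
check or a cast:

```scala
object ParseSketch {
  // Hypothetical alias for the 6-tuple used in the original code.
  type Record = (String, String, String, String, String, Long)

  // Some(record) for a line with exactly 35 tab-separated fields, None otherwise.
  def parse35(line: String): Option[Record] = {
    val t = line.split("\t")
    if (t.length == 35)
      Some((t(6).trim, t(7).trim, t(3).trim, t(5).trim, t(12).trim, t(13).trim.toLong))
    else
      None
  }

  def main(args: Array[String]): Unit = {
    val good = (0 until 35).mkString("\t") // 35 fields: "0", "1", ..., "34"
    val bad = "too\tshort"                 // only 2 fields, so it is dropped
    // flatMap keeps the Some values and discards the None values.
    val parsed = Seq(good, bad).flatMap(line => parse35(line))
    println(parsed) // List((6,7,3,5,12,13))
  }
}
```

On an RDD the same function could presumably be used as
rdd1.flatMap(line => ParseSketch.parse35(line)), which gives an RDD of tuples
directly instead of an RDD of Any.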


qinwei
From: Sean Owen
Date: 2014-09-28 15:03
To: qinwei
CC: user
Subject: Re: problem with patitioning

(Most of this code is not relevant to the question and can be refactored too.
The casts and null checks look unnecessary.)
You are unioning RDDs, so the result has the sum of their partitions. The
number of partitions is really only a hint to Hadoop, so it is not even
necessarily 3 x 1920.
Try not specifying the partitions at the source, and instead try repartition
after the union to reduce the number of partitions.
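
A minimal sketch of that suggestion, assuming a Spark dependency on the
classpath. RepartitionSketch and unionThenShrink are hypothetical names used
only for illustration; sc.union, repartition and partitions are existing RDD
API calls. The inputs are unioned first and the partition count is changed
once on the combined RDD:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object RepartitionSketch {
  // Unions the inputs (the result has the sum of their partitions), then
  // repartitions the combined RDD to the requested number of partitions.
  def unionThenShrink[T: ClassTag](sc: SparkContext,
                                   rdds: Seq[RDD[T]],
                                   targetPartitions: Int): RDD[T] = {
    val unioned = sc.union(rdds)
    // repartition always shuffles; coalesce(targetPartitions) is an
    // alternative that avoids the shuffle when only reducing the count.
    unioned.repartition(targetPartitions)
  }

  // Hypothetical usage with the three inputs from the question:
  //   val inputs = Seq(path1, path2, path3).map(p => sc.textFile(p))
  //   val unioned = unionThenShrink(sc, inputs, 1920)
  //   println(unioned.partitions.length)
}
```

Later stages that run on the returned RDD should then use targetPartitions
tasks, regardless of how many files each input path contains.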
On Sep 28, 2014 7:36 AM, "qinwei" <wei....@dewmobile.net> wrote:

Hi everyone,

I have run into a problem changing the number of partitions of an RDD. My
code is below:

    val rdd1 = sc.textFile(path1)
    val rdd2 = sc.textFile(path2)
    val rdd3 = sc.textFile(path3)

    val imeiList = parseParam(job.jobParams)
    val broadcastVar = sc.broadcast(imeiList)

    val structuredRDD1 = rdd1.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 35) {
        (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim,
          trunks(12).trim, trunks(13).trim.toLong)
      }
    })

    val structuredRDD2 = rdd2.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 33) {
        (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim,
          trunks(12).trim, trunks(3).trim.toLong)
      }
    })

    val structuredRDD3 = rdd3.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 33) {
        (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim,
          trunks(12).trim, trunks(3).trim.toLong)
      }
    })

    val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

    val resRDD = unionedRDD
      .filter(arg => arg != null && arg != ())
      .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
      .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) ||
        imeiFilter(arg._2, broadcastVar.value, 0))

    val jsonStrRDD = resRDD.map(arg =>
      "{\"f_imei\" : \"" + arg._1 + "\", \"t_imei\" : \"" + arg._2 +
        "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + arg._4 +
        "\", \"s\" : " + arg._5.toString() + ", \"ts\" : " + arg._6.toString() + "}")

    val jsonArray = jsonStrRDD.collect

I noticed that there are 3834 tasks by default, and 3834 is the total number
of files in path1, path2 and path3. I want to change the number of partitions
with the code below:

    val rdd1 = sc.textFile(path1, 1920)
    val rdd2 = sc.textFile(path2, 1920)
    val rdd3 = sc.textFile(path3, 1920)

By doing this I expected 1920 tasks in total, but I found that the number of
tasks became 8920. Any idea what's going on here?

Thanks!



qinwei

