Hence the qualification: determine whether it is necessary *and* sufficient, depending on what David is trying to do overall :)
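To make that concrete, here is a rough, untested sketch of the runJob route, assuming the 26-partition rdd26 from Mark's example quoted below, with the A-to-M strings in partitions 0 through 12 (the four-argument runJob(rdd, func, partitions, allowLocal) form is the signature in Spark of this vintage; later versions drop the allowLocal flag):

// Illustrative sketch only. runJob is an action, so the result comes back
// to the driver as local Arrays rather than as a new RDD -- Mark's point below.
// Assumes rdd26: RDD[String], range-partitioned by first letter into 26
// partitions, with the A-to-M strings in partitions 0 through 12.
val upperAtoM: Array[Array[String]] =
  sc.runJob(rdd26,
            (itr: Iterator[String]) => itr.map(_.toUpperCase).toArray,
            0 to 12,
            false)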
On Jan 28, 2014 2:10 PM, "Mark Hamstra" <[email protected]> wrote:

> SparkContext#runJob is the basis of an RDD action, so the result of using
> runJob to call toUpperCase on the A-to-M partitions will be the uppercased
> strings materialized in the driver process, not a transformation of the
> original RDD.
>
> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[email protected]> wrote:
>
>> David,
>>
>> map() would iterate row by row, forcing an if on each row.
>>
>> mapPartitions*() allows you to have a conditional on the whole partition
>> first, as Mark suggests. That should usually be sufficient.
>>
>> SparkContext.runJob() allows you to specify which partitions to run on,
>> if you're sure it's necessary and sufficient, and not an over-optimization.
>>
>> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[email protected]> wrote:
>>
>>> If I'm understanding you correctly, there are lots of ways you could do
>>> that. Here's one, continuing from the previous example:
>>>
>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>
>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>
>>> rdd26.mapPartitionsWithIndex { (idx, itr) =>
>>>   if (range.contains(idx)) itr.map(_.toUpperCase) else itr
>>> }
>>>
>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[email protected]> wrote:
>>>
>>>> Thank you! That helps.
>>>>
>>>> A follow-up question on this: how can I apply a function to only a
>>>> subset of this RDD? Let's say I need toUpperCase applied to all strings
>>>> starting with letters in the range 'A'-'M', leaving the rest untouched.
>>>> Is that possible without running an 'if' condition on all the
>>>> partitions in the cluster?
>>>>
>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[email protected]> wrote:
>>>>
>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>
>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>
>>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>>>> at <console>:15
>>>>>
>>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>>>> res0)).values
>>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>>> <console>:18
>>>>>
>>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>>> s))).collect.foreach(println)
>>>>>
>>>>> (0,apple)
>>>>> (1,Ball)
>>>>> (2,cat)
>>>>> (3,dog)
>>>>> (4,Elephant)
>>>>> (5,fox)
>>>>> (6,gas)
>>>>> (7,horse)
>>>>> (8,index)
>>>>> (9,jet)
>>>>> (10,kitsch)
>>>>> (11,long)
>>>>> (12,moon)
>>>>> (13,Neptune)
>>>>> (14,ooze)
>>>>> (15,Pen)
>>>>> (16,quiet)
>>>>> (17,rose)
>>>>> (18,sun)
>>>>> (19,talk)
>>>>> (20,umbrella)
>>>>> (21,voice)
>>>>> (22,Walrus)
>>>>> (23,xeon)
>>>>> (24,Yam)
>>>>> (25,zebra)
>>>>>
>>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[email protected]> wrote:
>>>>>
>>>>>> If you do something like:
>>>>>>
>>>>>> rdd.map { str => (str.take(1), str) }
>>>>>>
>>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>>> character of the string. Now when you perform an operation that uses
>>>>>> partitioning (e.g.
>>>>>> reduceByKey), you will end up with the 1st reduce task receiving all
>>>>>> the strings starting with A, the 2nd all the strings starting with B,
>>>>>> etc. Note that you may not be able to enforce that each *machine*
>>>>>> gets a different letter, but in most cases that doesn't particularly
>>>>>> matter, as long as you get the "all values for a given key go to the
>>>>>> same reducer" behaviour.
>>>>>>
>>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>>> assistance.
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[email protected]> wrote:
>>>>>>
>>>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>>> starting with A get collected on machine1, B on machine2, and so on?
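A quick sketch of Nick's keying suggestion above, put end to end; the string-concatenation reducer is purely illustrative, and the names are made up:

// Key each string by its first character, then reduce. All values for a
// given key meet in the same reduce task, though not on a machine you choose.
val byLetter = rdd.map { str => (str.take(1), str) }   // RDD[(String, String)]
val perLetter = byLetter.reduceByKey(_ + "," + _)      // one combined string per letter
perLetter.collect.foreach(println)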

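For the literal "A on partition 0, B on partition 1, ..." layout David originally asked about, a custom Partitioner is the most direct route. A sketch, untested; as Nick notes, this fixes only the key-to-partition mapping, not which machine hosts each partition:

import org.apache.spark.Partitioner

// Pins each first letter to a fixed partition index: 'A'/'a' -> 0, ..., 'Z'/'z' -> 25.
class FirstLetterPartitioner extends Partitioner {
  def numPartitions: Int = 26
  def getPartition(key: Any): Int = key match {
    case c: Char => c.toUpper - 'A'   // Char arithmetic yields the 0-25 index
    case _       => 0                 // fallback for unexpected key types
  }
}

// Key by first character, repartition deterministically, then drop the keys.
val pinned = rdd.keyBy(s => s(0)).partitionBy(new FirstLetterPartitioner).values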