SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.
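For concreteness, a minimal sketch of what that looks like (this assumes the rdd26 from the earlier example, already range-partitioned by first letter, and a runJob overload that takes an explicit list of partition indices; the exact signature varies across Spark versions, and older ones also want an allowLocal flag):

// Run the function only on partitions 0..12 ('A' through 'M'); the results
// come back to the driver as an Array with one element per partition.
val upperAtoM: Array[Array[String]] = sc.runJob(
  rdd26,
  (itr: Iterator[String]) => itr.map(_.toUpperCase).toArray,
  0 to 12)

// upperAtoM now lives in the driver's memory; rdd26 itself is unchanged.

If what you actually want is a new RDD rather than driver-side results, stick with the mapPartitionsWithIndex approach in Mark's example below.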
On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[email protected]> wrote:

> David,
>
> map() would iterate row by row, forcing an if on each row.
>
> mapPartitions*() allows you to have a conditional on the whole partition
> first, as Mark suggests. That should usually be sufficient.
>
> SparkContext.runJob() allows you to specify which partitions to run on, if
> you're sure it's necessary and sufficient, and not over-optimization.
>
> Sent while mobile. Pls excuse typos etc.
>
> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[email protected]> wrote:
>
>> If I'm understanding you correctly, there are lots of ways you could do
>> that. Here's one, continuing from the previous example:
>>
>> // rdd26: RDD[String] split by first letter into 26 partitions
>>
>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>
>> rdd26.mapPartitionsWithIndex { (idx, itr) =>
>>   if (range.contains(idx)) itr.map(_.toUpperCase) else itr
>> }
>>
>>
>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[email protected]> wrote:
>>
>>> Thank you! That helps.
>>>
>>> A follow-up question on this. How can I apply a function only to a
>>> subset of this RDD? Let's say I need all strings starting in the range
>>> 'A' - 'M' to have toUpperCase applied and the rest left untouched. Is
>>> that possible without running an 'if' condition on all the partitions
>>> in the cluster?
>>>
>>>
>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[email protected]> wrote:
>>>
>>>> scala> import org.apache.spark.RangePartitioner
>>>>
>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>
>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>>> at <console>:15
>>>>
>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>> <console>:18
>>>>
>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>> s))).collect.foreach(println)
>>>>
>>>> (0,apple)
>>>> (1,Ball)
>>>> (2,cat)
>>>> (3,dog)
>>>> (4,Elephant)
>>>> (5,fox)
>>>> (6,gas)
>>>> (7,horse)
>>>> (8,index)
>>>> (9,jet)
>>>> (10,kitsch)
>>>> (11,long)
>>>> (12,moon)
>>>> (13,Neptune)
>>>> (14,ooze)
>>>> (15,Pen)
>>>> (16,quiet)
>>>> (17,rose)
>>>> (18,sun)
>>>> (19,talk)
>>>> (20,umbrella)
>>>> (21,voice)
>>>> (22,Walrus)
>>>> (23,xeon)
>>>> (24,Yam)
>>>> (25,zebra)
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[email protected]> wrote:
>>>>
>>>>> If you do something like:
>>>>>
>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>
>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>> character of the string. Now when you perform an operation that uses
>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce
>>>>> task receiving all the strings with A, the 2nd all the strings with B,
>>>>> etc. Note that you may not be able to enforce that each *machine* gets
>>>>> a different letter, but in most cases that doesn't particularly matter
>>>>> as long as you get "all values for a given key go to the same reducer"
>>>>> behaviour.
>>>>>
>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>> assistance.
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[email protected]> wrote:
>>>>>
>>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>> starting with A get collected on machine1, B on machine2, and so on?
>>>>>>
>>>>>
>>>>
>>>
>>
