How important is it that they're partitioned on hashCode() % 32 rather than by Spark's default partitioning?
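For comparison with the question above: Spark's default HashPartitioner places a key in partition key.hashCode() modulo numPartitions, adjusted to be non-negative (null keys go to partition 0). Plain Java %, as in s.hashCode() % 32, keeps the sign of the hash, so negative hash codes can produce negative partition numbers (down to -31). The sketch below is in Java to match the question's code. It models only the partitioning arithmetic and is not Spark's own code. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch of hash partitioning arithmetic.
// nonNegativeMod models the rule described for Spark's HashPartitioner
// (hashCode modulo n, shifted into 0..n-1); rawMod is plain Java '%'.
class HashPartitionSketch {

    // Plain Java remainder: takes the sign of the dividend, so it can be negative.
    static int rawMod(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Remainder shifted into the range [0, numPartitions).
    static int nonNegativeMod(int hash, int numPartitions) {
        int raw = hash % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    // Partition for a String key; null keys go to partition 0.
    static int partitionFor(String key, int numPartitions) {
        return key == null ? 0 : nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int n = 32;
        System.out.println(rawMod(-5, n));          // -5
        System.out.println(nonNegativeMod(-5, n));  // 27
        for (String s : Arrays.asList("apple", "banana", "cherry")) {
            System.out.println(s + " -> partition " + partitionFor(s, n));
        }
    }
}
```

With the non-negative rule, filters for partition numbers 0 to 31 together cover every string. With the raw remainder, a string whose hash code is negative and not a multiple of 32 matches none of those 32 filters.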
In Scala, you should be able to do this with

  rdd.distinct().coalesce(32).mapPartitions(p => p.toArray.sorted.iterator)

I'm not sure what your end goal is here, but if it's just to sort a bunch of data and remove duplicates, then that should be

  rdd.distinct().keyBy(s => s).sortByKey().map(_._1)

On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <[email protected]> wrote:

> Thanks for your answer. But the problem is that I only want to sort the 32
> partitions individually, not the complete input. So yes, the output has to
> consist of 32 partitions, each sorted.
>
> Ceriel Jacobs
>
>
> On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>
>> I am not sure if 32 partitions is a hard limit that you have.
>>
>> Unless you have a strong reason to use only 32 partitions, please try
>> providing the second optional argument (numPartitions) to the reduceByKey
>> and sortByKey methods, which will parallelize these reduce operations.
>> About 3x the total number of cores on the cluster would be a good
>> value to try for numPartitions.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>>
>> In case you have to have 32 partitions in the final output, you can use
>> the coalesce(32) method on your RDD at the time of final output.
>>
>> On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am a novice to Spark, and need some help with the following problem.
>>> I have a
>>>
>>>     JavaRDD<String> strings;
>>>
>>> which is potentially large (hundreds of GBs), and I need to split it into
>>> 32 partitions by means of hashCode() % 32, then sort these partitions
>>> and also remove duplicates. I am having trouble finding an efficient way
>>> of expressing this in Spark. I think I need an RDD to be able to sort,
>>> so in this case I need 32 of them.
>>> So I first created an RDD with pairs <partitionNo, string>, like this:
>>>
>>>     JavaPairRDD<Integer, String> hashStrings = strings
>>>         .keyBy(new Function<String, Integer>() {
>>>             @Override
>>>             public Integer call(String s) {
>>>                 // hashCode() can be negative, so shift the remainder
>>>                 // into the range 0..31 before using it as a partition number.
>>>                 return ((s.hashCode() % 32) + 32) % 32;
>>>             }
>>>         });
>>>
>>> And then I launch 32 threads that do the following (each thread has
>>> its own partition):
>>>
>>>     // Filter for my own partition (partition is this thread's int index).
>>>     JavaPairRDD<Integer, String> filtered = hashStrings
>>>         .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>>>             @Override
>>>             public Boolean call(Tuple2<Integer, String> tpl) {
>>>                 return tpl._1() == partition;
>>>             }
>>>         });
>>>     JavaRDD<String> values = filtered.values();
>>>
>>>     // Pair with a boolean, so that we can use sortByKey().
>>>     // (Spark 0.8 Java API; later versions name this method mapToPair.)
>>>     JavaPairRDD<String, Boolean> values1 =
>>>         values.map(new PairFunction<String, String, Boolean>() {
>>>             @Override
>>>             public Tuple2<String, Boolean> call(String s) {
>>>                 return new Tuple2<String, Boolean>(s, true);
>>>             }
>>>         });
>>>
>>>     // Reduce by key to remove duplicates.
>>>     JavaPairRDD<String, Boolean> reduced =
>>>         values1.reduceByKey(new Function2<Boolean, Boolean, Boolean>() {
>>>             @Override
>>>             public Boolean call(Boolean i1, Boolean i2) {
>>>                 return true;
>>>             }
>>>         });
>>>
>>>     // Sort and extract keys.
>>>     JavaRDD<String> result = reduced.sortByKey().keys();
>>>
>>> This works for inputs that are not too large, but for larger ones I get
>>> all kinds of out-of-memory exceptions. I'm running on 8 nodes, each with
>>> 8 cores, and am using SPARK_MEM=16G. I also tried
>>> StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems to
>>> make things much slower, and still gives out-of-memory exceptions.
>>>
>>> Now I'm pretty sure that the way I obtain the partitions is really
>>> inefficient, and I also have my doubts about starting the RDDs in
>>> separate threads. So, what would be the best way to deal with this?
>>>
>>> Thanks in advance for any hints that you can give me.
>>>
>>> Ceriel Jacobs
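The thread never shows a final version. As a hedged illustration only, the result Ceriel describes can be modeled locally with plain Java collections: duplicates removed, each string placed in partition floorMod(hashCode(), 32), and each partition sorted on its own with no global order across partitions. This is not Spark code. In Spark, the same shape should come from a single shuffle, for example rdd.distinct(32).mapPartitions(p => p.toArray.sorted.iterator) in Scala, because distinct(numPartitions) hash-partitions its output by the element's hashCode(). The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory model of "hash into n partitions, dedupe, sort each partition".
class PartitionedDistinctSortSketch {

    static List<List<String>> dedupAndSortPerPartition(List<String> input, int numPartitions) {
        // One TreeSet per partition: it drops duplicates and keeps natural String order.
        List<TreeSet<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new TreeSet<>());
        }
        for (String s : input) {
            // Math.floorMod keeps the partition number in [0, numPartitions) even for negative hashes.
            buckets.get(Math.floorMod(s.hashCode(), numPartitions)).add(s);
        }
        // Copy each sorted, duplicate-free bucket into a plain list.
        List<List<String>> result = new ArrayList<>();
        for (TreeSet<String> bucket : buckets) {
            result.add(new ArrayList<>(bucket));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("pear", "apple", "fig", "apple", "kiwi", "pear", "date");
        List<List<String>> parts = dedupAndSortPerPartition(input, 4);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

Each TreeSet holds one whole partition in memory. Sorting inside mapPartitions has the same trade-off: every partition has to fit in the memory of the single task that sorts it.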
