You can easily accomplish the hash mod 32 using Spark's partitionBy:

    val strings: RDD[String] = ...
    strings.map(s => (s, 1)).partitionBy(new HashPartitioner(32)).keys
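One detail to be careful with when hashing by hand: in Java, hashCode() % 32 can be negative, so using it directly as a partition index can misbehave. Spark's HashPartitioner guards against this by taking a nonnegative modulus. A minimal plain-Java sketch of that bucket assignment (illustrative only, not Spark's actual internals; class and method names here are made up):

```java
public class HashMod32 {
    /**
     * Bucket a key into one of numPartitions partitions.
     * Plain key.hashCode() % numPartitions can be negative in Java,
     * so shift negative results up by numPartitions.
     */
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        String[] samples = { "spark", "hadoop", "rdd", "partition" };
        for (String s : samples) {
            // Every key lands in [0, 32), regardless of hashCode sign.
            System.out.println(s + " -> partition " + partitionFor(s, 32));
        }
    }
}
```

This is equivalent to Math.floorMod(key.hashCode(), numPartitions) on Java 8+.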
On Wed, Dec 4, 2013 at 11:05 AM, Andrew Ash <[email protected]> wrote:

> How important is it that they're partitioned on hashCode() % 32 rather
> than Spark's default partitioning?
>
> In Scala, you should be able to do this with
>
>     rdd.distinct.coalesce(32).mapPartitions(p => sorted(p))
>
> I'm not sure what your end goal is here, but if it's just to sort a
> bunch of data and remove duplicates, then that should be
>
>     rdd.distinct.keyBy(_).sortByKey().map((k, v) => k)
>
> On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <[email protected]> wrote:
>
>> Thanks for your answer. But the problem is that I only want to sort
>> the 32 partitions individually, not the complete input. So yes, the
>> output has to consist of 32 partitions, each sorted.
>>
>> Ceriel Jacobs
>>
>> On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>>
>>> I am not sure if 32 partitions is a hard limit that you have.
>>>
>>> Unless you have a strong reason to use only 32 partitions, please try
>>> providing the second optional argument (numPartitions) to the
>>> reduceByKey and sortByKey methods, which will parallelize these
>>> reduce operations. A number 3x the total number of cores on the
>>> cluster would be a good value to try for numPartitions.
>>>
>>> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>>>
>>> In case you have to have 32 partitions in the final output, you can
>>> use the coalesce(32) method on your RDD at the time of final output.
>>>
>>> On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> I am a novice to Spark, and need some help with the following
>>> problem: I have a
>>>
>>>     JavaRDD<String> strings;
>>>
>>> which is potentially large, hundreds of GBs, and I need to split it
>>> into 32 partitions by means of hashCode() % 32, then sort these
>>> partitions, and also remove duplicates.
>>> I am having trouble finding an efficient way of expressing this in
>>> Spark. I think I need an RDD to be able to sort, so in this case I
>>> need 32 of them. So I first created an RDD with pairs
>>> <partitionNo, string>, like this:
>>>
>>>     JavaPairRDD<Integer, String> hashStrings = strings
>>>             .keyBy(new Function<String, Integer>() {
>>>                 @Override
>>>                 public Integer call(String s) {
>>>                     return new Integer(s.hashCode() % 32);
>>>                 }
>>>             });
>>>
>>> And then I launch 32 threads that do the following (each thread has
>>> its own partition):
>>>
>>>     // Filter for my own partition.
>>>     JavaPairRDD<Integer, String> filtered = hashStrings
>>>             .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>>>                 @Override
>>>                 public Boolean call(Tuple2<Integer, String> tpl) {
>>>                     return tpl._1 == partition;
>>>                 }
>>>             });
>>>     JavaRDD<String> values = filtered.values();
>>>
>>>     // Pair with a boolean, so that we can use sortByKey().
>>>     JavaPairRDD<String, Boolean> values1 = values
>>>             .map(new PairFunction<String, String, Boolean>() {
>>>                 @Override
>>>                 public Tuple2<String, Boolean> call(String s) {
>>>                     return new Tuple2<String, Boolean>(s, true);
>>>                 }
>>>             });
>>>
>>>     // Reduce by key to remove duplicates.
>>>     JavaPairRDD<String, Boolean> reduced = values1
>>>             .reduceByKey(new Function2<Boolean, Boolean, Boolean>() {
>>>                 @Override
>>>                 public Boolean call(Boolean i1, Boolean i2) {
>>>                     return true;
>>>                 }
>>>             });
>>>
>>>     // Sort and extract keys.
>>>     JavaRDD<String> result = reduced.sortByKey().keys();
>>>
>>> This works for not-so-large input, but for larger input I get all
>>> kinds of out-of-memory exceptions. I'm running on 8 nodes, each with
>>> 8 cores, and am using SPARK_MEM=16G. I also tried
>>> StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems
>>> to make things much slower, and still gives out-of-memory exceptions.
>>> Now I'm pretty sure that the way I obtain the partitions is really
>>> inefficient, and I also have my doubts about starting the RDDs in
>>> separate threads. So, what would be the best way to deal with this?
>>>
>>> Thanks in advance for any hints that you can give me.
>>>
>>> Ceriel Jacobs
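For what it's worth, the filter-per-thread approach scans the full RDD 32 times, once per partition. The usual alternative, following the partitionBy suggestion above, is to partition once with new HashPartitioner(32) and then sort and deduplicate each partition locally inside a single mapPartitions pass. The per-partition step needs no Spark at all; a sketch of it in plain Java (method name is illustrative), where each task would receive an iterator over its own partition's strings:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

public class PartitionSort {
    /**
     * What each of the 32 tasks would do with its own partition:
     * insert into a TreeSet, which sorts and drops duplicates in one pass.
     */
    static List<String> sortAndDedup(Iterator<String> partition) {
        TreeSet<String> set = new TreeSet<>();
        while (partition.hasNext()) {
            set.add(partition.next());
        }
        return new ArrayList<>(set);
    }
}
```

On the Spark side this would run inside mapPartitions, one task per partition, so no extra driver-side threads are needed. Note that the TreeSet holds an entire partition in memory, so with hundreds of GBs each partition must still fit on one executor; that is one argument for more than 32 partitions, or an external sort, if memory stays tight.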
