Thanks for your answer.
The partitioning function is not that important. What is important is that I only
sort the partitions, not the complete RDD. Your suggestion to use
rdd.distinct.coalesce(32).mapPartitions(p => p.toSeq.sorted.iterator)
sounds nice, and I had indeed seen the coalesce and mapPartitions methods, but I
could (and can) not figure out how to express that sorting step in Spark/Java.
The only way to sort that I could find in the Spark documentation is sortByKey,
but for that you need an RDD per partition, I think.
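For reference, the per-partition sort does not need sortByKey at all: the function handed to mapPartitions can simply drain the partition's iterator into a list and sort it in memory. A minimal plain-Java sketch of that body (the class and method names are hypothetical, and it assumes each individual partition fits in memory):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class PartitionSorter {
    // Drain one partition's iterator into memory, sort it, and hand the
    // sorted data back as an iterator. This is the kind of body one would
    // pass to Spark's mapPartitions; it assumes a partition fits in memory.
    public static Iterator<String> sortPartition(Iterator<String> part) {
        List<String> buf = new ArrayList<String>();
        while (part.hasNext()) {
            buf.add(part.next());
        }
        Collections.sort(buf);
        return buf.iterator();
    }
}
```

Because the sort happens independently per partition, no shuffle is involved beyond the one that created the partitions.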

Ceriel Jacobs


On 12/04/2013 08:05 PM, Andrew Ash wrote:
How important is it that they're partitioned on hashCode() % 32 rather than
Spark's default partitioning?

In Scala, you should be able to do this with
rdd.distinct.coalesce(32).mapPartitions(p => p.toSeq.sorted.iterator)

I'm not sure what your end goal is here, but if it's just to sort a bunch of
data and remove duplicates, then that should be
rdd.distinct.keyBy(identity).sortByKey().keys


On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <[email protected]> wrote:

    Thanks for your answer. But the problem is that I only want to sort the 32
    partitions, individually, not the complete input. So yes, the output has to
    consist of 32 partitions, each sorted.

    Ceriel Jacobs



    On 12/04/2013 06:30 PM, Ashish Rangole wrote:

        I am not sure if 32 partitions is a hard limit that you have.

        Unless you have a strong reason to use only 32 partitions, please try
        providing the second optional argument (numPartitions) to the reduceByKey
        and sortByKey methods, which will parallelize these reduce operations.
        A number 3x the total number of cores on the cluster would be a good
        value to try for numPartitions.

        http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks

        In case you have to have 32 partitions in the final output, you can use
        the coalesce(32) method on your RDD at the time of final output.


        On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <[email protected]> wrote:

             Hi,

             I am a novice to Spark, and need some help with the following problem:
             I have a
                      JavaRDD<String> strings;
             which is potentially large, hundreds of GBs, and I need to split it
             into 32 partitions, by means of hashCode() % 32, then sort these
             partitions, and also remove duplicates. I am having trouble finding
             an efficient way of expressing this in Spark. I think I need an RDD
             to be able to sort, so in this case I need 32 of them. So I first
             created an RDD with pairs <partitionNo, string>, like this:

                      JavaPairRDD<Integer, String> hashStrings = strings
                              .keyBy(new Function<String, Integer>() {
                                  @Override
                                  public Integer call(String s) {
                                      // (h % 32 + 32) % 32 keeps the key in 0..31;
                                      // plain h % 32 is negative for negative hash codes.
                                      return (s.hashCode() % 32 + 32) % 32;
                                  }
                              });
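One pitfall to keep in mind with hashCode() % 32 in Java: the % operator keeps the sign of the dividend, so negative hash codes yield keys outside 0..31. A small standalone illustration of the two common fixes:

```java
public class ModDemo {
    public static void main(String[] args) {
        int h = -7;
        // Plain % keeps the sign of the dividend.
        System.out.println(h % 32);               // -7
        // Classic idiom that always yields 0..31.
        System.out.println((h % 32 + 32) % 32);   // 25
        // Math.floorMod (Java 8+) does the same.
        System.out.println(Math.floorMod(h, 32)); // 25
    }
}
```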

             And then I launch 32 threads that do the following (each thread has
             its own partition):

                          // Filter for my own partition.
                          JavaPairRDD<Integer, String> filtered = hashStrings
                                  .filter(new Function<Tuple2<Integer, String>, Boolean>() {
                                      @Override
                                      public Boolean call(Tuple2<Integer, String> tpl) {
                                          return tpl._1 == partition;
                                      }
                                  });
                          JavaRDD<String> values = filtered.values();

                          // Pair with a boolean, so that we can use sortByKey().
                          JavaPairRDD<String, Boolean> values1 =
                                  values.map(new PairFunction<String, String, Boolean>() {
                                      @Override
                                      public Tuple2<String, Boolean> call(String s) {
                                          return new Tuple2<String, Boolean>(s, true);
                                      }
                                  });

                          // Reduce by key to remove duplicates.
                          JavaPairRDD<String, Boolean> reduced =
                                  values1.reduceByKey(
                                          new Function2<Boolean, Boolean, Boolean>() {
                                              @Override
                                              public Boolean call(Boolean i1, Boolean i2) {
                                                  return true;
                                              }
                                          });

                          // Sort and extract keys.
                          JavaRDD<String> result = reduced.sortByKey().keys();
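As an aside, within one partition the pair-with-boolean, reduceByKey, and sortByKey steps above amount to building a sorted set, which in plain Java is a single TreeSet pass. A sketch of that inner logic (the helper name is hypothetical, not a Spark API):

```java
import java.util.Iterator;
import java.util.TreeSet;

public class DedupSorter {
    // Within one partition, "remove duplicates then sort" is a single
    // TreeSet insertion pass: the set drops duplicates and keeps its
    // elements in sorted order.
    public static Iterator<String> sortedDistinct(Iterator<String> part) {
        TreeSet<String> set = new TreeSet<String>();
        while (part.hasNext()) {
            set.add(part.next());
        }
        return set.iterator();
    }
}
```

Since duplicate strings have equal hash codes, they always land in the same partition under the hashCode() % 32 scheme, so a per-partition dedupe of this kind is also globally correct.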

             This works for not so large input, but for larger inputs I get all
             kinds of out-of-memory exceptions. I'm running on 8 nodes, each with
             8 cores, and am using SPARK_MEM=16G. I also tried
             StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems
             to make things much slower, and still gives out-of-memory exceptions.

             Now I'm pretty sure that the way I obtain the partitions is really
             inefficient, and I also have my doubts about starting the RDDs in
             separate threads. So, what would be the best way to deal with this?

             Thanks in advance for any hints that you can give me.

             Ceriel Jacobs




