Matei and Andrew,

Thank you both for your prompt responses. Matei is correct in that I am
attempting to cache a large RDD for repeated query.

I was able to implement your suggestion in a Scala version of the code,
which I've copied below. I should point out two minor details:
LongWritable.clone() is a private method and both the key and value need to
be "cloned" in order for the data to be cached correctly.

My attempt at a Java version wasn't as successful. If you don't mind, could
you please suggest a better way if it currently exists? This is mostly
educational since I already have a working version in Scala. I'm new to
both.

Regards,

Mike

Java:

public class App 
{
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
          System.err.println("Usage: SynthesisService <master> <input file>
<jar file>");
          System.exit(1);
        }
        
        System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
       
System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");

        JavaSparkContext ctx = new JavaSparkContext(args[0], 
            "SynthesisService",
            "~/spark-0.8.0-incubating",args[2]); 
        
        //Load DataCube via Spark sequenceFile
        JavaPairRDD<LongWritable,LongWritable> temp_DataCube =
ctx.sequenceFile(args[1], 
            LongWritable.class, LongWritable.class);
        
        JavaRDD<Tuple2&lt;LongWritable,LongWritable>> DataCube;
        DataCube = temp_DataCube.map(
                new
Function2<LongWritable,LongWritable,Tuple2&lt;LongWritable,LongWritable>> ()
{
                    @Override
                    public Tuple2<LongWritable,LongWritable> 
                    call(LongWritable key, LongWritable value) {
                        return (new Tuple2(new LongWritable(key.get()),
value));
                    }
                
                });

-----
COMPILATION ERROR : 
-------------------------------------------------------------
spark/synthesis/service/Init/App.java:[51,32] error: no suitable method
found for map(<anonymous
Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>>)
1 error

Scala:

package testspark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoRegistrator

import org.apache.hadoop.io.LongWritable

import com.esotericsoftware.kryo.Kryo

class MyRegistrator extends KryoRegistrator{
    def registerClasses(kryo: Kryo){
        kryo.register(classOf[LongWritable]);
        kryo.register(classOf[Tuple2[LongWritable,LongWritable]]);
    }
}

object ScalaSynthesisServer {
        
        def pseudoClone(x: LongWritable, y: LongWritable):
(LongWritable,LongWritable) = {
                return new Tuple2(new LongWritable(x.get()) , new 
LongWritable(y.get()))
        }
        
        def main(args: Array[String]) {
                if (args.length < 3) {
                        System.err.println("Usage: ScalaSynthesisServer 
<master> <input file>
<jar file>")
                        System.exit(1)
                }
                
                System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
                
System.setProperty("spark.kryo.registrator","testspark.MyRegistrator")
        
                val sc = new SparkContext(args(0),
"ScalaSynthesisServer","~/spark-0.8.0-incubating",List(args(2)))
                
                val DataCube = sc.sequenceFile(args(1), classOf[LongWritable],
classOf[LongWritable]).map(a => pseudoClone(a._1,a._2))
                
                DataCube.cache()
                
                val list = DataCube.collect();
                
                var x = 0; 
                for( x <- list ){
                        println("Key= " + x._1 + " Value= " + x._2);
                }
        }
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353p552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to