Yeah, unfortunately sequenceFile() reuses the same Writable object across records. 
If you plan to hold on to records (e.g. cache them), you should clone each one 
using a map function. The API was originally designed assuming you only look at 
each record once, but that's poorly documented.
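
To make the reuse problem concrete, here is a minimal plain-Java sketch that needs neither Spark nor Hadoop. MutableLong is a hypothetical stand-in for LongWritable, and readAndCache is an invented helper that mimics a reader refilling one shared object per record. It is an illustration of the mechanism, not Spark's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class WritableReuseDemo {

    // Hypothetical stand-in for Hadoop's LongWritable: a mutable box holding one long.
    static final class MutableLong {
        private long value;
        MutableLong(long value) { this.value = value; }
        long get() { return value; }
        void set(long value) { this.value = value; }
    }

    // Mimics a record reader that refills ONE shared object for every record.
    // With copy == false the list stores references to that shared object;
    // with copy == true each record is cloned before it is stored.
    static List<Long> readAndCache(long[] records, boolean copy) {
        MutableLong shared = new MutableLong(0);
        List<MutableLong> cached = new ArrayList<MutableLong>();
        for (long r : records) {
            shared.set(r);
            cached.add(copy ? new MutableLong(shared.get()) : shared);
        }
        // Read the cached entries back only after the "reader" has finished.
        List<Long> out = new ArrayList<Long>();
        for (MutableLong m : cached) {
            out.add(m.get());
        }
        return out;
    }

    public static void main(String[] args) {
        long[] records = {10L, 20L, 30L};
        System.out.println(readAndCache(records, false)); // prints [30, 30, 30]
        System.out.println(readAndCache(records, true));  // prints [10, 20, 30]
    }
}
```

In the Spark job itself, the analogous fix would presumably be a map over the RDD that builds a new Tuple2 from new LongWritable(key.get()) and new LongWritable(value.get()) for each pair. The method to call depends on the Spark version (map with a PairFunction in the 0.8 Java API, mapToPair in later releases).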

Matei

On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq0...@gmail.com> wrote:

> I've spent some time trying to import data into an RDD using the Spark Java
> API, but am not able to properly load data stored in a Hadoop v1.1.1
> sequence file with key and value types both LongWritable. I've attached a
> copy of the sequence file to this posting. It contains 3000 key/value
> pairs. I'm attempting to read it using the following code snippet:
> 
> System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
> 
> JavaSparkContext ctx = new JavaSparkContext("local[2]", 
>            "AppName",
>            "/Users/mquinlan/spark-0.8.0-incubating","jar.name"); 
> 
> //Load DataCube via Spark sequenceFile
> JavaPairRDD<LongWritable,LongWritable> DataCube =
> ctx.sequenceFile("/local_filesystem/output.seq", 
>            LongWritable.class, LongWritable.class);
> 
> The code above produces a DataCube filled with duplicate entries relating in
> some way to the number of splits. For example, the last 1500 or so entries
> all have the same key and value: (2999,22483). The previous 1500 entries
> appear to represent the last key/value pair from the first split of the file. I've
> confirmed that changing the number of threads (local[3]) does change the RDD
> representation, maintaining this general last key value pattern. 
> 
> Using the Hadoop (only) API methods, I am able to correctly read the file
> even from within the same Jar:
> 
> Configuration conf = new Configuration();
> FileSystem fs = FileSystem.get(conf);        
> SequenceFile.Reader reader = new SequenceFile.Reader(fs,
>     new Path("/local_filesystem/output.seq"), conf);
> LongWritable key = new LongWritable();
> LongWritable value = new LongWritable();
> while(reader.next(key, value)) {
>     System.out.println(key + ":" + value);
> }
> 
> I've also confirmed that an RDD populated by the ctx.parallelize() method:
> 
> int n=100;
> List<LongWritable> tl = new ArrayList<LongWritable>(n);
> for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> DataCube = preCube.map(
>                new PairFunction<LongWritable,LongWritable,LongWritable>() {
>                    @Override
>                    public Tuple2<LongWritable,LongWritable> 
>                    call(LongWritable in) throws Exception {
>                        // Typed constructor avoids the raw-type warning
>                        return new Tuple2<LongWritable,LongWritable>(in, in);
>                    }
>                });
> 
> can be written to a sequence file using the RDD method:
> 
> DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> LongWritable.class, SequenceFileOutputFormat.class);
> 
> and correctly read using the Hadoop (only) API copied above.
> 
> It seems like there is only a problem when I'm attempting to read the sequence
> file directly into the RDD. All other operations are performing as expected. 
> 
> I'd greatly appreciate any advice someone could provide.
> 
> Regards,
> 
> Michael
> 
> output.seq
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>   
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
