Yeah, unfortunately sequenceFile() reuses the same Writable objects across records. If you plan to use each record repeatedly (e.g. cache it), you should clone the records using a map function. It was originally designed assuming you only look at each record once, but it's poorly documented.
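Something along these lines (an untested sketch against the 0.8-era Java API, reusing the DataCube RDD and imports from your snippet below) would make the copies before you cache:

JavaPairRDD<LongWritable, LongWritable> cloned = DataCube.map(
    new PairFunction<Tuple2<LongWritable, LongWritable>, LongWritable, LongWritable>() {
        @Override
        public Tuple2<LongWritable, LongWritable> call(
                Tuple2<LongWritable, LongWritable> pair) {
            // new LongWritable(...) copies the primitive value out of the
            // Writable that Hadoop's RecordReader keeps reusing
            return new Tuple2<LongWritable, LongWritable>(
                new LongWritable(pair._1().get()),
                new LongWritable(pair._2().get()));
        }
    });
cloned.cache();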
Matei

On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq0...@gmail.com> wrote:

> I've spent some time trying to import data into an RDD using the Spark Java
> API, but am not able to properly load data stored in a Hadoop v1.1.1
> sequence file with key and value types both LongWritable. I've attached a
> copy of the sequence file to this posting. It contains 3000 key, value
> pairs. I'm attempting to read it using the following code snippet:
>
> System.setProperty("spark.serializer",
>     "org.apache.spark.serializer.KryoSerializer");
> System.setProperty("spark.kryo.registrator",
>     "spark.synthesis.service.Init.MyRegistrator");
>
> JavaSparkContext ctx = new JavaSparkContext("local[2]",
>     "AppName",
>     "/Users/mquinlan/spark-0.8.0-incubating", "jar.name");
>
> // Load DataCube via Spark sequenceFile
> JavaPairRDD<LongWritable, LongWritable> DataCube =
>     ctx.sequenceFile("/local_filesystem/output.seq",
>         LongWritable.class, LongWritable.class);
>
> The code above produces a DataCube filled with duplicate entries, related in
> some way to the number of splits. For example, the last 1500 or so entries
> all have the same key and value: (2999,22483). The previous 1500 entries
> appear to repeat the last key/value from the first split of the file. I've
> confirmed that changing the number of threads (local[3]) does change the RDD
> representation, while maintaining this general last-key-value pattern.
>
> Using the Hadoop (only) API methods, I am able to correctly read the file,
> even from within the same jar:
>
> Configuration conf = new Configuration();
> FileSystem fs = FileSystem.get(conf);
> SequenceFile.Reader reader = new SequenceFile.Reader(fs,
>     new Path("/local_filesystem/output.seq"), conf);
> LongWritable key = new LongWritable();
> LongWritable value = new LongWritable();
> while (reader.next(key, value)) {
>     System.out.println(key + ":" + value);
> }
>
> I've also confirmed that an RDD populated by the ctx.parallelize() method:
>
> int n = 100;
> List<LongWritable> tl = new ArrayList<LongWritable>(n);
> for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> DataCube = preCube.map(
>     new PairFunction<LongWritable, LongWritable, LongWritable>() {
>         @Override
>         public Tuple2<LongWritable, LongWritable> call(LongWritable in)
>                 throws Exception {
>             return new Tuple2(in, in);
>         }
>     });
>
> can be written to a sequence file using the RDD method:
>
> DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
>     LongWritable.class, SequenceFileOutputFormat.class);
>
> and correctly read back using the Hadoop (only) API code copied above.
>
> It seems there is only a problem when I'm attempting to read the sequence
> file directly into the RDD. All other operations perform as expected.
>
> I'd greatly appreciate any advice someone could provide.
>
> Regards,
>
> Michael
>
> output.seq
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>