Matei, do you mean something like (A) rather than (B) below?

A) rdd.map(_.clone).cache
B) rdd.cache
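In Java API terms, I'm picturing something along the lines of the rough, untested sketch below. It assumes LongWritable keys and values and the file path from Michael's post, copies each Writable into a fresh object inside a map rather than calling a clone() method, and uses the 0.8.x-style map(PairFunction) overload (the class name CloneBeforeCache is just a placeholder):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class CloneBeforeCache {
        public static void main(String[] args) {
            JavaSparkContext ctx = new JavaSparkContext("local[2]", "CloneBeforeCache");

            // sequenceFile() hands back the same Writable instances for every record,
            // so anything that retains records (e.g. cache) ends up seeing duplicates.
            JavaPairRDD<LongWritable, LongWritable> raw =
                ctx.sequenceFile("/local_filesystem/output.seq",
                                 LongWritable.class, LongWritable.class);

            // Option (A): copy key and value into fresh objects before caching.
            JavaPairRDD<LongWritable, LongWritable> cube = raw.map(
                new PairFunction<Tuple2<LongWritable, LongWritable>,
                                 LongWritable, LongWritable>() {
                    @Override
                    public Tuple2<LongWritable, LongWritable> call(
                            Tuple2<LongWritable, LongWritable> record) throws Exception {
                        return new Tuple2<LongWritable, LongWritable>(
                            new LongWritable(record._1().get()),
                            new LongWritable(record._2().get()));
                    }
                }).cache();

            // Force evaluation so the cloned records are what gets cached.
            System.out.println(cube.count() + " records cached");
        }
    }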
I'd be happy to add documentation if there's a good place for it, but I'm not sure there's an obvious place for it.

On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia <[email protected]> wrote:

> Yeah, unfortunately sequenceFile() reuses the Writable object across
> records. If you plan to use each record repeatedly (e.g. cache it), you
> should clone them using a map function. It was originally designed assuming
> you only look at each record once, but it's poorly documented.
>
> Matei
>
> On Jan 7, 2014, at 11:32 PM, Michael Quinlan <[email protected]> wrote:
>
> > I've spent some time trying to import data into an RDD using the Spark
> > Java API, but am not able to properly load data stored in a Hadoop v1.1.1
> > sequence file with key and value types both LongWritable. I've attached a
> > copy of the sequence file to this posting. It contains 3000 key, value
> > pairs. I'm attempting to read it using the following code snippet:
> >
> >     System.setProperty("spark.serializer",
> >         "org.apache.spark.serializer.KryoSerializer");
> >     System.setProperty("spark.kryo.registrator",
> >         "spark.synthesis.service.Init.MyRegistrator");
> >
> >     JavaSparkContext ctx = new JavaSparkContext("local[2]",
> >         "AppName",
> >         "/Users/mquinlan/spark-0.8.0-incubating", "jar.name");
> >
> >     // Load DataCube via Spark sequenceFile
> >     JavaPairRDD<LongWritable,LongWritable> DataCube =
> >         ctx.sequenceFile("/local_filesystem/output.seq",
> >             LongWritable.class, LongWritable.class);
> >
> > The code above produces a DataCube filled with duplicate entries relating
> > in some way to the number of splits. For example, the last 1500 or so
> > entries all have the same key and value: (2999,22483). The previous 1500
> > entries appear to represent the last key/value from the first split of the
> > file. I've confirmed that changing the number of threads (local[3]) does
> > change the RDD representation, maintaining this general last-key-value
> > pattern.
> >
> > Using the Hadoop (only) API methods, I am able to correctly read the file
> > even from within the same Jar:
> >
> >     Configuration conf = new Configuration();
> >     FileSystem fs = FileSystem.get(conf);
> >     SequenceFile.Reader reader = new SequenceFile.Reader(fs,
> >         new Path("/local_filesystem/output.seq"), conf);
> >     LongWritable key = new LongWritable();
> >     LongWritable value = new LongWritable();
> >     while (reader.next(key, value)) {
> >         System.out.println(key + ":" + value);
> >     }
> >
> > I've also confirmed that an RDD populated by the ctx.parallelize() method:
> >
> >     int n = 100;
> >     List<LongWritable> tl = new ArrayList<LongWritable>(n);
> >     for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> >     JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> >     DataCube = preCube.map(
> >         new PairFunction<LongWritable,LongWritable,LongWritable>() {
> >             @Override
> >             public Tuple2<LongWritable,LongWritable>
> >                     call(LongWritable in) throws Exception {
> >                 return (new Tuple2(in, in));
> >             }
> >         });
> >
> > can be written to a sequence file using the RDD method:
> >
> >     DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> >         LongWritable.class, SequenceFileOutputFormat.class);
> >
> > and correctly read using the Hadoop (only) API copied above.
> >
> > It seems there is only a problem when I'm attempting to read the sequence
> > file directly into the RDD. All other operations are performing as
> > expected.
> >
> > I'd greatly appreciate any advice someone could provide.
> >
> > Regards,
> >
> > Michael
> >
> > output.seq
> > <http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
