I've spent some time trying to import data into an RDD using the Spark Java
API, but am not able to properly load data stored in a Hadoop v1.1.1
sequence file with key and value types both LongWritable. I've attached a
copy of the sequence file to this posting. It contains 3000 key, value
pairs. I'm attempting to read using the following code snip:

System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");

JavaSparkContext ctx = new JavaSparkContext("local[2]", 
            "AppName",
            "/Users/mquinlan/spark-0.8.0-incubating","jar.name"); 
        
//Load DataCube via Spark sequenceFile
JavaPairRDD<LongWritable,LongWritable> DataCube =
ctx.sequenceFile("/local_filesystem/output.seq", 
            LongWritable.class, LongWritable.class);

The code above produces a DataCube filled with duplicate entries relating in
some way to the number of splits. For example, the last 1500 or so entries
all have the same key and value: (2999,22483). The previous 1500 entries
appear to represent the last key value from first split of the file. I've
confirmed that changing the number of threads (local[3]) does change the RDD
representation, maintaining this general last key value pattern. 

Using the Hadoop (only) API methods, I am able to correctly read the file
even from within the same Jar:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);        
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
Path("/local_filesystem/output.seq"), conf);
LongWritable key = new LongWritable();
LongWritable value = new LongWritable();
while(reader.next(key, value)) {
     System.out.println(key + ":" + value);
}

I've also confirmed that an RDD populated by the ctx.parallelize() method:

int n=100;
List<LongWritable> tl = new ArrayList<LongWritable>(n);
for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
DataCube = preCube.map(
                new PairFunction<LongWritable,LongWritable,LongWritable> ()
{
                    @Override
                    public Tuple2<LongWritable,LongWritable> 
                    call(LongWritable in) throws Exception {
                        return (new Tuple2(in, in));
                    }
                });

can be written to a sequence file using the RDD method:

DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
LongWritable.class, SequenceFileOutputFormat.class);

and correctly read using the Hadoop (only) API copied above.

It seems like there only a problem when I'm attempting to read the sequence
file directly into the RDD. All other operations are performing as expected. 

I'd greatly appreciate any advice someone could provide.

Regards,

Michael

output.seq
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to