Matei and Andrew,
Thank you both for your prompt responses. Matei is correct: I am
attempting to cache a large RDD for repeated queries.
I was able to implement your suggestion in a Scala version of the code,
which I've copied below. Two minor details worth pointing out:
LongWritable doesn't expose a public clone() (it only inherits Object's
protected one), and both the key and the value need to be "cloned" in
order for the data to be cached correctly.
My attempt at a Java version wasn't as successful. If you don't mind, could
you please suggest a better way, if one currently exists? This is mostly
educational, since I already have a working version in Scala. I'm new to
both languages.
Regards,
Mike
Java:

public class App {
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: SynthesisService <master> <input file> <jar file>");
            System.exit(1);
        }
        // Use Kryo serialization with a custom registrator
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator",
                "spark.synthesis.service.Init.MyRegistrator");
        JavaSparkContext ctx = new JavaSparkContext(args[0], "SynthesisService",
                "~/spark-0.8.0-incubating", args[2]);
        // Load DataCube via Spark sequenceFile
        JavaPairRDD<LongWritable, LongWritable> temp_DataCube =
                ctx.sequenceFile(args[1], LongWritable.class, LongWritable.class);
        JavaRDD<Tuple2<LongWritable, LongWritable>> DataCube;
        DataCube = temp_DataCube.map(
                new Function2<LongWritable, LongWritable,
                              Tuple2<LongWritable, LongWritable>>() {
                    @Override
                    public Tuple2<LongWritable, LongWritable>
                            call(LongWritable key, LongWritable value) {
                        return (new Tuple2(new LongWritable(key.get()), value));
                    }
                });
-----
COMPILATION ERROR:
-------------------------------------------------------------
spark/synthesis/service/Init/App.java:[51,32] error: no suitable method found for map(<anonymous Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>>)
1 error
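In case it helps the diagnosis: my best guess (untested, and assuming I'm
reading the 0.8 Java API correctly) is that map on a JavaPairRDD takes a
one-argument Function over the whole Tuple2 rather than a Function2 over the
key and value separately, so something like the following might be closer
(with org.apache.spark.api.java.function.Function imported instead of
Function2):

JavaRDD<Tuple2<LongWritable, LongWritable>> DataCube = temp_DataCube.map(
        new Function<Tuple2<LongWritable, LongWritable>,
                     Tuple2<LongWritable, LongWritable>>() {
            @Override
            public Tuple2<LongWritable, LongWritable> call(
                    Tuple2<LongWritable, LongWritable> pair) {
                // Clone both key and value so the cached copy doesn't alias
                // the reader's reused Writable instances
                return new Tuple2<LongWritable, LongWritable>(
                        new LongWritable(pair._1().get()),
                        new LongWritable(pair._2().get()));
            }
        });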
Scala:

package testspark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.io.LongWritable
import com.esotericsoftware.kryo.Kryo

// Register the classes Kryo will serialize
class MyRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[Tuple2[LongWritable, LongWritable]])
  }
}

object ScalaSynthesisServer {
  // Copy the key and value out of the reader's reused Writable instances
  def pseudoClone(x: LongWritable, y: LongWritable): (LongWritable, LongWritable) = {
    (new LongWritable(x.get()), new LongWritable(y.get()))
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: ScalaSynthesisServer <master> <input file> <jar file>")
      System.exit(1)
    }
    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "testspark.MyRegistrator")
    val sc = new SparkContext(args(0), "ScalaSynthesisServer",
      "~/spark-0.8.0-incubating", List(args(2)))
    // Load the DataCube, cloning each record so it can be cached safely
    val DataCube = sc.sequenceFile(args(1), classOf[LongWritable],
      classOf[LongWritable]).map(a => pseudoClone(a._1, a._2))
    DataCube.cache()
    val list = DataCube.collect()
    for (x <- list) {
      println("Key= " + x._1 + " Value= " + x._2)
    }
  }
}