Re: about a JavaWordCount example with spark-core_2.10-1.0.0.jar
One thing I noticed around the place where you get the first error: you are calling words.map instead of words.mapToPair. map produces a JavaRDD, whereas mapToPair gives you a JavaPairRDD. I don't use the Java APIs myself, but it looks to me like you need to check the types more carefully.
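Concretely, the two failing calls would look something like this with mapToPair (untested on my end since I don't use the Java API, and assuming the same imports your class already has):

    // mapToPair takes a PairFunction and yields a JavaPairRDD; plain map takes a
    // Function and yields a JavaRDD, which is why the compiler rejects your version.
    JavaPairRDD<String, Integer> ones = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

    // Same idea for the save step: JavaPairRDD also has mapToPair.
    JavaPairRDD<Object, BSONObject> save = counts.mapToPair(
            new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
                public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
                    BSONObject bson = new BasicBSONObject();
                    bson.put("word", tuple._1);
                    bson.put("count", tuple._2);
                    return new Tuple2<Object, BSONObject>(null, bson);
                }
            });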
On Mon, Jun 23, 2014 at 2:15 PM, Alonso Isidoro Roman wrote:
about a JavaWordCount example with spark-core_2.10-1.0.0.jar
Hi all,

I am new to Spark, so this is probably a basic question. I want to explore the possibilities of this framework, specifically using it in conjunction with third-party libraries such as MongoDB.

I have been following the instructions from http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ in order to connect Spark with MongoDB. That example is built against spark-core_2.10-0.9.0.jar, so the first thing I did was update pom.xml to the latest versions.

This is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>com.aironman.spark</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>

    <repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>

As you can see, a super simple pom.xml.

And this is the JavaWordCount.java:

import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import scala.Tuple2;

import com.mongodb.hadoop.MongoOutputFormat;

/***
 * This class is supposed to connect to a MongoDB cluster and run a word count
 * task over every word stored in the database.
 * The problem is that this API is broken, I think. I am using the latest
 * version of the framework, 1.0.0. This is not spark-streaming; the right
 * thing would be an example of spark-streaming connecting to a MongoDB
 * database, or of using spark-streaming together with Spring Integration,
 * that is, connecting Spark to a web service that would periodically feed
 * Spark...
 * @author aironman
 */
public class JavaWordCount {

    public static void main(String[] args) {

        JavaSparkContext sc = new JavaSparkContext("local", "Java Word Count");

        Configuration config = new Configuration();
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/beowulf.input");
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/beowulf.output");

        JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(config,
                com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class);

        // Input contains tuples of (ObjectId, BSONObject)
        JavaRDD<String> words = mongoRDD.flatMap(
                new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
            @Override
            public Iterable<String> call(Tuple2<Object, BSONObject> arg) {
                Object o = arg._2.get("text");
                if (o instanceof String) {
                    String str = (String) o;
                    str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
                    return Arrays.asList(str.split(" "));
                } else {
                    return Collections.emptyList();
                }
            }
        });

        // Here is an error: The method map(Function<String,R>) in the type JavaRDD<String>
        // is not applicable for the arguments (new PairFunction<String,String,Integer>(){})
        JavaPairRDD<String, Integer> ones = words.map(
                new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Another error: The method map(Function<Tuple2<String,Integer>,R>) in the type
        // JavaPairRDD<String,Integer> is not applicable for the arguments
        // (new PairFunction<Tuple2<String,Integer>,Object,BSONObject>(){})

        // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
        JavaPairRDD<Object, BSONObject> save = counts.map(
                new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
                BSONObject bson = new BasicBSONObject();
                bson.put("word", tuple._1);
                bson.put("count", tuple._2);
                return new Tuple2<Object, BSONObject>(null, bson);
            }
        });

        // Only MongoOutputFormat and config are relevant
        save.saveAsNewAPIHadoopFile("file:/bogus", Object.class, Object.class,
                MongoOutputFormat.class, config);
    }
}

It looks like dependency jar hell, doesn't it? Can anyone guide or help me?

Another thing: I don't like closures; is it possible to use this framework without them (see the P.S. below for what I mean)?

Another question: are these objects (JavaSparkContext sc, JavaPairRDD mongoRDD) thread-safe? Can I use them as singletons?

Thank you very much, and apologies if the questions are not a trending topic :)

Alonso Isidoro Roman.

My favourite quotes (of today):
"If debugging is the process of removing software bugs, then programming must be the process of putting them in."
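P.S. To make the closures question concrete: what I would prefer to write is a named class instead of the anonymous ones, something like this (WordOnePair is just a name I made up, and I have not tried it):

    // PairFunction is an interface in Spark 1.0, so a named top-level class
    // implementing it should be usable anywhere an anonymous inner class is.
    class WordOnePair implements PairFunction<String, String, Integer> {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    }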