One thing I noticed around the place where you get the first error --
you are calling words.map instead of words.mapToPair. map produces
JavaRDD<R> whereas mapToPair gives you a JavaPairRDD. I don't use the
Java APIs myself but it looks to me like you need to check the types
more carefully.

On Mon, Jun 23, 2014 at 2:15 PM, Alonso Isidoro Roman
<alons...@gmail.com> wrote:
> Hi all,
>
> I am new to Spark, so this is probably a basic question. i want to explore
> the possibilities of this fw, concretely using it in conjunction with 3
> party libs, like mongodb, for example.
>
> I have been keeping instructions from
> http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ in order to
> connect spark with mongodb. This example is made with
> spark-core_2.1.0-0.9.0.jar so first thing i did is to update pom.xml with
> latest versions.
>
> This is my pom.xml
>
> <project xmlns="http://maven.apache.org/POM/4.0.0";
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>
> <groupId>com.aironman.spark</groupId>
>
> <artifactId>simple-project</artifactId>
>
> <modelVersion>4.0.0</modelVersion>
>
> <name>Simple Project</name>
>
> <packaging>jar</packaging>
>
> <version>1.0</version>
>
> <repositories>
>
> <repository>
>
> <id>Akka repository</id>
>
> <url>http://repo.akka.io/releases</url>
>
> </repository>
>
> </repositories>
>
> <dependencies>
>
> <dependency> <!-- Spark dependency -->
>
> <groupId>org.apache.spark</groupId>
>
> <artifactId>spark-core_2.10</artifactId>
>
> <version>1.0.0</version>
>
> </dependency>
>
>
> <dependency>
>
> <groupId>org.mongodb</groupId>
>
> <artifactId>mongo-hadoop-core</artifactId>
>
> <version>1.0.0</version>
>
> </dependency>
>
>
> </dependencies>
>
> </project>
>
> As you can see, super simple pom.xml
>
> And this is the JavaWordCount.java
>
> import java.util.Arrays;
>
> import java.util.Collections;
>
>
> import org.apache.hadoop.conf.Configuration;
>
> import org.apache.spark.api.java.JavaPairRDD;
>
> import org.apache.spark.api.java.JavaRDD;
>
> import org.apache.spark.api.java.JavaSparkContext;
>
> import org.apache.spark.api.java.function.FlatMapFunction;
>
> import org.apache.spark.api.java.function.Function2;
>
> import org.apache.spark.api.java.function.PairFunction;
>
> import org.bson.BSONObject;
>
> import org.bson.BasicBSONObject;
>
>
> import scala.Tuple2;
>
>
> import com.mongodb.hadoop.MongoOutputFormat;
>
>
> /***
>
>  * Esta clase se supone que se conecta a un cluster mongodb para ejecutar
> una tarea word count por cada palabra almacenada en la bd.
>
>  * el problema es que esta api esta rota, creo. Estoy usando la ultima
> version del fw, la 1.0.0. Esto no es spark-streaming, lo suyo seria usar un
> ejemplo
>
>  * sobre spark-streaming conectandose a un base mongodb, o usar
> spark-streaming junto con spring integration, es decir, conectar spark con
> un servicio web que
>
>  * periodicamente alimentaria spark...
>
>  * @author aironman
>
>  *
>
>  */
>
> public class JavaWordCount {
>
>
>
>     public static void main(String[] args) {
>
>
>
>         JavaSparkContext sc = new JavaSparkContext("local", "Java Word
> Count");
>
>
>
>         Configuration config = new Configuration();
>
>         config.set("mongo.input.uri",
> "mongodb:127.0.0.1:27017/beowulf.input");
>
>         config.set("mongo.output.uri",
> "mongodb:127.0.0.1:27017/beowulf.output");
>
>
>
>
>
>         JavaPairRDD<Object, BSONObject> mongoRDD =
> sc.newAPIHadoopRDD(config, com.mongodb.hadoop.MongoInputFormat.class,
> Object.class, BSONObject.class);
>
>
>
> //         Input contains tuples of (ObjectId, BSONObject)
>
>         JavaRDD<String> words = mongoRDD.flatMap(new
> FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
>
>             @Override
>
>             public Iterable<String> call(Tuple2<Object, BSONObject> arg) {
>
>                 Object o = arg._2.get("text");
>
>                 if (o instanceof String) {
>
>                     String str = (String) o;
>
>                     str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
>
>                     return Arrays.asList(str.split(" "));
>
>                 } else {
>
>                     return Collections.emptyList();
>
>                 }
>
>             }
>
>         });
>
>         //here is an error, The method map(Function<String,R>) in the type
> JavaRDD<String> is not applicable for the arguments (new
> PairFunction<String,String,Integer>(){})
>
>         JavaPairRDD<String, Integer> ones = words.map(new
> PairFunction<String, String, Integer>() {
>
>             public Tuple2<String, Integer> call(String s) {
>
>                 return new Tuple2<>(s, 1);
>
>             }
>
>         });
>
>         JavaPairRDD<String, Integer> counts = ones.reduceByKey(new
> Function2<Integer, Integer, Integer>() {
>
>             public Integer call(Integer i1, Integer i2) {
>
>                 return i1 + i2;
>
>             }
>
>         });
>
>
>
>         //another error, The method map(Function<Tuple2<String,Integer>,R>)
> in the type JavaPairRDD<String,Integer> is not applicable for the arguments
> (new    //PairFunction<Tuple2<String,Integer>,Object,BSONObject>(){})
>
>
> //         Output contains tuples of (null, BSONObject) - ObjectId will be
> generated by Mongo driver if null
>
>         JavaPairRDD<Object, BSONObject> save = counts.map(new
> PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
>
>             @Override
>
>             public Tuple2<Object, BSONObject> call(Tuple2<String, Integer>
> tuple) {
>
>                 BSONObject bson = new BasicBSONObject();
>
>                 bson.put("word", tuple._1);
>
>                 bson.put("count", tuple._2);
>
>                 return new Tuple2<>(null, bson);
>
>             }
>
>         });
>
>
>
> //         Only MongoOutputFormat and config are relevant
>
>         save.saveAsNewAPIHadoopFile("file:/bogus", Object.class,
> Object.class, MongoOutputFormat.class, config);
>
>     }
>
>
>
> }
>
>
> It looks like jar hell dependency, isn't it?  can anyone guide or help me?
>
> Another thing, i don t like closures, is it possible to use this fw without
> using it?
> Another question, are this objects, JavaSparkContext sc, JavaPairRDD<Object,
> BSONObject> mongoRDD ThreadSafe? Can i use them as singleton?
>
> Thank you very much and apologizes if the questions are not trending topic
> :)
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>

Reply via email to