Yes, we did figure out this problem, and realised that instead of SparkContext I have to use mahoutSparkContext.
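For the archives, a minimal sketch of that change (assuming the Mahout 0.11.x Spark bindings, where `mahoutSparkContext` and `sdc2sc` live in the `org.apache.mahout.sparkbindings` package object):

```scala
import org.apache.spark.SparkContext
import org.apache.mahout.sparkbindings._

// mahoutSparkContext builds a SparkDistributedContext that registers
// Mahout's Kryo serializers (so e.g. DenseVector serializes) and adds
// the Mahout jars -- this replaces the plain "new SparkContext(...)".
implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

// If the implicit conversion to SparkContext is not picked up where a
// plain SparkContext is needed (e.g. sc.newAPIHadoopRDD), convert explicitly:
val sc: SparkContext = sdc2sc(mc)
```

With `mc` in implicit scope, the rest of the driver (the `newAPIHadoopRDD` read, `IndexedDatasetSpark`, and `SimilarityAnalysis.rowSimilarityIDS`) can stay as in the code quoted below.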
On Sun, May 8, 2016 at 4:26 AM, Pat Ferrel <p...@occamsmachete.com> wrote:

> I think you have to create a SparkDistributedContext, which has Mahout-specific
> Kryo serialization and adds the Mahout jars. If you let Mahout create the
> Spark context it's simpler:
>
> implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")
>
> As I recall the sc will then be implicit through a conversion in the Mahout
> Spark package, but if I'm wrong and you still get errors, create the sc with:
>
> val sc = sdc2sc(mc)
>
> This is from memory; your debugger may provide better help. I suspect the
> error below is from not having the Kryo serialization classes configured
> properly.
>
> On May 7, 2016, at 3:05 AM, Rohit Jain <rohitkjai...@gmail.com> wrote:
>
> Hello everyone,
>
> I want to run the Spark RowSimilarity recommender on data obtained from
> MongoDB. For this purpose, I've written the code below, which takes input
> from Mongo and converts it to an RDD of objects. This is passed to
> IndexedDatasetSpark, which is then passed to
> SimilarityAnalysis.rowSimilarityIDS.
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.mahout.math.cf.SimilarityAnalysis
> import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
> import org.apache.spark.rdd.{NewHadoopRDD, RDD}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.bson.BSONObject
> import com.mongodb.hadoop.MongoInputFormat
>
> object SparkExample extends App {
>   val mongoConfig = new Configuration()
>   mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")
>
>   val sparkConf = new SparkConf()
>   val sc = new SparkContext("local", "SparkExample", sparkConf)
>
>   val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
>     mongoConfig,
>     classOf[MongoInputFormat],
>     classOf[Object],
>     classOf[BSONObject]
>   )
>   val new_doc: RDD[(String, String)] = documents.map(
>     doc1 => (
>       doc1._2.get("product_id").toString(),
>       doc1._2.get("product_attribute_value").toString()
>         .replace("[ \"", "").replace("\"]", "")
>         .split("\" , \"")
>         .map(value => value.toLowerCase.replace(" ", "-"))
>         .mkString(" ")
>     )
>   )
>   var myIDs = IndexedDatasetSpark(new_doc)(sc)
>
>   SimilarityAnalysis.rowSimilarityIDS(myIDs)
>     .dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
> }
>
> After running the code I am getting this error:
>
> java.io.NotSerializableException: org.apache.mahout.math.DenseVector
> Serialization stack:
> - object not serializable (class: org.apache.mahout.math.DenseVector,
>   value: {3:1.0,8:1.0,10:1.0})
> - field (class: scala.Some, name: x, type: class java.lang.Object)
> - object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Please help me with this.
>
> --
> Thanks & Regards,
>
> Rohit Jain
> Web developer | Consultant
> Mob +91 8097283931

--
Thanks & Regards,

Rohit Jain
Web developer | Consultant
Mob +91 8097283931