I think you have to create a SparkDistributedContext, which sets up Mahout-specific 
Kryo serialization and adds the Mahout jars. It's simpler to let Mahout create the 
Spark context for you:

   implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

As I recall, the SparkContext will then be available implicitly through a conversion 
in the Mahout Spark bindings package, but if I'm wrong and you still get errors, create the sc explicitly with

  val sc = sdc2sc(mc)

This is from memory, so your debugger may provide better help. I suspect the error 
below comes from not having the Kryo serialization classes configured properly.
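
Putting that together (again from memory, against the 0.12.x sparkbindings API, so 
treat the exact names as assumptions to verify), the top of your example would look 
roughly like this:

   import org.apache.spark.SparkContext
   import org.apache.mahout.sparkbindings._

   // Let Mahout build the context: this registers its Kryo serializers
   // (so DenseVector etc. can be shipped between executors) and adds the
   // Mahout jars to the Spark classpath.
   implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

   // If you still need a plain SparkContext, e.g. for newAPIHadoopRDD,
   // unwrap it from the Mahout distributed context.
   val sc: SparkContext = sdc2sc(mc)

With mc implicit in scope, IndexedDatasetSpark(new_doc) and dfsWrite should pick up 
the Mahout context instead of the hand-built one.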

On May 7, 2016, at 3:05 AM, Rohit Jain <rohitkjai...@gmail.com> wrote:

Hello everyone,

I want to run the Spark RowSimilarity recommender on data obtained from
MongoDB. For this purpose, I've written the code below, which takes input from
Mongo and converts it to an RDD of objects. This is passed to
IndexedDatasetSpark, which is then passed to
SimilarityAnalysis.rowSimilarityIDS.

import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Read (id, document) pairs from MongoDB via the mongo-hadoop connector
  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Turn each document into (product_id, space-separated attribute tokens)
  val new_doc: RDD[(String, String)] = documents.map(doc1 => (
    doc1._2.get("product_id").toString,
    doc1._2.get("product_attribute_value").toString
      .replace("[ \"", "").replace("\"]", "")
      .split("\" , \"")
      .map(value => value.toLowerCase.replace(" ", "-"))
      .mkString(" ")
  ))

  val myIDs = IndexedDatasetSpark(new_doc)(sc)

  SimilarityAnalysis.rowSimilarityIDS(myIDs)
    .dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}

After running the code I am getting this error:
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Please help me with this.


-- 
Thanks & Regards,

Rohit Jain
Web developer | Consultant
Mob +91 8097283931
