unsubscribe

On Wed, Aug 1, 2018 at 6:54 AM Jaume Galí <jg...@konodrac.com> wrote:

> Hi everybody, I'm trying to build a basic recommender with Spark and Mahout
> in Scala. I use the following Mahout fork to compile Mahout with Scala 2.11
> and Spark 2.1.2: mahout_fork
> <https://github.com/actionml/mahout/tree/sparse-speedup-13.0>
> To execute my code I use spark-submit, and it runs fine when I pass --master
> local, but when I try to run on a cluster with --master
> spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.
>
> Command (Run Fine):
>
> /opt/spark/bin/spark-submit \
> --class 'com.reco.GenerateIndicator' \
> --name recomender \
> --master local \
> target/scala-2.11/recomender-0.0.1.jar
> Command (ERROR):
>
> /opt/spark/bin/spark-submit \
> --class 'com.reco.GenerateIndicator' \
> --name recomender \
> --master spark://vagrant-ubuntu-trusty-64:7077 \
> target/scala-2.11/recomender-0.0.1.jar
> Dependencies in build.sbt:
>
> name := "recomender"
> version := "0.0.1"
> scalaVersion := "2.11.11"
> val mahoutVersion = "0.13.0"
> val sparkVersion = "2.1.2"
>
> libraryDependencies ++= {
>   Seq(
>     "org.apache.spark"        %% "spark-core" % sparkVersion % "provided" ,
>     "org.apache.spark"        %% "spark-sql" % sparkVersion % "provided" ,
>     "org.apache.spark"        %% "spark-mllib" % sparkVersion % "provided",
>     /* Mahout */
>     "org.apache.mahout" %% "mahout-spark" % mahoutVersion
>       exclude("org.apache.spark", "spark-core_2.11")
>       exclude("org.apache.spark", "spark-sql_2.11"),
>     "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
>     "org.apache.mahout" % "mahout-math" % mahoutVersion,
>     "org.apache.mahout" % "mahout-hdfs" % mahoutVersion
>       exclude("com.thoughtworks.xstream", "xstream")
>       exclude("org.apache.hadoop", "hadoop-client")
>   )
> }
>
> resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
> resolvers += Resolver.mavenLocal
>
> …
> Main class:
>
> package com.reco
>
> import org.apache.mahout.sparkbindings.SparkDistributedContext
> import org.apache.mahout.sparkbindings._
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
>
> object GenerateIndicator {
>
>   def main(args: Array[String]) {
>     try {
>
>       // Create spark-conf
>       val sparkConf = new SparkConf().setAppName("recomender")
>
>       implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
>         masterUrl = sparkConf.get("spark.master"),
>         appName = "recomender",
>         sparkConf = sparkConf,
>         // addMahoutJars = true,
>         addMahoutJars = false
>       )
>
>       implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
>
>       val sparkSession = SparkSession
>         .builder()
>         .appName("recomender")
>         .config(sparkConf)
>         .getOrCreate()
>
>       val lines = returnData()
>
>       val linesRdd = sdc.sc.parallelize(lines)
>
>       println("...Collecting...")
>
>       linesRdd.collect().foreach( item => {  // ERROR HERE! on collect()
>         println(item)
>       })
>
>       // Destroy Spark Session
>       sparkSession.stop()
>       sparkSession.close()
>
>     } catch {
>       case e: Exception =>
>         println(e)
>         throw new Exception(e)
>
>     }
>
>   }
>
>   def returnData() : Array[String] = {
>     val lines = Array(
>       "17,Action",
>       "17,Comedy",
>       "17,Crime",
>       "17,Horror",
>       "17,Thriller",
>       "12,Crime",
>       "12,Thriller",
>       "16,Comedy",
>       "16,Romance",
>       "20,Drama",
>       "20,Romance",
>       "7,Drama",
>       "7,Sci-Fi",
>       // ... more lines in array ...
>       "1680,Drama",
>       "1680,Romance",
>       "1681,Comedy"
>     )
>     lines
>   }
>
> }
> Error:
>
> 18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at
> GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to
> stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
> Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0):
> java.lang.IllegalStateException: unread block data
>     at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> 18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7)
> on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block
> data) [duplicate 7]
> 18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at
> GenerateIndicator.scala:38, took 5.265593 s
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread
> block data
>     at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Thanks a lot for your time.
> Cheers.

Reply via email to