Hi, you register new output actions (in this case a nested foreachRDD) after the streaming context has already been started. StreamingContext.start() may only be called after *all* transformations and output actions have been registered; inside a running foreachRDD you can work with the RDD of the current batch, but you must not define new DStream operations on `messages`.
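For illustration, here is a minimal sketch of how the quoted pipeline could be restructured so that the whole DStream graph exists before start() is called. It reuses the names from your code (messages, AmazonRating, recommender); the StreamingContext variable `ssc` and the surrounding setup are assumptions, and predictWithALS is invoked on the driver after collecting the batch (rather than inside foreachPartition), because it uses the SparkContext itself:

import play.api.libs.json.{JsError, JsSuccess, Json}

// 1) Define every transformation before ssc.start().
val ratings = messages
  .map { case (_, jsonRating) =>
    AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
      case JsSuccess(rating, _) => rating
      case JsError(_)           => AmazonRating.empty
    }
  }
  .filter(_ != AmazonRating.empty)

// 2) Register the output action, still before ssc.start(). Inside it we
//    only work with the RDD of the current batch and never touch the
//    `messages` DStream again, so no new DStream is created at runtime.
ratings.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // predictWithALS calls sc.parallelize and ALS.train, so it has to run
    // on the driver; the batch is assumed small enough to collect.
    recommender.predictWithALS(rdd.collect().toSeq)
  }
}

// 3) Start the context only after the full graph has been registered.
ssc.start()
ssc.awaitTermination()

With a structure like this, the map/filter/foreachRDD chain is already part of the graph when start() is called, so the IllegalStateException should no longer be thrown.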
2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:

> Hi, I am receiving this exception when the direct Spark Streaming process
> tries to pull data from a Kafka topic:
>
> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 1464168630000 ms saved to file 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000', took 5928 bytes and 8 ms
> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 bytes result sent to driver
> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4 ms on localhost (1/1)
> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at KafkaRDD.scala:98) finished in 0,004 s
> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at KafkaRDD.scala:98, took 0,008740 s
> <------>
> someMessages is [Lscala.Tuple2;@2641d687
> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
> <------>
> <---POSSIBLE SOLUTION--->
> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 1464168630000 ms.0 from job set of time 1464168630000 ms
> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 1464168630000 ms (execution: 0,012 s)
> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 1464168630000 ms.0
> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>     at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>     at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>     at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>     at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>     at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>
> This is the code that raises the exception in the Spark Streaming process:
>
> try {
>   messages.foreachRDD( rdd => {
>     val count = rdd.count()
>     if (count > 0) {
>       //someMessages should be AmazonRating...
>       val someMessages = rdd.take(count.toInt)
>       println("<------>")
>       println("someMessages is " + someMessages)
>       someMessages.foreach(println)
>       println("<------>")
>       println("<---POSSIBLE SOLUTION--->")
>       messages
>         .map { case (_, jsonRating) =>
>           val jsValue = Json.parse(jsonRating)
>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>             case JsSuccess(rating, _) => rating
>             case JsError(_) => AmazonRating.empty
>           }
>         }
>         .filter(_ != AmazonRating.empty)
>         //I think that this line provokes the runtime exception...
>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>
>       println("<---POSSIBLE SOLUTION--->")
>     }
>   })
> } catch {
>   case e: IllegalArgumentException => { println("illegal arg. exception") };
>   case e: IllegalStateException => { println("illegal state exception") };
>   case e: ClassCastException => { println("ClassCastException") };
>   case e: Exception => { println(" Generic Exception") };
> } finally {
>   println("Finished taking data from kafka topic...")
> }
>
> Recommender object:
>
> def predictWithALS(ratings: Seq[AmazonRating]) = {
>   // train model
>   val myRatings = ratings.map(toSparkRating)
>   val myRatingRDD = sc.parallelize(myRatings)
>
>   val startAls = DateTime.now
>   val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>
>   val myProducts = myRatings.map(_.product).toSet
>   val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>
>   // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>   val myUserId = userDict.getIndex(MyUsername)
>   val recommendations = model.predict(candidates.map((myUserId, _))).collect
>   val endAls = DateTime.now
>   val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>   val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>
>   println(s"ALS Time: $alsTime seconds")
>   result
> }
> }
>
> And this is the Kafka producer that pushes the JSON data into the topic:
>
> object AmazonProducerExample {
>   def main(args: Array[String]): Unit = {
>
>     val productId = args(0).toString
>     val userId = args(1).toString
>     val rating = args(2).toDouble
>     val topicName = "amazonRatingsTopic"
>
>     val producer = Producer[String](topicName)
>
>     //0981531679 is Scala Puzzlers...
>     //AmazonProductAndRating
>     AmazonPageParser.parse(productId, userId, rating).onSuccess { case amazonRating =>
>       //Is this the correct way? the best performance? possibly not, what about using avro or parquet?
>       producer.send(Json.toJson(amazonRating).toString)
>       //producer.send(amazonRating)
>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>       System.exit(0)
>     }
>
>   }
> }
>
> I have written a Stack Overflow post
> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
> with more details. Please help, I am stuck with this issue and I don't know how to continue.
>
> Regards
>
> Alonso Isidoro Roman
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>
> ------------------------------
> View this message in context: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming
> <http://apache-spark-user-list.1001560.n3.nabble.com/about-an-exception-when-receiving-data-from-kafka-topic-using-Direct-mode-of-Spark-Streaming-tp27022.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de