Was your intention that exception from rdd.saveToCassandra() be caught ? In that case you can place try / catch around that call.
Cheers On Fri, Sep 11, 2015 at 7:30 AM, Samya <samya.ma...@amadeus.com> wrote: > Hi Team, > > I am facing this issue where in I can't figure out why the exception is > handled the first time an exception is thrown in the stream processing > action, but is ignored the second time. > > PFB my code base. > > > > object Boot extends App { > > //Load the configuration > val config = LoadConfig.getConfig > > val checkpointDirectory = > config.getString("nexti.spark.checkpointDirectory") > var ssc: StreamingContext = null > try{ > val sparkConf = new SparkConf() > .setAppName(Boot.config.getString("nexti.spark.appNme")) > .setMaster(Boot.config.getString("nexti.spark.master")) > .set("spark.cassandra.connection.host", > Boot.config.getString("nexti.spark.cassandra.connection.host")) > > > .set("spark.cassandra.query.retry.count",Boot.config.getString("nexti.spark.cassandra.query.retry.count")) > > val ssc = new StreamingContext(sparkConf, Seconds(30)) > > val msgStream = SparkKafkaDirectAPI.getStream(ssc) > > val wordCountPair = msgStream.map(_._2).flatMap(_.split(" ")).map(x => > (x, 1L)).reduceByKey(_ + _) > > wordCountPair.foreachRDD(rdd => > rdd.saveToCassandra("nexti","direct_api_test",AllColumns)) > > ssc.start() > ssc.awaitTermination() > } > catch { > case ex: Exception =>{ > println(">>>>>>>> Exception UNKNOWN Only.............") > } > } > } > > > I am sure that missing out on something, please provide your inputs. > > Regards, > Sam > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Exception-Handling-Spark-Streaming-tp24658.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >