I am running Spark locally to understand how countByValueAndWindow works.
    val Array(brokers, topics) = Array("192.XX.X.XX:9092", "test1")

    // Create context with 1 second batch interval
    val sparkConf = new SparkConf().setAppName("ReduceByWindowExample").setMaster("local[1,1]")
    sparkConf.set("spark.task.maxFailures", "1")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) // batch size 1
    ssc.checkpoint("D:\\SparkCheckPointDirectory")

    // Create direct Kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Parse the messages to Ints, count the values over a window and print
    val lines = messages.map(_._2.toInt)
    val keyValuelines = lines.map { x => (x, 1) }
    val windowedlines = lines.countByValueAndWindow(Seconds(1), Seconds(1)) // window length, slide interval
    // val windowedlines = lines.reduceByWindow((x, y) => { x + y }, Seconds(4), Seconds(2))
    windowedlines.print()

    ssc.start()
    ssc.awaitTermination()

Everything works fine as long as numeric data is supplied on the Kafka topic, since I am using toInt. But when a blank string "" is written to the topic, the job fails with a NumberFormatException. That part is expected; the problem is that Spark retries the failing task indefinitely, hitting the same NumberFormatException every time. Is there any way to control the number of times Spark will try to convert the string to an Int, i.e. Spark should try it only [times] and then move on to the next batch of data?

Note - I am using Spark 1.4

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-retrying-task-indefinietly-tp25022.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
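Rather than tuning retry counts, a common workaround is to make the parse itself failure-safe, so malformed records are dropped instead of crashing the task (a task that never throws is never retried). A minimal sketch, assuming the same `messages` stream as in the question; `safeToInt` is an illustrative helper name, not a Spark API:

```scala
import scala.util.Try

// Parse defensively: a blank or non-numeric string yields None instead of
// throwing NumberFormatException, so the task never fails on bad input.
def safeToInt(s: String): Option[Int] = Try(s.trim.toInt).toOption

// In the streaming job this would replace messages.map(_._2.toInt):
//   val lines = messages.map(_._2).flatMap(safeToInt)
// flatMap drops the None values produced by bad records.

// Quick local check of the parser on its own:
println(Seq("1", "", "abc", "42").flatMap(safeToInt)) // List(1, 42)
```

With this in place, `countByValueAndWindow` only ever sees successfully parsed Ints, and a stray empty message on the topic no longer stalls the batch.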