Hi, I am testing Spark Streaming with Kafka. The application looks like this:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.sf.json.JSONObject
/**
 * Consumes JSON events from the Kafka topic "user_events" using the
 * direct (receiver-less) Kafka stream API, parses each record's value
 * with json-lib, and prints the results in 20-second batches.
 *
 * Created by asus on 2015/7/16.
 */
object KafkaEventAnalytics {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserClickCountStat")
    conf.setMaster("spark://Master:7077")
      .set("spark.driver.host", "192.168.94.1")
      .set("spark.cores.max", "1")
    // Ship the application jar and its Kafka/JSON dependencies to the executors;
    // a missing jar here surfaces on the executor side, not the driver.
    conf.setJars(List(
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\sparklearning.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\spark-streaming-kafka_2.10-1.4.0.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\kafka_2.10-0.8.2.1.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\json-lib-2.4.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\ezmorph-1.0.6.jar"))

    val ssc = new StreamingContext(conf, Seconds(20))

    // Kafka configuration.
    // NOTE(review): "Master:9092" must be resolvable and reachable from every
    // EXECUTOR host (192.168.94.137 in the log), not just the driver. In
    // Spark 1.4 a Kafka fetch/connection failure inside a task is reported
    // only as the opaque org.apache.spark.util.TaskCompletionListenerException
    // (SPARK-8625) -- the real exception is in the executor's stderr log, so
    // check there first.
    val topics = Set("user_events")
    val brokers = "Master:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers) //, "serializer.class" -> "kafka.serializer.StringEncoder")

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Parse each record's value as JSON. flatMap over an Option keeps the
    // record (Some) here, but would allow skipping bad records via None.
    val events = kafkaStream.flatMap { case (key, value) =>
      val data = JSONObject.fromObject(value)
      println(key)
      // println(data)
      Some(data)
    }

    // BUG FIX: the snippet as posted registers no output operation on
    // `events`, so ssc.start() would fail with "No output operations
    // registered, so nothing to execute". The pasted stack trace references
    // a foreachRDD at KafkaEventAnalytics.scala:59, so one presumably existed
    // in the full file -- restored here.
    events.foreachRDD(rdd => rdd.foreach(e => println(e)))

    ssc.start()
    ssc.awaitTermination()
  }
}

When I run this application the exception below is thrown — can somebody give some suggestions?
15/07/19 17:27:00 INFO DAGScheduler: Job 2 failed: foreachRDD at KafkaEventAnalytics.scala:59, took 0.182201 s 15/07/19 17:27:00 ERROR JobScheduler: Error running job streaming job 1437298020000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 11, 192.168.94.137): org.apache.spark.util.TaskCompletionListenerException at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:72) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 15/07/19 17:27:20 INFO JobScheduler: Added jobs for time 1437298040000 ms 15/07/19 17:27:20 INFO SparkContext: Starting job: foreachRDD at KafkaEventAnalytics.scala:59 15/07/19 17:27:20 INFO JobScheduler: Starting job streaming job 1437298040000 ms.0 from job set of time 1437298040000 ms 15/07/19 17:27:20 INFO DAGScheduler: Registering RDD 14 (map at KafkaEventAnalytics.scala:58) 15/07/19 17:27:20 INFO DAGScheduler: Got job 3 (foreachRDD at KafkaEventAnalytics.scala:59) with 2 output partitions (allowLocal=false) 15/07/19 17:27:20 INFO DAGScheduler: Final stage: ResultStage 7(foreachRDD at KafkaEventAnalytics.scala:59) 15/07/19 17:27:20 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6) 15/07/19 17:27:20 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 6) 15/07/19 17:27:20 INFO DAGScheduler: Submitting ShuffleMapStage 6 (MapPartitionsRDD[14] at map at KafkaEventAnalytics.scala:58), which has no missing parents 15/07/19 17:27:20 INFO MemoryStore: ensureFreeSpace(3696) called with curMem=17503, maxMem=1016667832 15/07/19 17:27:20 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.6 KB, free 969.5 MB) 15/07/19 17:27:20 INFO MemoryStore: ensureFreeSpace(2141) called with curMem=21199, maxMem=1016667832 15/07/19 17:27:20 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.1 KB, free 969.5 MB) 15/07/19 17:27:20 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.17.1:51847 (size: 2.1 KB, free: 969.6 MB) 15/07/19 17:27:20 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874 15/07/19 17:27:20 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 6 (MapPartitionsRDD[14] at map at KafkaEventAnalytics.scala:58) 15/07/19 17:27:20 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks 15/07/19 17:27:20 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 12, 
192.168.94.137, ANY, 1584 bytes) 15/07/19 17:27:20 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.94.137:37251 (size: 2.1 KB, free: 267.3 MB) 15/07/19 17:27:20 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12, 192.168.94.137): org.apache.spark.util.TaskCompletionListenerException at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:72) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)