My Spark driver's main thread creates a scheduled background thread. When the Spark application throws an exception, the main thread quits, but the driver JVM does not exit. How can I make the driver JVM shut down in this case?
For example:
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Scheduled task on a separate thread. Note: newThread never calls
// setDaemon(true), so the thread it creates is a non-daemon thread.
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
    def run(): Unit = {
      try {
        System.out.println("runnable")
      } catch {
        case e: Exception =>
          System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
      }
    }
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 10)
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach(println)
  // Deliberately fail to simulate an application error.
  throw new RuntimeException
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    System.out.println("end!!!!!")
    throw e
}
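
Is something like the following the right fix? This is only a sketch, assuming the non-daemon "Driver-Commit-Thread" is what keeps the JVM alive: either create the thread as a daemon, or tear everything down explicitly when awaitTermination throws.

import java.util.concurrent.{Executors, ThreadFactory}

// Option 1: create the thread as a daemon, so it cannot keep the JVM
// alive once the main thread dies.
val daemonExecutor = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "Driver-Commit-Thread")
      t.setDaemon(true)
      t
    }
  })

// Option 2: shut everything down explicitly when the streaming job fails.
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    scheduledExecutorService.shutdownNow() // stop the scheduled task
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    System.exit(1) // force the driver process to exit
}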
