If the timestamps in the logs are to be trusted, it looks like your driver is dying with that *java.io.FileNotFoundException*, and therefore the workers lose their connection and shut down.

-kr, Gerard.
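The s3n://BUCKET/_temporary path in that exception points at the output-commit step: FileOutputCommitter stages each task's files under _temporary and renames them into place when the job commits, and on S3 the non-atomic, eventually consistent "rename" can make that final merge fail in exactly this way. One workaround from that era is a committer that skips the staging step altogether. A minimal sketch, assuming Hadoop's old mapred API (the one saveAsHadoopFile goes through); the class name and wiring are illustrative, not something from this thread:

    import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

    // Tasks write straight to the final output path. Under the old mapred API,
    // FileOutputFormat only stages files under _temporary when the configured
    // committer is a FileOutputCommitter; with any other committer, task output
    // goes directly to the destination and commit becomes a no-op.
    // Caveats: keep speculative execution off, and note that failed tasks can
    // leave partial files behind.
    class DirectOutputCommitter extends OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      // Nothing to move at task commit; the data is already in place.
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }

To pick it up, the job would set "spark.hadoop.mapred.output.committer.class" to this class's name on its SparkConf; spark.hadoop.* entries are copied into the Hadoop configuration from which saveAsHadoopFile builds its JobConf.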
On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Try adding the following to the sparkConf:
>
> .set("spark.core.connection.ack.wait.timeout","6000")
> .set("spark.akka.frameSize","60")
>
> I used to face that issue with Spark 1.1.0.
>
> Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <bar...@chaordicsystems.com> wrote:
>
>> Dear Spark'ers,
>>
>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
>> job does the following:
>> - Consumes a specific Kafka topic
>> - Writes its content to S3 or HDFS
>>
>> Records in Kafka are of the form:
>> {"key": "someString"}
>>
>> This is important because I use the value of "key" to define the output
>> file name in S3.
>> Here are the Spark and Kafka parameters I'm using:
>>
>>> val sparkConf = new SparkConf()
>>>   .setAppName("MyDumperApp")
>>>   .set("spark.task.maxFailures", "100")
>>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>>>
>>> val kafkaParams = Map(
>>>   "zookeeper.connect" -> zkQuorum,
>>>   "zookeeper.session.timeout.ms" -> "10000",
>>>   "rebalance.backoff.ms" -> "8000",
>>>   "rebalance.max.retries" -> "10",
>>>   "group.id" -> group,
>>>   "auto.offset.reset" -> "largest"
>>> )
>>
>> My application is the following:
>>
>>> KafkaUtils.createStream[String, String, StringDecoder,
>>>     StringDecoder](ssc, kafkaParams, Map(topic -> 1),
>>>     StorageLevel.MEMORY_AND_DISK_SER_2)
>>>   .foreachRDD((rdd, time) =>
>>>     rdd.map {
>>>       case (_, line) =>
>>>         val json = parse(line)
>>>         val key = extract(json, "key").getOrElse("key_not_found")
>>>         (key, dateFormatter.format(time.milliseconds)) -> line
>>>     }
>>>     .partitionBy(new HashPartitioner(10))
>>>     .saveAsHadoopFile[KeyBasedOutput[(String, String),
>>>       String]]("s3://BUCKET", classOf[BZip2Codec])
>>>   )
>>
>> And the last piece:
>>
>>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends
>>>     MultipleTextOutputFormat[T, V] {
>>>   override protected def generateFileNameForKeyValue(key: T, value: V,
>>>       leaf: String) = key match {
>>>     case (myKey, batchId) =>
>>>       "somedir" + "/" + myKey + "/" +
>>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>>>   }
>>>   override protected def generateActualKey(key: T, value: V) = null
>>> }
>>
>> I use batch sizes of 5 minutes with checkpointing activated.
>> The job fails nondeterministically (I think it has never run longer than
>> ~5 hours). I have no clue why; it simply fails.
>> Please find below the exceptions thrown by my application.
>>
>> I really appreciate any kind of hint.
>> Thank you very much in advance.
>>
>> Regards,
>> -- Flávio
>>
>> ==== Executor 1
>>
>> 2014-12-10 19:05:15,150 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,201 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
>> 2014-12-10 19:05:15,202 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,650 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
>> 2014-12-10 19:05:16,651 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
>> *java.nio.channels.CancelledKeyException*
>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,680 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> 2014-12-10 19:05:16,681 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> *java.nio.channels.CancelledKeyException*
>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
>>
>> ==== Executor 2
>>
>> 2014-12-10 19:05:15,010 INFO [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
>> 2014-12-10 19:05:15,157 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
>> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
>> 2014-12-10 19:05:15,158 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
>> *java.nio.channels.CancelledKeyException*
>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>>
>> ==== Driver
>>
>> 2014-12-10 19:05:13,805 INFO [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
>> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
>> *java.io.FileNotFoundException*: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 2014-12-10 19:05:13,829 INFO [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>>
>> *--Flávio R. Santos*
>> Chaordic | *Platform*
>> *www.chaordic.com.br*
>> +55 48 3232.3200
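For reference, the streaming setup described upthread (5-minute batches with checkpointing) would look roughly like the sketch below against the Spark 1.1 API. The checkpoint directory is hypothetical, and sparkConf stands for the one built in the original post:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, StreamingContext}

    // The SparkConf from the original post, abbreviated here.
    val sparkConf = new SparkConf().setAppName("MyDumperApp")

    // 5-minute batch interval, matching the description above.
    val ssc = new StreamingContext(sparkConf, Minutes(5))

    // Checkpointing needs a fault-tolerant directory; on yarn-cluster that is
    // typically HDFS. This path is hypothetical.
    ssc.checkpoint("hdfs:///checkpoints/MyDumperApp")

    // ... create the Kafka stream and the foreachRDD pipeline quoted above ...

    ssc.start()
    ssc.awaitTermination()

Note that checkpointing protects the streaming job's metadata and lets the driver recover, but it does not make the S3 output commit atomic; the FileNotFoundException above is raised at job commit, after the batch itself has finished running.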