Dear Spark'ers,

I'm trying to run a simple job using Spark Streaming (version 1.1.1) on YARN (yarn-cluster mode), but unfortunately I'm facing many issues. In short, my job does the following:
- Consumes a specific Kafka topic
- Writes its content to S3 or HDFS
Records in Kafka are in the form:

    {"key": "someString"}

This is important because I use the value of "key" to define the output file name in S3.

Here are the Spark and Kafka parameters I'm using:

    val sparkConf = new SparkConf()
      .setAppName("MyDumperApp")
      .set("spark.task.maxFailures", "100")
      .set("spark.hadoop.validateOutputSpecs", "false")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")

    val kafkaParams = Map(
      "zookeeper.connect" -> zkQuorum,
      "zookeeper.session.timeout.ms" -> "10000",
      "rebalance.backoff.ms" -> "8000",
      "rebalance.max.retries" -> "10",
      "group.id" -> group,
      "auto.offset.reset" -> "largest"
    )

My application is the following:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
      .foreachRDD((rdd, time) =>
        rdd.map {
          case (_, line) =>
            val json = parse(line)
            val key = extract(json, "key").getOrElse("key_not_found")
            (key, dateFormatter.format(time.milliseconds)) -> line
        }
        .partitionBy(new HashPartitioner(10))
        .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
          "s3://BUCKET", classOf[BZip2Codec])
      )

And the last piece:

    class KeyBasedOutput[T >: Null, V <: AnyRef]
        extends MultipleTextOutputFormat[T, V] {

      override protected def generateFileNameForKeyValue(key: T, value: V,
          leaf: String) = key match {
        case (myKey, batchId) =>
          "somedir" + "/" + myKey + "/" +
            "prefix-" + myKey + "_" + batchId + "_" + leaf
      }

      override protected def generateActualKey(key: T, value: V) = null
    }

I use a batch interval of 5 minutes with checkpointing enabled. The job fails nondeterministically (I think it never ran longer than ~5 hours); I have no clue why, it simply fails. Please find below the exceptions thrown by my application. I really appreciate any kind of hint. Thank you very much in advance.
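For completeness, here is roughly what the helpers referenced above (`extract` and `dateFormatter`) do. This is a simplified, self-contained stand-in: the real `extract` operates on the parsed JSON value (hence `parse(line)` in the job), while this sketch matches the raw record directly, which is enough for flat records like `{"key": "someString"}`. The date pattern is illustrative, not the exact one I use.

```scala
import java.text.SimpleDateFormat

object Helpers {
  // Formats the batch timestamp used in output file names;
  // the exact pattern here is a placeholder.
  val dateFormatter = new SimpleDateFormat("yyyy-MM-dd_HH-mm")

  // Extracts a top-level string field from a flat JSON record.
  // Simplified stand-in: the real job uses a JSON library instead
  // of a regex.
  def extract(line: String, field: String): Option[String] = {
    val pattern = ("\"" + field + "\"\\s*:\\s*\"([^\"]*)\"").r
    pattern.findFirstMatchIn(line).map(_.group(1))
  }
}
```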
Regards,
--
Flávio

==== Executor 1

    2014-12-10 19:05:15,150 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
    2014-12-10 19:05:15,201 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
    2014-12-10 19:05:15,202 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
    2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
    2014-12-10 19:05:16,506 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
    2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
    2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
    2014-12-10 19:05:16,649 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
    2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
    2014-12-10 19:05:16,650 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
    2014-12-10 19:05:16,651 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
    2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
    2014-12-10 19:05:16,679 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
    2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
    2014-12-10 19:05:16,680 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
    2014-12-10 19:05:16,681 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
    2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
    2014-12-10 19:05:16,717 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
    2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
    2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM

==== Executor 2

    2014-12-10 19:05:15,010 INFO [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
    2014-12-10 19:05:15,157 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
    2014-12-10 19:05:15,157 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
    2014-12-10 19:05:15,157 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
    2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
    2014-12-10 19:05:15,158 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
    java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)

==== Driver

    2014-12-10 19:05:13,805 INFO [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
    2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
    java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
        at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    2014-12-10 19:05:13,829 INFO [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED

--
Flávio R. Santos
Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200