Dear Spark'ers,

I'm trying to run a simple Spark Streaming (version 1.1.1) job on YARN
(yarn-cluster mode), but unfortunately I'm facing several issues. In
short, my job does the following:
- Consumes a specific Kafka topic
- Writes its content to S3 or HDFS

Records in Kafka are in the form:
{"key": "someString"}

This is important because I use the value of "key" to define the output
file name in S3.
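
For clarity, the key lookup can be sketched as follows (this is an
approximation of my `extract` helper using json4s; the real one is similar):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // Returns Some("someString") for the record above,
    // None when the field is absent or is not a string.
    def extract(json: JValue, field: String): Option[String] =
      json \ field match {
        case JString(s) => Some(s)
        case _          => None
      }

    val key = extract(parse("""{"key": "someString"}"""), "key")
      .getOrElse("key_not_found")
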
Here are the Spark and Kafka parameters I'm using:

val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.task.maxFailures", "100")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")

val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "zookeeper.session.timeout.ms" -> "10000",
  "rebalance.backoff.ms" -> "8000",
  "rebalance.max.retries" -> "10",
  "group.id" -> group,
  "auto.offset.reset" -> "largest"
)


My application is the following:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
  .foreachRDD { (rdd, time) =>
    rdd.map {
      case (_, line) =>
        val json = parse(line)
        val key = extract(json, "key").getOrElse("key_not_found")
        (key, dateFormatter.format(time.milliseconds)) -> line
    }
      .partitionBy(new HashPartitioner(10))
      .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
        "s3://BUCKET", classOf[BZip2Codec])
  }


And the last piece:

class KeyBasedOutput[T >: Null, V <: AnyRef]
    extends MultipleTextOutputFormat[T, V] {

  override protected def generateFileNameForKeyValue(key: T, value: V,
      leaf: String) = key match {
    case (myKey, batchId) =>
      "somedir" + "/" + myKey + "/" +
        "prefix-" + myKey + "_" + batchId + "_" + leaf
  }

  override protected def generateActualKey(key: T, value: V) = null
}


I use 5-minute batches with checkpointing enabled.
The job fails nondeterministically; I don't think it has ever run for
longer than ~5 hours, and I have no clue why it fails.
Please find below the exceptions thrown by my application.
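
Concretely, the streaming context is set up along these lines (a sketch;
the checkpoint directory shown here is illustrative, not my real path):

    import org.apache.spark.streaming.{Minutes, StreamingContext}

    // 5-minute batches; checkpointing lets the receiver state and the
    // DStream lineage survive driver restarts.
    val ssc = new StreamingContext(sparkConf, Minutes(5))
    ssc.checkpoint("hdfs:///checkpoints/my-dumper")  // illustrative path

    // ... DStream setup as above ...
    ssc.start()
    ssc.awaitTermination()
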

I really appreciate any kind of hint.
Thank you very much in advance.

Regards,
-- Flávio

==== Executor 1

2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,650 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
2014-12-10 19:05:16,651 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,680 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
2014-12-10 19:05:16,681 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM

==== Executor 2

2014-12-10 19:05:15,010 INFO  [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
2014-12-10 19:05:15,157 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
2014-12-10 19:05:15,158 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)

==== Driver

2014-12-10 19:05:13,805 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
        at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED


--Flávio R. Santos

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200
