If a job is aborted, does the application fail? If not, how do you make it fail? How should a streaming application recover without data loss?
How does this apply to zero data loss behavior for Kinesis receivers in Spark 1.5?

Thanks,
Ashish

-------- Original message --------
From: "Tathagata Das" <t...@databricks.com>
Date: Oct 14, 2015 1:28 PM
Subject: Re: Spark 1.5 java.net.ConnectException: Connection refused
To: "Spark Newbie" <sparknewbie1...@gmail.com>
Cc: "user" <user@spark.apache.org>, "Shixiong (Ryan) Zhu" <shixi...@databricks.com>

When a job gets aborted, it means that the internal tasks were retried a number of times before the system gave up. You can control the number of retries (see Spark's configuration page). The job by default does not get resubmitted.

You could try getting the logs of the failed executor to see what caused the failure. It could be a memory limit issue, with YARN killing the executor somehow.

On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie <sparknewbie1...@gmail.com> wrote:

Is it slowing things down or blocking progress?
>> I didn't see slowing of processing, but I do see jobs aborted consecutively
>> for a period of 18 batches (5 minute batch intervals). So I am worried about
>> what happened to the records that these jobs were processing.

Also, one more thing to mention is that the StreamingListenerBatchCompleted.numRecords information shows all received records as processed even if the batch/job failed. The processing time also shows the same time it takes for a successful batch. It seems that numRecords reports the records that were input to the batch, regardless of whether they were successfully processed.

On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com> wrote:

I ran 2 different Spark 1.5 clusters that have been running for more than a day now. I do see jobs getting aborted due to task retries maxing out (default 4) because of ConnectionException. It seems like the executors die and get restarted, and I was unable to find the root cause (with the same app code and conf on Spark 1.4.1 I don't see the ConnectionException).
Another question related to this: what happens to the Kinesis records received when a job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I am using), does the job get resubmitted with the same received records? Or does the kinesis-asl library get those records again based on the sequence numbers it tracks? It would be good for me to understand the story around lossless processing of Kinesis records in Spark-1.5 + kinesis-asl-1.5 when jobs are aborted. Any pointers or a quick explanation would be very helpful.

On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com> wrote:

Is this happening too often? Is it slowing things down or blocking progress? Failures once in a while are part of the norm, and the system should take care of itself.

On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <sparknewbie1...@gmail.com> wrote:

Hi Spark users,

I'm seeing the below exception in my Spark Streaming application. It happens in the first stage, where the Kinesis receivers receive records and perform a flatMap operation on the unioned DStream. A coalesce step also happens as part of that stage to optimize performance. This is happening on my Spark 1.5 instance using kinesis-asl-1.5. When I look at the executor logs, I do not see any exceptions indicating the root cause of why there is no connectivity on xxx.xx.xx.xxx:36684, or when that service went down. Any help debugging this problem would be appreciated.
15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
    at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684

Thanks,
Bharath
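
For reference, the retry limit discussed in this thread (the "default 4") corresponds to the spark.task.maxFailures property on Spark's configuration page, and the Spark Streaming guide describes a receiver write-ahead log that is switched on with spark.streaming.receiver.writeAheadLog.enable and relies on a checkpoint directory on reliable storage. Below is a minimal sketch of passing both as --conf flags to spark-submit. The helper name to_submit_flags and the value 8 are assumptions made for illustration, the application jar and class are left out, and the thread itself does not confirm whether these settings alone give zero data loss for Kinesis receivers in 1.5.

```python
# Hypothetical helper (not part of Spark): turns a dict of Spark
# properties into "--conf key=value" arguments for spark-submit.
def to_submit_flags(conf):
    flags = []
    for key in sorted(conf):  # sorted so the output order is stable
        flags.extend(["--conf", "{}={}".format(key, conf[key])])
    return flags


streaming_conf = {
    # Task attempts allowed before the job is aborted.
    # Spark's documented default is 4; 8 is only an illustrative value.
    "spark.task.maxFailures": 8,
    # Receiver write-ahead log, as described in the Spark Streaming guide.
    # It needs a checkpoint directory set in the application itself.
    "spark.streaming.receiver.writeAheadLog.enable": "true",
}

# Prints the command prefix; append your own application jar and arguments.
print(" ".join(["spark-submit"] + to_submit_flags(streaming_conf)))
```

Raising the retry limit only postpones an abort; it does not address why the executors are dying, so the executor logs are still the place to look first.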