In case of job being aborted does the application fail, if not then how do you 
make it fail?  How should streaming application recover without data loss?

How does this apply to zero data loss behavior for Kinesis receivers in Spark 
1.5?

Thanks,
Ashish 

-------- Original message --------
From: "Tathagata Das" <t...@databricks.com>
Date: Oct 14, 2015 1:28 PM
Subject: Re: Spark 1.5 java.net.ConnectException: Connection refused
To: "Spark Newbie" <sparknewbie1...@gmail.com>
Cc: "user" <user@spark.apache.org>, "Shixiong (Ryan) Zhu" 
<shixi...@databricks.com>

When a job gets aborted, it means that the internal tasks were retried a number 
of times before the system gave up. You can control the number retries (see 
Spark's configuration page). The job by default does not get resubmitted.

You could try getting the logs of the failed executor, to see what caused the 
failure. Could be a memory limit issue, and YARN killing it somehow. 



On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie <sparknewbie1...@gmail.com> 
wrote:
Is it slowing things down or blocking progress.
>> I didn't see slowing of processing, but I do see jobs aborted consecutively 
>> for a period of 18 batches (5 minute batch intervals). So I am worried about 
>> what happened to the records that these jobs were processing. 
Also, one more thing to mention is that the 
StreamingListenerBatchCompleted.numRecords information shows all received 
records as processed even if the batch/job failed. The processing time as well 
shows as the same time it takes for a successful batch.  
It seems like it is the numRecords which was the input to the batch regardless 
of whether they were successfully processed or not.

On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com> 
wrote:
I ran 2 different spark 1.5 clusters that have been running for more than a day 
now. I do see jobs getting aborted due to task retry's maxing out (default 4) 
due to ConnectionException. It seems like the executors die and get restarted 
and I was unable to find the root cause (same app code and conf used on spark 
1.4.1 I don't see ConnectionException). 

Another question related to this, what happens to the kinesis records received 
when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I am using) does 
the job gets resubmitted with the same received records? Or does the 
kinesis-asl library get those records again based on sequence numbers it 
tracks? It would good for me to understand the story around lossless processing 
of kinesis records in Spark-1.5 + kinesis-asl-1.5 when jobs are aborted. Any 
pointers or quick explanation would be very helpful.


On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com> wrote:
Is this happening too often? Is it slowing things down or blocking progress. 
Failures once in a while is part of the norm, and the system should take care 
of itself.

On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <sparknewbie1...@gmail.com> wrote:
Hi Spark users,

I'm seeing the below exception in my spark streaming application. It happens in 
the first stage where the kinesis receivers receive records and perform a 
flatMap operation on the unioned Dstream. A coalesce step also happens as a 
part of that stage for optimizing the performance. 

This is happening on my spark 1.5 instance using kinesis-asl-1.5. When I look 
at the executor logs I do not see any exceptions indicating the root cause of 
why there is no connectivity on xxx.xx.xx.xxx:36684 or when did that service go 
down. 

Any help debugging this problem will be helpful.

15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning 
fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to 
ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
        at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
        at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
        at 
org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: 
ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684

Thanks,
Bharath





Reply via email to