Max Xu created SPARK-5220:
-----------------------------

             Summary: keepPushingBlocks in BlockGenerator terminated when an 
exception occurs, which causes the block pushing thread to terminate and blocks 
receiver  
                 Key: SPARK-5220
                 URL: https://issues.apache.org/jira/browse/SPARK-5220
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.0
            Reporter: Max Xu


I am running a Spark streaming application with ReliableKafkaReceiver. It uses 
BlockGenerator to push blocks to BlockManager. However, writing WALs to HDFS 
may time out that causes keepPushingBlocks in BlockGenerator to terminate.

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at 
org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
        at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
        at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
        at 
org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
        at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
        at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
        at 
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
        at 
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
        at 
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

Then the block pushing thread is done and no subsequent blocks can be pushed 
into blockManager. In turn this blocks receiver from receiving new data.

So when running my app and the TimeoutException happens, the 
ReliableKafkaReceiver stays in ACTIVE status but doesn't do anything at all. 
The application rogues.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to