Hi Manas

I saw a very similar problem while using mapWithState. Timeout on BlockManager 
remove leading to a stall.

In my case it only occurred when there was a big backlog of micro-batches, 
combined with a shortage of memory. The adding and removing of blocks between 
new and old tasks was interleaved.  Don’t really know what caused it. Once I 
fixed the problems that were causing the backlog – in my case state compaction 
not working with Kryo in 1.6.0 (with Kryo workaround rather than patch) – I’ve 
never seen it again.

So if you’ve got a backlog or other issue to fix maybe you’ll get lucky too ☺.

Cheers
Iain

From: manas kar [mailto:poorinsp...@gmail.com]
Sent: 15 March 2016 14:49
To: Ted Yu
Cc: user
Subject: [MARKETING] Re: mapwithstate Hangs with Error cleaning broadcast

I am using spark 1.6.
I am not using any broadcast variable.
This broadcast variable is probably used by the state management of mapwithState

...Manas

On Tue, Mar 15, 2016 at 10:40 AM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:
Which version of Spark are you using ?

Can you show the code snippet w.r.t. broadcast variable ?

Thanks

On Tue, Mar 15, 2016 at 6:04 AM, manasdebashiskar 
<poorinsp...@gmail.com<mailto:poorinsp...@gmail.com>> wrote:
Hi,
 I have a streaming application that takes data from a kafka topic and uses
mapwithstate.
 After couple of hours of smooth running of the application I see a problem
that seems to have stalled my application.
The batch seems to have been stuck after the following error popped up.
Has anyone seen this error or know what causes it?
14/03/2016 21:41:13,295 ERROR org.apache.spark.ContextCleaner: 95 - Error
cleaning broadcast 7456
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
        at
org.apache.spark.rpc.RpcTimeout.org<http://org.apache.spark.rpc.RpcTimeout.org>$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
        at
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
        at
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
        at
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
        at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
        at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at
org.apache.spark.ContextCleaner.org<http://org.apache.spark.ContextCleaner.org>$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at
org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[120 seconds]
        at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        ... 12 more




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapwithstate-Hangs-with-Error-cleaning-broadcast-tp26500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>



This message and the information contained herein is proprietary and 
confidential and subject to the Amdocs policy statement,
you may review at http://www.amdocs.com/email_disclaimer.asp

Reply via email to