PySpark Streaming "PicklingError: Could not serialize object" when using the transform operator with checkpointing enabled
In PySpark Streaming, if checkpointing is enabled and a stream.transform operator is used to join with another RDD, a "PicklingError: Could not serialize object" is thrown. I have asked the same question on Stack Overflow: https://stackoverflow.com/questions/56267591/pyspark-streaming-picklingerror-could-not-serialize-object-when-checkpoint-an

After some investigation, I found that the problem is that checkpointing serializes the lambda, which in turn serializes the RDD referenced inside it. So I changed the code to something like the snippet below, the idea being to hide the RDD behind a static, transient-style variable so that it is not serialized:

    class DocInfoHolder:
        doc_info = None

    line.transform(lambda rdd: rdd.join(DocInfoHolder.doc_info)).pprint(10)

But the problem persists. I then found that PySpark uses a special pickler, cloudpickle.py, which appears to serialize every referenced class, function, and lambda, and there is no documentation on how to skip serialization. Could anyone help with how to work around this issue?

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
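The capture problem above can be reproduced without Spark at all. Below is a minimal sketch (stdlib only; `make_fn` and `doc_info` are illustrative names, and the dict merely stands in for the RDD) showing that a lambda closing over data cannot be pickled by reference, which is why Spark's cloudpickle serializes it, and everything it references, by value:

```python
import pickle

# Stand-in for the side data; in the real job this is an RDD, which is
# itself unserializable, hence the PicklingError from cloudpickle.
doc_info = {"d1": "info1", "d2": "info2"}

def make_fn(data):
    # The returned lambda closes over `data`, so any serializer that
    # ships the lambda must ship `data` along with it.
    return lambda item: (item, data.get(item))

fn = make_fn(doc_info)

try:
    # stdlib pickle refuses lambdas/closures outright; cloudpickle would
    # instead serialize the whole closure by value, capturing `data`.
    pickle.dumps(fn)
    serializable = True
except Exception:
    serializable = False
```

cloudpickle has no documented way to mark a captured reference as transient, so one common way around this in Spark is to avoid capturing the RDD at all: collect the small side dataset on the driver, ship it with sc.broadcast, and do the lookup inside a map rather than an RDD-to-RDD join inside transform.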
Re: Spark SQL met "Block broadcast_xxx not found"
OK, I am now sure this is a Spark bug. I found the offending code; it was removed in 2.2.3, so I simply upgraded Spark to fix the problem.
Spark SQL met "Block broadcast_xxx not found"
We hit a broadcast issue in some of our applications, though not on every run; it usually goes away when we rerun the job. In the exception log I see the two types of exception below.

Exception 1:

    10:09:20.295 [shuffle-server-6-2] ERROR org.apache.spark.network.server.TransportRequestHandler - Error opening block StreamChunkId{streamId=365584526097, chunkIndex=0} for request from /10.33.46.33:19866
    org.apache.spark.storage.BlockNotFoundException: Block broadcast_334_piece0 not found
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:361) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:363) ~[scala-library-2.11.0.jar:?]
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) ~[scala-library-2.11.0.jar:?]
        at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:87) ~[spark-network-common_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125) [spark-network-common_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103) [spark-network-common_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) [spark-network-common_2.11-2.2.1.jar:2.2.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) [netty-all-4.0.23.Final.jar:4.0.23.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) [netty-all-4.0.23.Final.jar:4.0.23.Final]

Exception 2:

    10:14:37.906 [Executor task launch worker for task 430478] ERROR org.apache.spark.util.Utils - Exception encountered
    org.apache.spark.SparkException: Failed to get broadcast_696_piece0 of broadcast_696
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:178) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at scala.collection.immutable.List.foreach(List.scala:383) ~[scala-library-2.11.0.jar:?]
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:222) ~[spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) [spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) [spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) [spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) [spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) [spark-core_2.11-2.2.1.jar:2.2.1]
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) [spark-core_2.11-2.2.1.jar:2.2.1]

I think exception 2 is caused by exception 1, so the issue is that when executor A tries to fetch a broadcast piece from executor B, executor B cannot find it locally. This is strange, because broadcast blocks are stored in memory and on disk and should only be removed when the driver asks, and the driver asks only once a broadcast variable is no longer used. Could anyone give some clues on how to find the root cause of this issue? Thanks a lot!
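For what it's worth, broadcast pieces are removed by the driver's ContextCleaner once the Broadcast object is garbage-collected on the driver, so one hypothesis to rule out is a race between cleanup and still-running tasks. A hedged diagnostic sketch follows; the spark.cleaner.* settings below exist in Spark 2.x, but verify them against your version's configuration docs before relying on them:

    # spark-defaults.conf sketch: make ContextCleaner remove requests block
    # until executors confirm, so a cleanup/fetch race surfaces as a slow
    # cleanup rather than a missing broadcast block.
    spark.cleaner.referenceTracking            true
    spark.cleaner.referenceTracking.blocking   true

If the failures stop with blocking cleanup, the cleaner racing ahead of late tasks is a plausible root cause; keeping a driver-side reference to the Broadcast object for the lifetime of the jobs that use it would be another way to test the same hypothesis.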
Limit the block size of data received by Spark Streaming receiver
Hey,

We use a customized receiver to receive data from our MQ. We used to use def store(dataItem: T) to store data, but I found the resulting block sizes vary widely, from 0.5 KB to 5 MB, so the processing time per data partition is very uneven. A shuffle is an option, but I want to avoid it.

I noticed that def store(dataBuffer: ArrayBuffer[T]) stores the whole buffer as a single block, so I could control block size that way; however, I also noticed that this method does not apply any rate limiting, so I would have to implement rate limiting myself.

So for now I do not have a good way to control block size. I am asking whether Spark could add rate limiting to the store(dataBuffer: ArrayBuffer[T]) method, or provide a way to control the block size generated by BlockGenerator.
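Until such support exists, the chunking and rate limiting can be done by hand in the receiver. Below is a language-agnostic sketch in Python (the real API is Scala's Receiver.store(ArrayBuffer[T]); `store_in_chunks`, `max_chunk`, and `max_items_per_sec` are illustrative names, not Spark API), which splits each incoming batch into bounded chunks and throttles with a simple fixed one-second window:

```python
import time

def store_in_chunks(items, store, max_chunk=1000, max_items_per_sec=5000):
    """Split an incoming batch into chunks of at most `max_chunk` items
    and hand each chunk to `store` (stands in for the Scala
    store(dataBuffer: ArrayBuffer[T]) call), sleeping as needed so the
    overall rate stays at or under `max_items_per_sec`."""
    window_start = time.monotonic()
    sent_in_window = 0
    for i in range(0, len(items), max_chunk):
        chunk = items[i:i + max_chunk]
        # Fixed-window rate limit: if this chunk would exceed the budget
        # for the current one-second window, wait out the remainder.
        if sent_in_window + len(chunk) > max_items_per_sec:
            elapsed = time.monotonic() - window_start
            if elapsed < 1.0:
                time.sleep(1.0 - elapsed)
            window_start = time.monotonic()
            sent_in_window = 0
        store(chunk)  # each call becomes one bounded block
        sent_in_window += len(chunk)

# Usage: every stored block is capped at 1000 items.
blocks = []
store_in_chunks(list(range(2500)), blocks.append,
                max_chunk=1000, max_items_per_sec=10**9)
```

A token-bucket limiter would smooth bursts better than the fixed window shown here, but the point is only that both the block size and the rate can be enforced before store is ever called.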