PySpark Streaming “PicklingError: Could not serialize object” when using the transform operator with checkpointing enabled

2019-05-23 Thread Xilang Yan
In PySpark Streaming, if checkpointing is enabled and a stream.transform
operator is used to join with another RDD, a “PicklingError: Could not
serialize object” is thrown. I have asked the same question on Stack Overflow:
https://stackoverflow.com/questions/56267591/pyspark-streaming-picklingerror-could-not-serialize-object-when-checkpoint-an

After some investigation, I found that the problem occurs because checkpointing
serializes the lambda, and with it the RDD referenced inside the lambda. So I
changed the code to something like the snippet below; the intent is to hold the
RDD in a static, transient-like variable so that the RDD itself is not serialized.

class DocInfoHolder:
    doc_info = None

line.transform(lambda rdd: rdd.join(DocInfoHolder.doc_info)).pprint(10)

But the problem still exists. I then found that PySpark uses a special pickler,
cloudpickle.py, which appears to serialize any referenced class, function, or
lambda, and there is no documentation on how to skip serialization. Could
anyone help me work around this issue?
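
For reference, below is a minimal, unverified sketch of one direction I am
considering: collect the doc-info data once and broadcast it as a plain
dictionary, so the transform closure only captures the broadcast handle rather
than an RDD. Names such as doc_info_rdd, lines, and the paths are placeholders,
and the streaming guide warns that broadcast variables are not recovered from a
checkpoint after a driver restart, so at best this is a partial workaround.

    # Unverified sketch: broadcast the lookup data instead of closing over an RDD.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="doc-info-join")
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/checkpoint")  # placeholder checkpoint directory

    # Collect the (small) doc-info data once and broadcast it as a dictionary.
    # Placeholder parsing: assume each line is "key,value".
    doc_info_rdd = sc.textFile("/tmp/doc_info").map(lambda l: tuple(l.split(",", 1)))
    doc_info = sc.broadcast(dict(doc_info_rdd.collect()))

    lines = ssc.socketTextStream("localhost", 9999)  # placeholder input stream

    def join_with_doc_info(rdd):
        # Only the broadcast handle is captured here, not an RDD, so the
        # checkpointed transform function stays serializable within a run.
        return rdd.map(lambda kv: (kv[0], (kv[1], doc_info.value.get(kv[0]))))

    lines.map(lambda l: tuple(l.split(",", 1))) \
         .transform(join_with_doc_info) \
         .pprint(10)

    ssc.start()
    ssc.awaitTermination()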






Re: Spark SQL hits "Block broadcast_xxx not found"

2019-05-07 Thread Xilang Yan
OK... I am now sure it is a bug in Spark. I found the offending code, but that
code was removed in 2.2.3, so I just upgraded Spark to fix the problem.






Spark SQL hits "Block broadcast_xxx not found"

2019-04-27 Thread Xilang Yan
We hit a broadcast issue in some of our applications, though not every time we
run them; it usually goes away when we rerun the application. In the exception
log I see the two types of exceptions below:

Exception 1:
10:09:20.295 [shuffle-server-6-2] ERROR org.apache.spark.network.server.TransportRequestHandler - Error opening block StreamChunkId{streamId=365584526097, chunkIndex=0} for request from /10.33.46.33:19866
org.apache.spark.storage.BlockNotFoundException: Block broadcast_334_piece0 not found
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:361) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:363) ~[scala-library-2.11.0.jar:?]
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) ~[scala-library-2.11.0.jar:?]
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:87) ~[spark-network-common_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125) [spark-network-common_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103) [spark-network-common_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) [spark-network-common_2.11-2.2.1.jar:2.2.1]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) [netty-all-4.0.23.Final.jar:4.0.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) [netty-all-4.0.23.Final.jar:4.0.23.Final]


Exception 2:
10:14:37.906 [Executor task launch worker for task 430478] ERROR org.apache.spark.util.Utils - Exception encountered
org.apache.spark.SparkException: Failed to get broadcast_696_piece0 of broadcast_696
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:178) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at scala.collection.immutable.List.foreach(List.scala:383) ~[scala-library-2.11.0.jar:?]
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:150) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:222) ~[spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) [spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) [spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) [spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) [spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) [spark-core_2.11-2.2.1.jar:2.2.1]
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) [spark-core_2.11-2.2.1.jar:2.2.1]


I think exception 2 is caused by exception 1, so the issue is that when executor
A tries to fetch a broadcast piece from executor B, executor B cannot find it
locally. This is strange, because broadcast blocks are stored both in memory and
on disk, and they should only be removed when the driver asks for it, which the
driver does only once a broadcast variable is no longer used.
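
For context, here is a minimal PySpark sketch of the broadcast lifecycle I am
assuming (my own illustration, not code from our application); apart from the
driver-side cleaner, the explicit unpersist/destroy calls at the end are the
only ways I know of for these blocks to disappear:

    from pyspark import SparkContext

    sc = SparkContext(appName="broadcast-lifecycle")

    lookup = sc.broadcast({"a": 1, "b": 2})   # driver registers broadcast_N
    rdd = sc.parallelize(["a", "b", "c"])

    # Executors fetch broadcast_N_piece0 lazily on first use of lookup.value.
    print(rdd.map(lambda k: lookup.value.get(k, -1)).collect())

    # unpersist() drops the cached copies on executors; they can be re-fetched
    # from the driver the next time a task reads lookup.value.
    lookup.unpersist(blocking=True)
    print(rdd.map(lambda k: lookup.value.get(k, -1)).collect())

    # destroy() removes all data and metadata for the broadcast; using it after
    # this point is the usual way to see "Failed to get broadcast_N_piece0".
    lookup.destroy()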

Could anyone give me some clues on how to find the root cause of this issue?
Thanks a lot!









Limit the block size of data received by a Spark Streaming receiver

2018-01-07 Thread Xilang Yan
Hey,

We use a custom receiver to receive data from our MQ. We used to use def
store(dataItem: T) to store data, but I found that the resulting block sizes
vary widely, from 0.5 KB to 5 MB, so the per-partition processing time also
varies a lot. Shuffling is an option, but I want to avoid it.

I noticed that def store(dataBuffer: ArrayBuffer[T]) stores the whole buffer as
a single block, so it lets me control the block size; however, I also noticed
that this method does not apply any rate limiting, so I would have to implement
rate limiting myself.

So for now I do not have a good way to control the block size. I am asking
whether Spark could add rate limiting to the store(dataBuffer: ArrayBuffer[T])
method, or provide a way to control the block size generated by BlockGenerator.



