Yes Alexey, com.walmart.dataplatform.aorta.river.meta.Payload class is 
serializable.

Strange observation is overall spark job is successful but in executor logs we 
continuously observed this exception.  As code is same across all nodes, how 
same code is working fine in one node and failing on another node with 
NotSerializableException.

Regards,
Ajit Dongre

From: Alexey Romanenko <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Tuesday, 1 December 2020 at 11:35 PM
To: "[email protected]" <[email protected]>
Subject: EXT: Re: NotSerializableException in Spark runner

Could you make sure that the instance of 
com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed 
record, is serializable?


On 1 Dec 2020, at 06:06, Ajit Dongre 
<[email protected]<mailto:[email protected]>> wrote:

Hi all,

I have pipeline to read data from kafka & write to file.  I am using Beam 2.12 
with spark runner in java.  While executing I am getting below exception :

org.apache.spark.network.client.ChunkFetchFailureException: Failure while 
fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: 
java.io.NotSerializableException: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
        - object not serializable (class: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: 
TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}})
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, 
(Tag<com.walmart.dataplatform.aorta.river.fn.FilterPCollection.<init>:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}}))
        at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
        at 
org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
        at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
        at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
        at scala.Option.map(Option.scala:146)
        at 
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at 
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
        at 
org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
        at 
org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)        at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)


com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85<mailto:com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85>
 is serializable and also I observed that this exception is not coming for all 
records.

Please suggest solution to this problem.

Regards,
Ajit Dongre


Reply via email to