Yes Alexey, com.walmart.dataplatform.aorta.river.meta.Payload class is serializable.
Strange observation is overall spark job is successful but in executor logs we continuously observed this exception. As code is same across all nodes, how same code is working fine in one node and failing on another node with NotSerializableException. Regards, Ajit Dongre From: Alexey Romanenko <[email protected]> Reply to: "[email protected]" <[email protected]> Date: Tuesday, 1 December 2020 at 11:35 PM To: "[email protected]" <[email protected]> Subject: EXT: Re: NotSerializableException in Spark runner Could you make sure that the instance of com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed record, is serializable? On 1 Dec 2020, at 06:06, Ajit Dongre <[email protected]<mailto:[email protected]>> wrote: Hi all, I have pipeline to read data from kafka & write to file. I am using Beam 2.12 with spark runner in java. While executing I am getting below exception : org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: java.io.NotSerializableException: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow Serialization stack: - object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}) - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) - object (class scala.Tuple2, (Tag<com.walmart.dataplatform.aorta.river.fn.FilterPCollection.<init>:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}})) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140) at org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608) at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60) at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92) at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:748) at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:748) com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85<mailto:com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85> is serializable and also I observed that this exception is not coming for all records. Please suggest solution to this problem. Regards, Ajit Dongre
