Could you make sure that the instance of com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed record, is serializable?
> On 1 Dec 2020, at 06:06, Ajit Dongre <[email protected]> wrote: > > Hi all, > > I have pipeline to read data from kafka & write to file. I am using Beam > 2.12 with spark runner in java. While executing I am getting below exception > : > > org.apache.spark.network.client.ChunkFetchFailureException: Failure while > fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: > java.io.NotSerializableException: > org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow > Serialization stack: > - object not serializable (class: > org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: > TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, > timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, > timing=EARLY, index=0}}) > - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) > - object (class scala.Tuple2, > (Tag<com.walmart.dataplatform.aorta.river.fn.FilterPCollection.<init>:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, > timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, > timing=EARLY, index=0}})) > at > org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) > at > org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140) > at > org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608) > at > org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) > at > org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583) > at > org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377) > at > org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61) > at > org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) > at > scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) > at > org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92) > at > org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > at java.lang.Thread.run(Thread.java:748) at > org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > at java.lang.Thread.run(Thread.java:748) > > > com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85 > <mailto:com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85> is > serializable and also I observed that this exception is not coming for all > records. > > Please suggest solution to this problem. > > Regards, > Ajit Dongre
