Michael,

Great! I actually ran into a very similar issue when working on something
else a little while back, so I had a good idea of where to look :)
Thanks for verifying!
-Mark

----------------------------------------
> From: [email protected]
> To: [email protected]
> Subject: RE: NiFi-Spark receiver not serializable
> Date: Fri, 4 Sep 2015 17:16:07 +0000
>
> Hi Mark,
>
> I've just left a comment on the JIRA ticket, but yes, that has solved the
> issue, thanks! I tried making everything else serializable but missed
> that one.
>
> Thanks again,
> Michael
>
> -----Original Message-----
> From: Mark Payne [mailto:[email protected]]
> Sent: 04 September 2015 17:24
> To: [email protected]
> Subject: RE: NiFi-Spark receiver not serializable
>
> Michael,
>
> I have submitted a patch for NIFI-927, if you are interested in giving
> that a try.
>
> Thanks
> -Mark
>
> ----------------------------------------
>> From: [email protected]
>> To: [email protected]
>> Subject: RE: NiFi-Spark receiver not serializable
>> Date: Fri, 4 Sep 2015 11:19:48 -0500
>>
>> Michael,
>>
>> It looks like it is actually not the Runnable itself that needs to be
>> serializable but rather the inner class of the Runnable (referenced as
>> Runnable$1 here). This is the implementation of the NiFiDataPacket
>> interface. I have created a ticket to address this:
>>
>> https://issues.apache.org/jira/browse/NIFI-927
>>
>> Thanks
>> -Mark
>>
>> ________________________________
>>> From: [email protected]
>>> To: [email protected]
>>> Subject: NiFi-Spark receiver not serializable
>>> Date: Fri, 4 Sep 2015 16:02:10 +0000
>>>
>>> Hi all,
>>>
>>> I keep having an odd error that happens frequently (but not always; a
>>> Spark job can run for ~10 minutes before it is thrown). The
>>> NiFiReceiver's Runnable cannot be serialized because it is being
>>> modified at the same time as it is being serialized. The full stack
>>> trace is below. I've looked through the NiFiReceiver code, including
>>> taking a copy and making the Runnable implement Serializable, but
>>> that did not solve it either.
>>>
>>> Has anyone seen this before?
>>> To be frank, I'm unsure whether it's
>>> the receiver or my own code affecting the receiver.
>>>
>>> Many thanks,
>>>
>>> Michael
>>>
>>> 15/09/04 16:55:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 48.0 (TID 411, slave6.localdomain): java.lang.RuntimeException:
>>> com.esotericsoftware.kryo.KryoException:
>>> java.util.ConcurrentModificationException
>>> Serialization trace:
>>> classes (sun.misc.Launcher$AppClassLoader)
>>> classloader (java.security.ProtectionDomain)
>>> cachedPDs (javax.security.auth.SubjectDomainCombiner)
>>> combiner (java.security.AccessControlContext)
>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>> contextClassLoader (org.apache.spark.streaming.util.RecurringTimer$$anon$1)
>>> thread (org.apache.spark.streaming.util.RecurringTimer)
>>> blockIntervalTimer (org.apache.spark.streaming.receiver.BlockGenerator)
>>> blockGenerator (org.apache.spark.streaming.receiver.ReceiverSupervisorImpl)
>>> executor_ (org.apache.nifi.spark.NiFiReceiver)
>>> this$0 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable)
>>> this$1 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable$1)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>>> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
>>> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
>>> at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:190)
>>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.ConcurrentModificationException
>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
>>> at java.util.Vector$Itr.next(Vector.java:1137)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> ... 78 more
>>>
>>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Michael Griffiths
>>> NationProtect Developer
>>> BAE Systems Applied Intelligence
>>> ___________________________________________________________
>>>
>>> E: [email protected]
>>>
>>> BAE Systems Applied Intelligence, Surrey Research Park, Guildford,
>>> Surrey, GU2 7RQ.
>>> www.baesystems.com/ai<http://www.baesystems.com/ai>
>>>
>>> Please consider the environment before printing this email. This
>>> message should be regarded as confidential. If you have received this
>>> email in error please notify the sender and destroy it immediately.
>>> Statements of intent shall only become binding when confirmed in hard
>>> copy by an authorised signatory. The contents of this email may
>>> relate to dealings with other companies under the control of BAE
>>> Systems Applied Intelligence Limited, details of which can be found
>>> at http://www.baesystems.com/Businesses/index.htm.
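
[Editor's note] Mark's diagnosis in the thread above is the crux: an anonymous inner class (the `NiFiReceiver$ReceiveRunnable$1` in the serialization trace) carries a hidden `this$0` reference to its enclosing instance, so serializing the "small" NiFiDataPacket implementation drags the whole receiver, its BlockGenerator, its timer threads, and ultimately a live classloader into the object graph, which is where the ConcurrentModificationException surfaces. The pitfall can be sketched with plain `java.io` serialization; all class names here are hypothetical stand-ins, not the actual NiFi sources:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassCapture {

    // Stand-in for NiFiDataPacket: a small, serializable value.
    interface DataPacket extends Serializable {
        byte[] getContent();
    }

    // Stand-in for the receiver machinery: deliberately NOT Serializable,
    // just like a live receiver full of threads and sockets.
    static class ReceiveRunnable implements Runnable {
        public void run() { /* receive loop elided */ }

        // Anonymous inner class: compiles to ReceiveRunnable$1 and keeps a
        // hidden this$0 field pointing back at this ReceiveRunnable.
        DataPacket capturedPacket(final byte[] content) {
            return new DataPacket() {
                public byte[] getContent() { return content; }
            };
        }
    }

    // The shape of the fix: a standalone static class that holds only the
    // data it needs and has no reference back to its creator.
    static class StandalonePacket implements DataPacket {
        private final byte[] content;
        StandalonePacket(byte[] content) { this.content = content; }
        public byte[] getContent() { return content; }
    }

    static void serialize(Object o) throws IOException {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
        }
    }

    public static void main(String[] args) throws IOException {
        // Succeeds: the object graph is just the packet and its byte[].
        serialize(new StandalonePacket("ok".getBytes()));
        System.out.println("standalone packet: serialized fine");

        // Fails: walking this$0 reaches the non-serializable ReceiveRunnable.
        try {
            serialize(new ReceiveRunnable().capturedPacket("ok".getBytes()));
            System.out.println("captured packet: serialized (unexpected)");
        } catch (NotSerializableException e) {
            System.out.println("captured packet: NotSerializableException on "
                    + e.getMessage());
        }
    }
}
```

Kryo is more permissive than `java.io` here: rather than failing fast with a NotSerializableException, it walks the captured graph field by field, which is why the failure in the thread shows up late and non-deterministically as a ConcurrentModificationException on a classloader's internal Vector instead of at the first serialization attempt.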
