Michael,

Great! I actually ran into a very similar issue when working on something else 
a little while back, so I had a good idea of where to look :)

Thanks for verifying!

-Mark
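
For anyone finding this thread later, here is a minimal sketch of the kind of problem described in the quoted messages below. An anonymous inner class created inside an instance method keeps a hidden reference to its enclosing object, so a field-by-field serializer such as Kryo can walk from the packet back into the receiver (the this$1 -> this$0 -> executor_ chain in the serialization trace). This is not the NIFI-927 patch itself, whose contents are not shown in this thread. The names Receiver, DataPacket and StandalonePacket are made up for illustration, and a standalone static class is just one common way to avoid the capture.

```java
import java.io.Serializable;
import java.lang.reflect.Field;

public class InnerClassCapture {
    // Hypothetical stand-in for a data packet interface.
    interface DataPacket { String describe(); }

    // Hypothetical stand-in for a receiver that owns runtime state.
    static class Receiver {
        private final String name;

        Receiver(String name) { this.name = name; }

        // Anonymous inner class: it reads the outer 'name' field, so the
        // compiler gives it a hidden field pointing at this Receiver.
        DataPacket anonymousPacket(final byte[] content) {
            return new DataPacket() {
                @Override
                public String describe() { return name + ":" + content.length; }
            };
        }

        // Alternative: copy what the packet needs into a static class.
        DataPacket standalonePacket(byte[] content) {
            return new StandalonePacket(name, content);
        }
    }

    // Static nested class: no hidden reference to any Receiver.
    static class StandalonePacket implements DataPacket, Serializable {
        private static final long serialVersionUID = 1L;
        private final String source;
        private final byte[] content;

        StandalonePacket(String source, byte[] content) {
            this.source = source;
            this.content = content;
        }

        @Override
        public String describe() { return source + ":" + content.length; }
    }

    // True if the object's class declares a field of type Receiver,
    // i.e. a serializer that follows fields would reach the receiver.
    static boolean holdsReceiver(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getType() == Receiver.class) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver("receiver-1");
        byte[] data = {1, 2, 3};
        System.out.println(holdsReceiver(receiver.anonymousPacket(data)));  // true
        System.out.println(holdsReceiver(receiver.standalonePacket(data))); // false
    }
}
```

Both packets report the same description; only the anonymous one drags the receiver (and whatever the receiver references) into the object graph that gets serialized.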

----------------------------------------
> From: [email protected]
> To: [email protected]
> Subject: RE: NiFi-Spark receiver not serializable
> Date: Fri, 4 Sep 2015 17:16:07 +0000
>
> Hi Mark,
>
> I've just left a comment on the JIRA ticket, but yes that has solved the 
> issue, thanks! I tried making everything else serializable but missed that 
> one.
>
> Thanks again,
> Michael
>
> -----Original Message-----
> From: Mark Payne [mailto:[email protected]]
> Sent: 04 September 2015 17:24
> To: [email protected]
> Subject: RE: NiFi-Spark receiver not serializable
>
> Michael,
>
> I have submitted a patch for NIFI-927, if you are interested in giving that a 
> try.
>
> Thanks
> -Mark
>
> ----------------------------------------
>> From: [email protected]
>> To: [email protected]
>> Subject: RE: NiFi-Spark receiver not serializable
>> Date: Fri, 4 Sep 2015 11:19:48 -0500
>>
>>
>> Michael,
>>
>> It looks like it is actually not the Runnable that needs to be
>> serializable but rather the anonymous inner class of the Runnable
>> (shown as ReceiveRunnable$1 in the trace). This is the implementation
>> of the NiFiDataPacket interface. I have created a ticket to address this:
>>
>> https://issues.apache.org/jira/browse/NIFI-927
>>
>> Thanks
>> -Mark
>>
>> ________________________________
>>> From: [email protected]
>>> To: [email protected]
>>> Subject: NiFi-Spark receiver not serializable
>>> Date: Fri, 4 Sep 2015 16:02:10 +0000
>>>
>>>
>>> Hi all,
>>>
>>> I keep hitting an odd error that happens frequently but not always (a
>>> Spark job can run for ~10 minutes before it is thrown). The
>>> NiFiReceiver's Runnable cannot be serialized because it is being
>>> modified while it is being serialized. The full stack trace is below.
>>> I've looked through the NiFiReceiver code, and even took a copy and
>>> made the Runnable implement Serializable, but that did not solve it
>>> either.
>>>
>>> Has anyone seen this before? To be frank, I'm unsure whether it's the
>>> receiver or my own code affecting the receiver.
>>>
>>> Many thanks,
>>>
>>> Michael
>>>
>>> 15/09/04 16:55:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 48.0 (TID 411, slave6.localdomain): java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
>>> Serialization trace:
>>> classes (sun.misc.Launcher$AppClassLoader)
>>> classloader (java.security.ProtectionDomain)
>>> cachedPDs (javax.security.auth.SubjectDomainCombiner)
>>> combiner (java.security.AccessControlContext)
>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>> contextClassLoader (org.apache.spark.streaming.util.RecurringTimer$$anon$1)
>>> thread (org.apache.spark.streaming.util.RecurringTimer)
>>> blockIntervalTimer (org.apache.spark.streaming.receiver.BlockGenerator)
>>> blockGenerator (org.apache.spark.streaming.receiver.ReceiverSupervisorImpl)
>>> executor_ (org.apache.nifi.spark.NiFiReceiver)
>>> this$0 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable)
>>> this$1 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable$1)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>>> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
>>> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
>>> at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:190)
>>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.ConcurrentModificationException
>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
>>> at java.util.Vector$Itr.next(Vector.java:1137)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> ... 78 more
>>>
>>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Michael Griffiths
>>> NationProtect Developer
>>> BAE Systems Applied Intelligence
>>> ___________________________________________________________
>>>
>>> E: [email protected]
>>>
>>> BAE Systems Applied Intelligence, Surrey Research Park, Guildford,
>>> Surrey, GU2 7RQ.
>>> www.baesystems.com/ai<http://www.baesystems.com/ai>
>>>
>>>
>>>
>>> Please consider the environment before printing this email. This
>>> message should be regarded as confidential. If you have received this
>>> email in error please notify the sender and destroy it immediately.
>>> Statements of intent shall only become binding when confirmed in hard
>>> copy by an authorised signatory. The contents of this email may
>>> relate to dealings with other companies under the control of BAE
>>> Systems Applied Intelligence Limited, details of which can be found
>>> at http://www.baesystems.com/Businesses/index.htm.
>>
>