AvroToPojo has a bug, transient modifier needs to be added to 2 fields. private transient List<FieldInfo> fieldInfos; private transient List<ActiveFieldInfo> columnFieldSetters;
Also there is one more bug in Avro input operator, there is a PR open for that. Fix is to add the below line to beginWindow call in the operator. super.beginWindow(windowId); In both the cases, you can copy the operator code to your repo, and make the changes mentioned. Thanks On Tue, May 30, 2017 at 6:42 PM Vivek Bhide <bhide.vi...@gmail.com> wrote: > I am using the AvroToPojo Malhar operator in conjunction with > AvroFileInputOperator for converting the avro records to POJO. While doing > the testing for application's stability, I found that AvroToPojo opwerator > doesn't recover in case of failure and keeps throwing below exception. This > in turn makes the whole application unstable and hence to be killed > > The field for which it throws error 'ActiveFieldInfo' is a static inner > class and I am not sure on what can be done to have the operator recover > itself without any trouble. > > Any pointers on this issue will be really helpful > > 2017-05-30 17:15:46,826 INFO stram.StreamingContainerParent > (StreamingContainerParent.java:log(170)) - child msg: deploy request > failed: > > [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3, > 0, > > 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer= > brdn2244.target.com]]], > > OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3, > 0, > > 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]] > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing > no-arg constructor): > com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo > Serialization trace: > columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo) > at > > com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) > at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > > com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200) > at > com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139) > at > > com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935) > at > > com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883) > at > > com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827) > at > > com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708) > at > > com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313) > context: > > PTContainer[id=1(container_e21_1491404336779_1770158_01_000018),state=ACTIVE,operators=[PTOperator[id=2,name=fileReader$avroToPojo,state=PENDING_DEPLOY], > PTOperator[id=1,name=fileReader$fileReader,state=PENDING_DEPLOY]]] > 2017-05-30 17:15:46,832 INFO stram.StreamingContainerParent > (StreamingContainerParent.java:log(170)) - child msg: > java.lang.IllegalStateException: Deploy request failed: > > [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3, > 0, > > 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer= > brdn2244.target.com]]], > > OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3, > 0, > > 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]] > at > > com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:836) > at > > com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708) > at > > com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313) > Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created > (missing no-arg constructor): > com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo > Serialization trace: > columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo) > at > > com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) > at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > > com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200) > at > com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139) > at > > com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935) > at > > com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883) > at > > com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827) > ... 2 more > > > Regards > Vivek > > > > -- > View this message in context: > http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660.html > Sent from the Apache Apex Users list mailing list archive at Nabble.com. >