You need to put the annotation on the class that wraps the GenericRecord, not the operator.
I don't know whether GenericRecord is suitable for Java serialization. If it is, you can try to change your operator to emit KryoJdkContainer instead (or change the stream codec): GenericRecord record = .. port.emit(new KryoJdkContainer<>(record)); On Mon, Aug 22, 2016 at 3:35 PM, Mukkamula, Suryavamshivardhan (CWM-NR) < suryavamshivardhan.mukkam...@rbc.com> wrote: > Hi , > > > > I am trying to use java serialization as per below but launch is failing > with below exception. I am using the below annotation on the operator > class. Any thoughts? > > > > @DefaultSerializer(JavaSerializer.class) > > public class AvroFileOutputOperator extends > GroupFileOutputOperator<GenericRecord> > { > > > > //some code > > > > > > ############### Exception ########################3 > > > > An error occurred trying to launch the application. Server message: > com.esotericsoftware.kryo.KryoException: Error during Java serialization. > at > com.esotericsoftware.kryo.serializers.JavaSerializer.write(JavaSerializer.java:33) > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) at > com.datatorrent.common.util.FSStorageAgent.store(FSStorageAgent.java:183) > at com.datatorrent.stram.plan.logical.LogicalPlan$ > OperatorMeta.writeObject(LogicalPlan.java:805) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.ArrayList.writeObject(ArrayList.java:742) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.HashMap.writeObject(HashMap.java:1129) at sun.reflect. > GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect. > DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at com.datatorrent.stram.plan.logical.LogicalPlan$ > OperatorMeta.writeObject(LogicalPlan.java:804) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.ArrayList.writeObject(ArrayList.java:742) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.HashMap.writeObject(HashMap.java:1129) at sun.reflect. > GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect. > DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at com.datatorrent.stram.plan.logical.LogicalPlan$ > OperatorMeta.writeObject(LogicalPlan.java:804) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.ArrayList.writeObject(ArrayList.java:742) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.HashMap.writeObject(HashMap.java:1129) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at com.datatorrent.stram.plan.logical.LogicalPlan$ > OperatorMeta.writeObject(LogicalPlan.java:804) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > java.util.HashMap.writeObject(HashMap.java:1129) at sun.reflect. > NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect. > NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) at > java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > com.datatorrent.stram.plan.logical.LogicalPlan.write(LogicalPlan.java:2068) > at com.datatorrent.stram.StramClient.startApplication(StramClient.java:518) > at > com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:529) > at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:2050) > at com.datatorrent.stram.cli.DTCli.launchAppPackage(DTCli.java:3456) at > com.datatorrent.stram.cli.DTCli.access$7100(DTCli.java:106) at > com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:1895) at > com.datatorrent.stram.cli.DTCli$3.run(DTCli.java:1449) Caused by: > java.io.NotSerializableException: > com.rbc.aml.silver.operator.AvroFileOutputOperator at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at > com.esotericsoftware.kryo.serializers.JavaSerializer.write(JavaSerializer.java:30) > ... 149 more > > > > > > > > Regards, > > Surya Vamshi > > > > *From:* Thomas Weise [mailto:thomas.we...@gmail.com] > *Sent:* 2016, August, 22 3:09 PM > > *To:* users@apex.apache.org > *Subject:* Re: kryo Serealization Exception > > > > There is some information available here: > > > > http://docs.datatorrent.com/troubleshooting/#application- > throwing-following-kryo-exception > > > > If the object is Java serializable, you can set the stream codec or wrap > into KryoJdkContainer: > > > > https://github.com/apache/apex-malhar/tree/master/ > library/src/main/java/com/datatorrent/lib/codec > > > > > > > > > > > > On Mon, Aug 22, 2016 at 11:42 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hi Tushar, > > In Our case, Generic Record fields are generated at run time from > database. I cannot convert into a predefined POJO to pass through output > port. > Is it mandatory that Generic Record class must have no-arg constructor for > kryo serialization ? > > Regards, > Surya Vamshi > -----Original Message----- > From: Tushar Gosavi [mailto:tus...@datatorrent.com] > Sent: 2016, August, 20 2:33 AM > To: users@apex.apache.org > Subject: Re: kryo Serealization Exception > > Hi > > Another option is to create your own Java object and populate the fields > you need for further processing from GenericRecord, and send it on the > output port. You can use this approach if you can not put the operators in > single container, because 1) you need to shuffle based on key or 2) > resource constraints. > > -Tushar. > > > On Sat, Aug 20, 2016 at 3:23 AM, Devendra Tagare < > devend...@datatorrent.com> wrote: > > Hi, > > > > You can set the Locality of the parser and the writer to Container local. > > > > This will ensure that Generic Record from the parser does not get > > serialized between containers. > > > > Thanks, > > Dev > > > > On Fri, Aug 19, 2016 at 2:21 PM, Mukkamula, Suryavamshivardhan > > (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote: > >> > >> Hi, > >> > >> Can you please help resolve the below issue? > >> > >> In our project we are using ‘org.apache.avro.generic.GenericRecord’ > >> as Tuple writing to a parquet file and we are using avro schema for > >> each record. We are getting the below exception, I suppose > >> GenericRecord does not have no-arg constructor, and looking for some > ideas to solve this problem. > >> > >> ######### Exception ################################## > >> > >> 2016-08-19 16:29:12,845 [5/silverFileOut:AvroFileOutputOperator] > >> ERROR codec.Def aultStatefulStreamCodec fromDataStatePair - > >> Catastrophic Error: Execution halted due to Kryo exception! > >> com.esotericsoftware.kryo.KryoException: Class cannot be created > >> (missing no-arg > >> constructor): org.apache.avro.generic.GenericData$Record > >> at > >> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstant > >> iatorOf(Kryo.java:1228) > >> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo. > java:1049) > >> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) > >> at > >> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSer > >> ializer.java:547) > >> at > >> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSeria > >> lizer.java:523) > >> at > >> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > >> at > >> com.datatorrent.stram.codec.DefaultStatefulStreamCodec.fromDataStateP > >> air(DefaultStatefulStreamCodec.java:99) > >> at > >> com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.p > >> rocessPayload(BufferServerSubscriber.java:364) > >> at > >> com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.s > >> weep(BufferServerSubscriber.java:316) > >> at > >> com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:252) > >> at > >> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContai > >> ner.java:1382) > >> 2016-08-19 16:30:09,336 [main] INFO stram.StreamingContainerManager > >> updateCheck > >> > >> Regards, > >> Surya Vamshi > >> > >> > >> _____________________________________________________________________ > >> __ > >> > >> If you received this email in error, please advise the sender (by > >> return email or otherwise) immediately. You have consented to receive > >> the attached electronically at the above-noted email address; please > >> retain a copy of this confirmation for future reference. > >> > >> Si vous recevez ce courriel par erreur, veuillez en aviser > >> l'expéditeur immédiatement, par retour de courriel ou par un autre > >> moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) > >> par voie électronique à l'adresse courriel indiquée ci-dessus; > >> veuillez conserver une copie de cette confirmation pour les fins de > reference future. > > > > > > _______________________________________________________________________ > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > > > > _______________________________________________________________________ > > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > >