Flink should not interact poorly with heavily nested schemas, so this
might be another bug worth investigating. Can you share an example that
reproduces your issue with us? Which Flink version are you using?
Contributors are always welcome :) Otherwise I will also look into the
serialization issue myself.
Regards,
Timo
On 15.05.18 at 17:33, Padarn Wilson wrote:
> usually people are using the AvroInputFormat with the Avro class
> generated by an Avro schema
This is actually what I was doing, but it seems to interact poorly
with heavily nested schemas. My schema has a field which is a
`List[SubSchema]`, where SubSchema is another Avro schema.
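For readers following along, here is a minimal sketch of what a schema of that shape might look like when parsed with Avro's `Schema.Parser`. The names `Example`, `items`, `SubSchema` and `label` are placeholders and not the real schema from this thread. Read from bottom to top, the Kryo serialization trace further down walks the same kind of nesting: a record, an array field, a nested record, and finally a string field.

```scala
import org.apache.avro.Schema

object NestedSchemaSketch {
  // Placeholder schema: a record with one array field whose elements
  // are themselves records (the "List[SubSchema]" shape).
  val schemaJson: String =
    """{
      |  "type": "record",
      |  "name": "Example",
      |  "fields": [
      |    {"name": "items", "type": {"type": "array", "items": {
      |      "type": "record",
      |      "name": "SubSchema",
      |      "fields": [{"name": "label", "type": "string"}]
      |    }}}
      |  ]
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(schemaJson)
    // Schema of the array-typed field.
    val items = schema.getField("items").schema()
    // Schema of the nested record stored inside the array.
    val element = items.getElementType
    println(s"${schema.getName} -> ${items.getType} -> ${element.getName}")
  }
}
```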
> What do you mean by "this is an issue that is mentioned in the
> documentation"? Where is this issue documented?
I was referring to this:
https://flink.apache.org/faq.html#i-have-a-notserializableexception
> So your exception seems to be a bug if it works locally but not
> distributed.
Hmm, well it's nice to know I'm not just doing something stupid :-)
Perhaps I'll try compiling Flink myself so I can try to debug this.
On Tue, May 15, 2018 at 8:54 PM Timo Walther <twal...@apache.org> wrote:
Hi Padarn,
usually people are using the AvroInputFormat with the Avro class
generated by an Avro schema. But after looking into the
implementation, one should also be able to use the GenericRecord
class as a parameter. So your exception seems to be a bug if it
works locally but not distributed. What do you mean by "this is
an issue that is mentioned in the documentation"? Where is this
issue documented?
Regards,
Timo
On 14.05.18 at 18:53, Padarn Wilson wrote:
Hi all - sorry this seems like a silly question, but I can't
figure it out.
I'm using an AvroInputFormat in order to read an Avro file like this:
val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)
This works fine in local mode, but when submitted to a Flink
cluster I get serialisation errors that look like this:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    ... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
    at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
    ... 37 more
I realise this is an issue that is mentioned in the
documentation, but given that it looks like the problem is with
some class inside the AvroInputFormat that is having trouble
being serialised, I'm not sure what the best solution is.
This works fine if I specify a concrete class instead of the generic one, i.e.
val textInputFormat = new AvroInputFormat[Example](infile, classOf[Example])
val lines = env.readFile(textInputFormat, path)
However, I can't get this to run in local mode with a nested case
class `Example`, which is required because the Avro files have
deeply nested fields.
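One workaround that might be worth trying for the Kryo failure in the trace above is sketched here, with no claim that it is the confirmed fix. The trace shows Kryo's generic FieldSerializer trying to instantiate `org.apache.avro.Schema$StringSchema` reflectively. Registering a default Kryo serializer for `Schema` that writes the schema as JSON text and parses it back would avoid that reflective construction. `SchemaAsJsonSerializer` and `RegisterSchemaSerializer` are hypothetical names introduced for illustration, and the `read` signature assumes the Kryo 2.x API that appears in the trace.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper, not part of Flink or Avro: serializes an Avro Schema
// as its JSON text so Kryo never has to construct Schema subclasses itself.
class SchemaAsJsonSerializer extends Serializer[Schema] {
  override def write(kryo: Kryo, output: Output, schema: Schema): Unit =
    output.writeString(schema.toString) // Schema.toString yields the JSON form

  override def read(kryo: Kryo, input: Input, cls: Class[Schema]): Schema =
    new Schema.Parser().parse(input.readString())
}

object RegisterSchemaSerializer {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default serializers also apply to subclasses, so this covers
    // RecordSchema, ArraySchema, StringSchema and the other Schema types.
    env.getConfig.addDefaultKryoSerializer(classOf[Schema], classOf[SchemaAsJsonSerializer])
    // ... build the job with AvroInputFormat[GenericRecord] as before ...
  }
}
```

Whether this resolves the cluster-only failure is an assumption. It only changes how `Schema` instances inside each `GenericRecord` are copied between chained operators.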