Sorry, please ignore my last mail; it was half-finished. Here is the real one:
Hi Lorenzo,

in case you still have time to investigate:

Your stack trace shows that all expected code paths have been taken. The conversions are there; they look different from the ones here, but that can be attributed to the Avro upgrade.

Could you put a breakpoint on SpecificDatumReader.readField, so that you can inspect the conversion for the timestamp field? You probably want to make it a conditional breakpoint with a condition like f.name().equals("<your field name>") (use equals() rather than == for the String comparison). The expected flow is that the field has a conversion that returns the Joda DateTime instead of the long, so datum should end up being the converted Joda value.

@Override
protected void readField(Object r, Schema.Field f, Object oldDatum,
                         ResolvingDecoder in, Object state)
    throws IOException {
  if (r instanceof SpecificRecordBase) {
    Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());

    Object datum;
    if (conversion != null) {
      datum = readWithConversion(
          oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
    } else {
      datum = readWithoutConversion(oldDatum, f.schema(), in);
    }

    getData().setField(r, f.name(), f.pos(), datum);

  } else {
    super.readField(r, f, oldDatum, in, state);
  }
}

On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Lorenzo,
>
> if you still have time to investigate.
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there although they look different than here, but that can
> be attributed to the avro upgrade.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>                          ResolvingDecoder in, Object state)
>     throws IOException {
>   if (r instanceof SpecificRecordBase) {
>     Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
>
>     Object datum;
>     if (conversion != null) {
>       datum = readWithConversion(
>           oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>     } else {
>       datum = readWithoutConversion(oldDatum, f.schema(), in);
>     }
>
>     getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
>     super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>>
>> Thanks Guowei,
>>
>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>> does not solve the problem.
>>
>> 12:02:59,314 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> ----------------------------
>>
>> Summarising, the only working combination seems to be:
>>
>> - Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation = joda
>> - Enable object reuse (being careful about the implications)
>>
>> Using AVRO 1.8.2 code generation does not work, with any of the other
>> workarounds.
>> Using generic objects does not work due to a bug in AvroSerializer
>> <https://issues.apache.org/jira/browse/FLINK-18223>, and GenericRecords
>> also bring a number of other problems.
>>
>> I am not very comfortable using AVRO objects generated with a
>> different AVRO version than the one supported by Flink.
>> I am going to map AVRO records into hand-written POJOs immediately after
>> the ingestion, to reduce the chances of further issues. I reckon this is
>> very empirical, but that's what the workaround looks like to me :)
>>
>> Lorenzo
>>
>> P.S. I want to give a massive thanks to this community. So far it has been
>> one of the most responsive and helpful I have ever interacted with.
>>
>> On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> for 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote on Thu, Jun 11, 2020 at 5:02 PM:
>>>
>>>> Hi Arvid,
>>>>
>>>> thanks for the point about caching records. Gotcha!
>>>>
>>>> Sorry, I cannot share the full schema or generated code. It's 3rd-party
>>>> IP and we signed a meter-thick NDA... I think I can post snippets.
>>>> The schema is heavily nested, including arrays of other record types.
>>>> Types are primitives, or logical decimal and timestamp-millis. No unions.
>>>>
>>>> #conversions is in AccountEntries only (one of the nested records) and
>>>> looks like this:
>>>>
>>>> private static final org.apache.avro.Conversion<?>[] conversions =
>>>>     new org.apache.avro.Conversion<?>[] {
>>>>         null,
>>>>         null,
>>>>         null,
>>>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>         null,
>>>>         null,
>>>>         null,
>>>>         null,
>>>>         null,
>>>>         null,
>>>>         null
>>>>     };
>>>>
>>>>
>>>> Note that I have to generate the specific objects with the AVRO 1.9.2 Maven
>>>> plugin.
>>>> With 1.8.2-generated code it fails with the following exception,
>>>> regardless of setting enableObjectReuse():
>>>>
>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>
>>>>
>>>> Thanks for the help
>>>> Lorenzo
>>>>
>>>>
>>>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> I'm glad that it worked out somehow, but I'd still like to understand
>>>>> what went wrong, so it will work more smoothly for future users. I
>>>>> double-checked and we even test AvroSerializer with logical types, so
>>>>> I'm a bit puzzled.
>>>>>
>>>>> Could you attach GlHeader, or at least show us what GlHeader#conversions
>>>>> looks like? I want to exclude the possibility that the source generator
>>>>> screwed up.
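>>>>>
>>>>> To make this concrete, here is a minimal debugging sketch (illustrative
>>>>> only; the helper name is made up, and it assumes an Avro-generated
>>>>> specific record on the classpath) that prints which fields of a record
>>>>> carry a conversion:
>>>>>
>>>>> static void dumpConversions(org.apache.avro.specific.SpecificRecordBase record) {
>>>>>   for (org.apache.avro.Schema.Field f : record.getSchema().getFields()) {
>>>>>     // null means the field is read as its raw type (e.g. long);
>>>>>     // a TimestampConversion means it should come back as a Joda DateTime
>>>>>     System.out.println(f.pos() + " " + f.name() + " -> " + record.getConversion(f.pos()));
>>>>>   }
>>>>> }
>>>>>
>>>>> Calling it on a deserialized GlHeader instance should show a non-null
>>>>> conversion for the timestamp fields.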
>>>>>
>>>>> Concerning object reuse: you need to treat all POJOs as immutable
>>>>> (I'm assuming that's what you meant in your description), but you
>>>>> should also never cache values, as in
>>>>>
>>>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>>>   Object lastElement;
>>>>>
>>>>>   @Override
>>>>>   public void flatMap(Object newElement, Collector<Object> out) {
>>>>>     out.collect(lastElement);
>>>>>     lastElement = newElement; // <- never cache a reference like this with enableObjectReuse
>>>>>   }
>>>>> }
>>>>>
>>>>> (excuse my ugly code)
>>>>>
>>>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <
>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> answering your other questions.
>>>>>>
>>>>>> Here is the stack trace for case (1), when I try to read using
>>>>>> specific records generated by the AVRO 1.8.2 plugin:
>>>>>>
>>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>>
>>>>>>
>>>>>> I also tried generating the specific objects with Avro 1.9.2 (case 2),
>>>>>> forcing it to use Joda time, but it still didn't work:
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>> ... 7 more
>>>>>>
>>>>>>
>>>>>> But in the second case, it seems the failure happens when Flink tries
>>>>>> to make a copy of the record.
>>>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>>>
>>>>>> I am not sure I understand all the implications of object reuse in Flink,
>>>>>> specifically.
>>>>>> I am familiar with the general risk of mutable messages, and I always
>>>>>> handle them with care even when they are POJOs: never mutating and
>>>>>> forwarding the same record.
>>>>>> Not sure whether there are other implications in Flink.
>>>>>>
>>>>>> Many thanks
>>>>>> Lorenzo
>>>>>>
>>>>>>
>>>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lorenzo,
>>>>>>>
>>>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>>>> stack trace.
>>>>>>>
>>>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>>>> probably see the same issue as 1).
>>>>>>>
>>>>>>> 3) I'm surprised that this doesn't work either. There is a code path
>>>>>>> for GenericRecord since 2016 and it's covered in a test. From the error
>>>>>>> description and the ticket, it looks like the issue is not the
>>>>>>> AvroInputFormat, but the serializer. So it would probably work with
>>>>>>> a different serializer (but that would cause back-and-forth type
>>>>>>> transformations).
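>>>>>>>
>>>>>>> For illustration, a minimal sketch of the object-reuse switch (my
>>>>>>> assumption: a streaming job with a StreamExecutionEnvironment called
>>>>>>> env). It avoids the AvroSerializer.copy() call visible in the stack
>>>>>>> trace quoted below, at the price that downstream operators must never
>>>>>>> cache or mutate incoming records:
>>>>>>>
>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> // With object reuse enabled, records are forwarded along the operator
>>>>>>> // chain without the defensive deep copy that triggers the Avro error.
>>>>>>> env.getConfig().enableObjectReuse();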
>>>>>>>
>>>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <
>>>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Timo,
>>>>>>>>
>>>>>>>> the stack trace with the 1.9.2-generated specific record is the following:
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>> I reckon logical types might have been considered somewhat
>>>>>>>> experimental since... ever. But, honestly, I've been using them in the
>>>>>>>> Kafka/Java ecosystem as well as in Spark without too many problems.
>>>>>>>>
>>>>>>>> For my specific use case, the schema is given. Messages are
>>>>>>>> produced by a 3rd party and we cannot change the schema (especially
>>>>>>>> because it's a legit schema).
>>>>>>>> I am desperately looking for a workaround.
>>>>>>>>
>>>>>>>> I had a similar issue with a Kafka source and AVRO records
>>>>>>>> containing decimals and timestamps. Timestamps worked but not decimals.
>>>>>>>> I was able to work around the problem using GenericRecords.
>>>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>>>>
>>>>>>>> I honestly find it very confusing that there are different ways of
>>>>>>>> handling AVRO deserialization inside Flink core components.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Lorenzo
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Lorenzo,
>>>>>>>>>
>>>>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>>>>
>>>>>>>>> Regarding 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>>>
>>>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>>>> work, because specific records are handled as POJOs and those should be
>>>>>>>>> able to also deal with logical types' classes through Kryo.
>>>>>>>>>
>>>>>>>>> Regarding 3) Generic record:
>>>>>>>>>
>>>>>>>>> It would be great if we can make this option possible. We could include
>>>>>>>>> it in the next minor fix release.
>>>>>>>>>
>>>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>>>> > Files are written by a Kafka Connect S3 sink, but S3 is not the point
>>>>>>>>> > here. I started by trying to ingest a static bunch of files from the
>>>>>>>>> > local fs first, and I am having weird issues with AVRO deserialization.
>>>>>>>>> >
>>>>>>>>> > I have to say, the records contain logical types, timestamp-millis
>>>>>>>>> > and decimals.
>>>>>>>>> >
>>>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>>>>> > used avro-maven-plugin to generate POJOs.
>>>>>>>>> > I tried multiple combinations, all with no luck.
>>>>>>>>> >
>>>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>>>> >
>>>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in,
>>>>>>>>> >     AccountEntries.class);
>>>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>>>>> > org.joda.time.DateTime
>>>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>>>> > Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > 3) Generic record
>>>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>>>> > convenience, but I am not using the generated POJO as the record.
>>>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>>>>>> > to explicitly specify the TypeInfo with returns(...).
>>>>>>>>> >
>>>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in,
>>>>>>>>> >     GenericRecord.class);
>>>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>>>>> > The class is not a proper class. It is either abstract, an interface, or
>>>>>>>>> > a primitive type.
>>>>>>>>> >
>>>>>>>>> > This looks like a bug.
>>>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>>>>>>> > as I am using a managed Flink that I cannot update.
>>>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>>>>>> > anything bizarre. Am I?
>>>>>>>>> >
>>>>>>>>> > Any ideas?
>>>>>>>>> > Am I missing something obvious?
>>>>>>>>> >
>>>>>>>>> > Cheers
>>>>>>>>> > Lorenzo

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng