Hi Arvid, I followed your instructions for the breakpoint in SpecificDatumReader.readField *with AVRO 1.8.2*.

For all timestamp-millis fields (I have many):

Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());

returns null, so

datum = readWithoutConversion(oldDatum, f.schema(), in);

is used instead and returns a *Long*.

Not sure it's relevant, but in this version I have the explicit dependency
org.apache.avro:avro:1.8.2 and I am using the avro-maven-plugin (1.8.2) to
generate the record from .avsc with this configuration:

<configuration>
  <stringType>String</stringType>
  <createSetters>true</createSetters>
  <fieldVisibility>private</fieldVisibility>
  <enableDecimalLogicalType>true</enableDecimalLogicalType>
</configuration>

Cheers
Lorenzo

On Thu, 11 Jun 2020 at 13:11, Arvid Heise <ar...@ververica.com> wrote:

> Sorry, forget my last mail, that was half-finished.
>
> Here is the real one:
>
> Hi Lorenzo,
>
> if you still have time to investigate:
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there; although they look different than here, that can
> be attributed to the avro upgrade.
>
> Could you put a breakpoint on SpecificDatumReader.readField, so that you
> can inspect the conversion for the timestamp field? You probably want to
> make it conditional on f.name() == <your field name>.
> The expected flow is that it should have a conversion that returns the
> joda time instead of the long. Then datum should be the converted joda
> time.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>     ResolvingDecoder in, Object state) throws IOException {
>   if (r instanceof SpecificRecordBase) {
>     Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
>
>     Object datum;
>     if (conversion != null) {
>       datum = readWithConversion(
>           oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>     } else {
>       datum = readWithoutConversion(oldDatum, f.schema(), in);
>     }
>
>     getData().setField(r, f.name(), f.pos(), datum);
>   } else {
>     super.readField(r, f, oldDatum, in, state);
>   }
> }
>
> On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Lorenzo,
>>
>> if you still have time to investigate:
>>
>> Your stack trace shows that all expected code paths have been taken.
>> Conversions are there, although they look different than here, but that
>> can be attributed to the avro upgrade.
>>
>> @Override
>> protected void readField(Object r, Schema.Field f, Object oldDatum,
>>     ResolvingDecoder in, Object state) throws IOException {
>>   if (r instanceof SpecificRecordBase) {
>>     Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
>>
>>     Object datum;
>>     if (conversion != null) {
>>       datum = readWithConversion(
>>           oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>>     } else {
>>       datum = readWithoutConversion(oldDatum, f.schema(), in);
>>     }
>>
>>     getData().setField(r, f.name(), f.pos(), datum);
>>   } else {
>>     super.readField(r, f, oldDatum, in, state);
>>   }
>> }
>>
>> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>> Thanks Guowei,
>>>
>>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>>> does not solve the problem.
>>>
>>> 12:02:59,314 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>   - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>> ----------------------------
>>>
>>> Summarising, the only working combination seems to be:
>>>
>>> - Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation = joda
>>> - Enable Object Reuse (being careful about the implications)
>>>
>>> Using AVRO 1.8.2 code generation does not work with any of the other
>>> workarounds.
>>> Using Generic objects does not work due to a bug in AvroSerializer
>>> <https://issues.apache.org/jira/browse/FLINK-18223>, and GenericRecords
>>> also bring a number of other problems.
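[Editor's note] The "implications" of Object Reuse mentioned above can be illustrated without Flink: with enableObjectReuse(), the runtime may hand the same mutable record instance to a function on every invocation, so caching a reference across calls silently observes overwritten data. This is a minimal plain-Java sketch of that hazard (no Flink or Avro dependency; all names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {
    // A mutable record, standing in for a reused Avro/POJO instance.
    static class Record {
        long value;
    }

    public static void main(String[] args) {
        Record reused = new Record();                 // the runtime reuses ONE instance
        List<Record> cachedRefs = new ArrayList<>();  // unsafe: keeps references
        List<Long> copiedValues = new ArrayList<>();  // safe: copies the data out

        for (long v = 1; v <= 3; v++) {
            reused.value = v;               // runtime overwrites the same object
            cachedRefs.add(reused);         // <- never cache with object reuse
            copiedValues.add(reused.value); // defensive copy of what you need
        }

        // All cached references point to the same object, which now holds 3.
        System.out.println(cachedRefs.get(0).value); // prints 3, not 1
        System.out.println(copiedValues.get(0));     // prints 1
    }
}
```

The same aliasing applies to any mutable type Flink reuses, which is why Arvid's advice later in the thread is to treat records as immutable and never cache them across invocations.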
>>>
>>> I am not very comfortable using AVRO objects generated with a
>>> different AVRO version than the one supported by Flink.
>>> I am going to map AVRO records into hand-written POJOs immediately
>>> after the ingestion, to reduce the chance of further issues. I reckon
>>> this is very empirical, but that's what the workaround looks like to me :)
>>>
>>> Lorenzo
>>>
>>> P.S. I want to give a massive thank you to this community. So far it
>>> has been one of the most reactive and helpful I have ever interacted with.
>>>
>>> On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi,
>>>> for the 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>> Lorenzo Nicora <lorenzo.nic...@gmail.com> 于2020年6月11日周四 下午5:02写道:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> thanks for the point about caching records. Gotcha!
>>>>>
>>>>> Sorry, I cannot share the full schema or generated code. It's a 3rd
>>>>> party IP and we signed a meter-think NDA... I think I can post snippets.
>>>>> The schema is heavily nested, including arrays of other record types.
>>>>> Types are primitives, or logical decimal and timestamp-millis. No
>>>>> unions.
>>>>>
>>>>> #conversion is in AccountEntries only (one of the nested records) and
>>>>> looks like this:
>>>>>
>>>>> private static final org.apache.avro.Conversion<?>[] conversions =
>>>>>     new org.apache.avro.Conversion<?>[] {
>>>>>   null,
>>>>>   null,
>>>>>   null,
>>>>>   new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>   new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>   new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>   null,
>>>>>   null,
>>>>>   null,
>>>>>   null,
>>>>>   null,
>>>>>   null,
>>>>>   null
>>>>> };
>>>>>
>>>>> Note that I have to generate the specific object with the AVRO 1.9.2
>>>>> Maven plugin.
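[Editor's note] The role of a per-field conversions array like the one above, combined with the readField logic quoted earlier in the thread, can be simulated in plain Java: the reader looks up a converter by field position, and a null entry means the raw encoded value (a long, for timestamp-millis) is returned unconverted, which is exactly the symptom Lorenzo reports. This is a dependency-free sketch, not the real Avro API:

```java
import java.time.Instant;
import java.util.function.LongFunction;

public class ConversionLookupDemo {
    // Stand-in for Avro's per-field conversions array: index = field position,
    // null = no logical-type conversion registered for that field.
    static final LongFunction<?>[] CONVERSIONS = {
        null,                                           // field 0: plain long
        (LongFunction<Instant>) Instant::ofEpochMilli   // field 1: timestamp-millis
    };

    // Mimics the readField flow: convert only if a conversion exists,
    // otherwise hand back the raw encoded value.
    static Object readField(int pos, long rawValue) {
        LongFunction<?> conversion = CONVERSIONS[pos];
        return conversion != null ? conversion.apply(rawValue) : rawValue;
    }

    public static void main(String[] args) {
        System.out.println(readField(0, 1591013042326L)); // raw Long
        System.out.println(readField(1, 1591013042326L)); // converted Instant
    }
}
```

In the real reader, getConversion(f.pos()) returning null is what sends the code down the readWithoutConversion path and yields a Long where a DateTime is expected.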
>>>>> With 1.8.2-generated code it fails with the following exception,
>>>>> regardless of setting enableObjectReuse():
>>>>>
>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>
>>>>> Thanks for the help
>>>>> Lorenzo
>>>>>
>>>>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>>> Hi Lorenzo,
>>>>>>
>>>>>> I'm glad that it worked out somehow, but I'd still like to understand
>>>>>> what went wrong, so it will work more smoothly for future users. I
>>>>>> double-checked and we even test AvroSerializer with logical types, so
>>>>>> I'm a bit puzzled.
>>>>>>
>>>>>> Could you attach GlHeader, or at least show us what GlHeader#conversions
>>>>>> looks like? I want to exclude the possibility that the source generator
>>>>>> screwed up.
>>>>>>
>>>>>> Concerning object reuse: you need to treat all POJOs as immutable
>>>>>> (I'm assuming that's what you meant from your description), but you
>>>>>> should also never cache values, like:
>>>>>>
>>>>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>>>>   Object lastElement;
>>>>>>
>>>>>>   public void flatMap(Object newElement, Collector<Object> out) {
>>>>>>     out.collect(lastElement);
>>>>>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> (excuse my ugly code)
>>>>>>
>>>>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora
>>>>>> <lorenzo.nic...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Arvid,
>>>>>>>
>>>>>>> answering your other questions:
>>>>>>>
>>>>>>> Here is the stack trace of case (1), when I try to read using
>>>>>>> specific records generated by the AVRO 1.8.2 plugin:
>>>>>>>
>>>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>>>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>>>>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>>>>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>>>
>>>>>>> I also tried generating the specific object with AVRO 1.9.2 (case 2),
>>>>>>> forcing it to use Joda time, but it still didn't work:
>>>>>>>
>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>>>>> Could not forward element to next operator
>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>>>>>>> org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>>>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>>   ... 7 more
>>>>>>>
>>>>>>> But in the second case, it seems the failure happens when Flink tries
>>>>>>> to make a copy of the record.
>>>>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>>>>
>>>>>>> I am not sure I understand all the implications of object reuse in
>>>>>>> Flink, specifically.
>>>>>>> I am familiar with the general risk of mutable messages, and I always
>>>>>>> handle them as immutable even when they are POJOs, never mutating and
>>>>>>> forwarding the same record.
>>>>>>> Not sure whether there are other implications in Flink.
>>>>>>>
>>>>>>> Many thanks
>>>>>>> Lorenzo
>>>>>>>
>>>>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Lorenzo,
>>>>>>>>
>>>>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>>>>> stack trace.
>>>>>>>>
>>>>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>>>>> probably see the same issue as 1).
>>>>>>>>
>>>>>>>> 3) I'm surprised that this doesn't work either. There is a code path
>>>>>>>> since 2016 for GenericRecord and it's covered in a test. From the
>>>>>>>> error description and the ticket, it looks like the issue is not the
>>>>>>>> AvroInputFormat but the serializer. So it would probably work with a
>>>>>>>> different serializer (but that would cause back-and-forth type
>>>>>>>> transformation).
>>>>>>>>
>>>>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora
>>>>>>>> <lorenzo.nic...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Timo,
>>>>>>>>>
>>>>>>>>> the stack trace with the 1.9.2-generated specific file is the
>>>>>>>>> following:
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>>>>>>> Could not forward element to next operator
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>>>>>>>>> java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>>>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>>>>   ... 7 more
>>>>>>>>>
>>>>>>>>> I reckon logical types might have been considered somehow
>>>>>>>>> experimental since... ever. But, honestly, I've been using them in
>>>>>>>>> the Kafka/Java ecosystem as well as in Spark without too many
>>>>>>>>> problems.
>>>>>>>>>
>>>>>>>>> For my specific use case, the schema is given. Messages are produced
>>>>>>>>> by a 3rd party and we cannot change the schema (especially because
>>>>>>>>> it's a legit schema).
>>>>>>>>> I am desperately looking for a workaround.
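[Editor's note] A dependency-free illustration of the workaround Lorenzo eventually settles on elsewhere in the thread, mapping ingested records to hand-written POJOs and converting the timestamp-millis value by hand from its epoch-millis long encoding. All class and field names are hypothetical; this is a sketch of the idea, not the thread's actual code:

```java
import java.time.Instant;

public class PojoMappingDemo {
    // Hypothetical stand-in for an Avro-generated record whose
    // timestamp-millis field surfaces as a raw long (epoch millis).
    static class GeneratedRecord {
        long createdAtMillis;
        String description;
    }

    // Hand-written POJO used downstream, with a proper time type.
    static class AccountEntryPojo {
        Instant createdAt;
        String description;
    }

    // Map immediately after ingestion, performing the logical-type
    // conversion manually instead of relying on Avro's conversions.
    static AccountEntryPojo toPojo(GeneratedRecord rec) {
        AccountEntryPojo pojo = new AccountEntryPojo();
        pojo.createdAt = Instant.ofEpochMilli(rec.createdAtMillis);
        pojo.description = rec.description;
        return pojo;
    }

    public static void main(String[] args) {
        GeneratedRecord rec = new GeneratedRecord();
        rec.createdAtMillis = 0L;
        rec.description = "example";
        System.out.println(toPojo(rec).createdAt); // prints 1970-01-01T00:00:00Z
    }
}
```

Mapping at the ingestion boundary keeps the Avro-versioned types out of the rest of the pipeline, so downstream operators and serializers only ever see plain POJOs.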
>>>>>>>>>
>>>>>>>>> I had a similar issue with a Kafka source and AVRO records
>>>>>>>>> containing decimals and timestamps. Timestamps worked, but not
>>>>>>>>> decimals.
>>>>>>>>> I was able to work around the problem using GenericRecords.
>>>>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>>>>>
>>>>>>>>> I honestly find it very confusing to have different ways of handling
>>>>>>>>> AVRO deserialization inside Flink core components.
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>> Lorenzo
>>>>>>>>>
>>>>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Lorenzo,
>>>>>>>>>>
>>>>>>>>>> as far as I know, we don't support Avro's logical types in Flink's
>>>>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports
>>>>>>>>>> the 1.8.2 version of logical types but might be incompatible with
>>>>>>>>>> 1.9.2.
>>>>>>>>>>
>>>>>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>>>>
>>>>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>>>>> work, because specific records are handled as POJOs and those should
>>>>>>>>>> also be able to deal with the logical types' classes through Kryo.
>>>>>>>>>>
>>>>>>>>>> Reg 3) Generic record:
>>>>>>>>>>
>>>>>>>>>> It would be great if we can make this option possible. We could
>>>>>>>>>> include it in the next minor release fix.
>>>>>>>>>>
>>>>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>>>>
>>>>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>>>>> > Hi,
>>>>>>>>>> >
>>>>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>>>>> > Files are written by an S3 Sink Kafka Connect, but S3 is not the
>>>>>>>>>> > point here. I started by trying to ingest a static bunch of files
>>>>>>>>>> > from the local fs first, and I am having weird issues with AVRO
>>>>>>>>>> > deserialization.
>>>>>>>>>> >
>>>>>>>>>> > I have to say, the records contain logical types: timestamp-millis
>>>>>>>>>> > and decimals.
>>>>>>>>>> >
>>>>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files
>>>>>>>>>> > and used avro-maven-plugin to generate POJOs.
>>>>>>>>>> > I tried multiple combinations, all with no luck.
>>>>>>>>>> >
>>>>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>>>>> >
>>>>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>>>>>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>>>>>> > org.joda.time.DateTime
>>>>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>>>>> >
>>>>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>>>>> > Same code as above, but AVRO POJOs are generated with AVRO 1.9.2
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>>>>> >
>>>>>>>>>> > 3) Generic record
>>>>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>>>>> > convenience, but I am not using the generated POJO as the record.
>>>>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>>>>>>> > to explicitly specify the TypeInfo with returns(...)
>>>>>>>>>> >
>>>>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>>>>>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not
>>>>>>>>>> > instantiable: The class is not a proper class. It is either
>>>>>>>>>> > abstract, an interface, or a primitive type.
>>>>>>>>>> >
>>>>>>>>>> > This looks like a bug.
>>>>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>>>>> > and I will try to submit a fix, but this still does not solve my
>>>>>>>>>> > problem, as I am using a managed Flink I cannot update.
>>>>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying
>>>>>>>>>> > to do anything bizarre. Am I?
>>>>>>>>>> >
>>>>>>>>>> > Any ideas?
>>>>>>>>>> > Am I missing something obvious?
>>>>>>>>>> >
>>>>>>>>>> > Cheers
>>>>>>>>>> > Lorenzo
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>>
>>>>>>>> <https://www.ververica.com/>
>>>>>>>>
>>>>>>>> Follow us @VervericaData
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>>> Conference
>>>>>>>>
>>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ververica GmbH
>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>>>> Ji (Toni) Cheng