Hi Arvid,

I followed your instructions and set the breakpoint in
SpecificDatumReader.readField *with AVRO 1.8.2*.

For all timestamp-millis fields (I have many):

Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());


returns null, so...

datum = readWithoutConversion(oldDatum, f.schema(), in);


is used instead and returns a *Long*.
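
For anyone who wants to check this outside the debugger, the per-field
conversions could be dumped roughly like this (a sketch only; GlHeader is the
generated class from my stack trace, and reflection is used because
getConversion(int) may not be public in Avro 1.8.2):

// Sketch: print each field's registered Conversion for a generated record.
SpecificRecordBase record = new GlHeader();
Method getConversion =
    SpecificRecordBase.class.getDeclaredMethod("getConversion", int.class);
getConversion.setAccessible(true);
for (Schema.Field f : record.getSchema().getFields()) {
  Conversion<?> c = (Conversion<?>) getConversion.invoke(record, f.pos());
  // null here means readWithoutConversion is used and the raw Long comes back
  System.out.println(f.name() + " -> " + c);
}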



Not sure it's relevant, but in this version I have the explicit dependency
org.apache.avro:avro:1.8.2 and I am using the avro-maven-plugin (1.8.2) to
generate the records from the .avsc files with this configuration:

<configuration>
    <stringType>String</stringType>
    <createSetters>true</createSetters>
    <fieldVisibility>private</fieldVisibility>
    <enableDecimalLogicalType>true</enableDecimalLogicalType>
</configuration>


Cheers
Lorenzo


On Thu, 11 Jun 2020 at 13:11, Arvid Heise <ar...@ververica.com> wrote:

> Sorry, forget my last mail; it was half-finished.
>
> Here is the real one:
>
> Hi Lorenzo,
>
> If you still have time to investigate:
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there, although they look different than here; that can
> be attributed to the Avro upgrade.
>
> Could you put a breakpoint on SpecificDatumReader.readField, so that you
> can inspect the conversion for the timestamp field? You probably want to
> make it a conditional breakpoint on f.name().equals("<your field name>").
> The expected flow is that it should have a conversion that returns the
> joda time instead of the long. Then datum should be the converted joda time.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>                          ResolvingDecoder in, Object state)
>     throws IOException {
>   if (r instanceof SpecificRecordBase) {
>     Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
>
>     Object datum;
>     if (conversion != null) {
>       datum = readWithConversion(
>           oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>     } else {
>       datum = readWithoutConversion(oldDatum, f.schema(), in);
>     }
>
>     getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
>     super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
>> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>>
>>> Thanks Guowei,
>>>
>>> Setting format.setReuseAvroValue(false) with 1.8.2-generated records
>>> does not solve the problem:
>>>
>>> 12:02:59,314 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>>
>>> ----------------------------
>>>
>>> Summarising, the only working combination seems to be:
>>>
>>>    - Use AVRO 1.9.2 code generation, setting
>>>      dateTimeLogicalTypeImplementation = joda
>>>    - Enable object reuse (being careful about the implications); see the
>>>      snippet below
>>>
>>> Using AVRO 1.8.2 code generation does not work with any of the other
>>> workarounds.
>>> Using GenericRecords does not work due to a bug in AvroSerializer
>>> <https://issues.apache.org/jira/browse/FLINK-18223>, and GenericRecords
>>> also bring a number of other problems.
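>>>
>>> For reference, object reuse is enabled on the execution config (a minimal
>>> sketch; env is the StreamExecutionEnvironment):
>>>
>>> // Skips the defensive copy between chained operators (the deepCopy that
>>> // blows up on joda/java.time values); in exchange, operators must not
>>> // mutate or cache the records they receive.
>>> env.getConfig().enableObjectReuse();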
>>>
>>> I am not very comfortable using AVRO objects generated with a
>>> different AVRO version than the one supported by Flink.
>>> I am going to map the AVRO records into hand-written POJOs immediately after
>>> the ingestion, to reduce the chances of further issues. I reckon this is very
>>> empirical, but that's what a workaround looks like to me :)
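>>>
>>> Something like this (just a sketch; AccountPojo and the getters are
>>> hypothetical, the point is to convert joda values to plain types at the edge):
>>>
>>> // Hypothetical hand-written POJO, kept free of Avro/joda types.
>>> public class AccountPojo {
>>>     public String accountId;
>>>     public long bookingTimeMillis;
>>> }
>>>
>>> DataStream<AccountPojo> pojos = accountEntries.map(rec -> {
>>>     AccountPojo p = new AccountPojo();
>>>     p.accountId = rec.getAccountId();                        // hypothetical field
>>>     p.bookingTimeMillis = rec.getBookingTime().getMillis();  // joda -> epoch millis
>>>     return p;
>>> });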
>>>
>>> Lorenzo
>>>
>>> P.S. I want to give a massive thank you to this community. So far it has
>>> been one of the most responsive and helpful I have ever interacted with.
>>>
>>> On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi,
>>>> for 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
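>>>>
>>>> That is, something like this (a sketch, reusing the in / AccountEntries
>>>> names from the earlier snippet):
>>>>
>>>> AvroInputFormat<AccountEntries> inputFormat =
>>>>     new AvroInputFormat<>(in, AccountEntries.class);
>>>> // Hand out a fresh record per read instead of reusing one instance.
>>>> inputFormat.setReuseAvroValue(false);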
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> Lorenzo Nicora <lorenzo.nic...@gmail.com> 于2020年6月11日周四 下午5:02写道:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> thanks for the point about caching records. Gotcha!
>>>>>
>>>>> Sorry, I cannot share the full schema or generated code. It's a 3rd
>>>>> party IP and we signed a meter-thick NDA... I think I can post snippets.
>>>>> The schema is heavily nested, including arrays of other record types.
>>>>> Field types are primitives, or logical decimal and timestamp-millis. No
>>>>> unions.
>>>>>
>>>>> #conversions is in AccountEntries only (one of the nested records) and
>>>>> looks like this:
>>>>>
>>>>> private static final org.apache.avro.Conversion<?>[] conversions =
>>>>>     new org.apache.avro.Conversion<?>[] {
>>>>>     null,
>>>>>     null,
>>>>>     null,
>>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>>     null,
>>>>>     null,
>>>>>     null,
>>>>>     null,
>>>>>     null,
>>>>>     null,
>>>>>     null
>>>>> };
>>>>>
>>>>>
>>>>> Note that I have to generate the specific object with the AVRO 1.9.2 Maven
>>>>> plugin.
>>>>> With 1.8.2-generated code it fails with the following exception,
>>>>> regardless of setting enableObjectReuse():
>>>>>
>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>
>>>>>
>>>>> Thanks for the help
>>>>> Lorenzo
>>>>>
>>>>>
>>>>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>>> Hi Lorenzo,
>>>>>>
>>>>>> I'm glad that it worked out somehow, but I'd still like to understand
>>>>>> what went wrong, so it will work more smoothly for future users. I double
>>>>>> checked and we even test AvroSerializer with logical types, so I'm a bit
>>>>>> puzzled.
>>>>>>
>>>>>> Could you attach GlHeader or at least show us what GlHeader#conversions
>>>>>> looks like? I want to exclude the possibility that the source
>>>>>> generator screwed up.
>>>>>>
>>>>>> Concerning object reuse: you need to treat all POJOs as
>>>>>> immutable (I'm assuming that's what you meant from your description),
>>>>>> but you should also never cache values, like this:
>>>>>>
>>>>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>>>>   Object lastElement;
>>>>>>
>>>>>>   @Override
>>>>>>   public void flatMap(Object newElement, Collector<Object> out) {
>>>>>>     if (lastElement != null) {
>>>>>>       out.collect(lastElement);
>>>>>>     }
>>>>>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> (excuse my ugly code)
>>>>>>
>>>>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <
>>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Arvid,
>>>>>>>
>>>>>>> answering to your other questions
>>>>>>>
>>>>>>> Here is the stack trace for case (1), when I try to read using
>>>>>>> specific records generated by the AVRO 1.8.2 plugin:
>>>>>>>
>>>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>>>
>>>>>>>
>>>>>>> I also tried generating the specific object with AVRO 1.9.2 (case 2),
>>>>>>> forcing it to use Joda time, but it still didn't work:
>>>>>>>
>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>> ... 7 more
>>>>>>>
>>>>>>>
>>>>>>> But in the second case, it seems the failure happens when Flink
>>>>>>> tries to make a copy of the record.
>>>>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>>>>
>>>>>>> I am not sure I understand all the implications of object reuse
>>>>>>> in Flink specifically.
>>>>>>> I am familiar with the general risk of mutable messages, and I
>>>>>>> always handle them as immutable even when they are POJOs, never mutating
>>>>>>> and forwarding the same record.
>>>>>>> Not sure whether there are other implications in Flink.
>>>>>>>
>>>>>>> Many thanks
>>>>>>> Lorenzo
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Lorenzo,
>>>>>>>>
>>>>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>>>>> stacktrace.
>>>>>>>>
>>>>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>>>>> probably see the same issue as 1)
>>>>>>>>
>>>>>>>> 3) I'm surprised that this doesn't work either. There is a codepath
>>>>>>>> since 2016 for GenericRecord and it's covered in a test. From the error
>>>>>>>> description and the ticket, it looks like the issue is not the
>>>>>>>> AvroInputFormat, but the serializer. So it would probably work
>>>>>>>> with a different serializer (but that would cause back and forth type
>>>>>>>> transformation).
>>>>>>>>
>>>>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <
>>>>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Timo,
>>>>>>>>>
>>>>>>>>> the stack trace with the 1.9.2-generated specific record is the following:
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>>>> ... 7 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I reckon logical types might have been considered somewhat
>>>>>>>>> experimental since... ever. But honestly, I've been using them in the
>>>>>>>>> Kafka/Java ecosystem as well as in Spark without too many problems.
>>>>>>>>>
>>>>>>>>> For my specific use case, the schema is given. Messages are
>>>>>>>>> produced by a 3rd party and we cannot change the schema (especially 
>>>>>>>>> because
>>>>>>>>> it's a legit schema).
>>>>>>>>> I am desperately looking for a workaround.
>>>>>>>>>
>>>>>>>>> I had a similar issue with a Kafka source and AVRO records
>>>>>>>>> containing decimals and timestamps. Timestamps worked but not decimals.
>>>>>>>>> I was able to work around the problem using GenericRecords.
>>>>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>>>>> AvroSerializer, and has no problem handling GenericRecords.
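>>>>>>>>>
>>>>>>>>> For context, such a Kafka path would look roughly like this (a sketch;
>>>>>>>>> the topic name, properties, and schema variable are assumptions):
>>>>>>>>>
>>>>>>>>> // AvroDeserializationSchema produces GenericRecords directly, so
>>>>>>>>> // AvroSerializer's deepCopy is not involved at the source.
>>>>>>>>> FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
>>>>>>>>>     "account-entries",                            // hypothetical topic
>>>>>>>>>     AvroDeserializationSchema.forGeneric(schema),
>>>>>>>>>     kafkaProps);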
>>>>>>>>>
>>>>>>>>> I honestly find it very confusing that there are different ways of
>>>>>>>>> handling AVRO deserialization inside Flink core components.
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>> Lorenzo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Lorenzo,
>>>>>>>>>>
>>>>>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>>>>>
>>>>>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>>>>
>>>>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>>>>> work, because specific records are handled as POJOs and those should be
>>>>>>>>>> able to also deal with logical types' classes through Kryo.
>>>>>>>>>>
>>>>>>>>>> Reg 3) Generic record
>>>>>>>>>>
>>>>>>>>>> It would be great if we can make this option possible. We could
>>>>>>>>>> include
>>>>>>>>>> it in the next minor release fix.
>>>>>>>>>>
>>>>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>>>>> > Hi,
>>>>>>>>>> >
>>>>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>>>>> > Files are written by a Kafka Connect S3 sink, but S3 is not the point
>>>>>>>>>> > here. I started trying to ingest a static bunch of files from local fs
>>>>>>>>>> > first and I am having weird issues with AVRO deserialization.
>>>>>>>>>> >
>>>>>>>>>> > I have to say, the records contain logical types: timestamp-millis
>>>>>>>>>> > and decimal.
>>>>>>>>>> >
>>>>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>>>>>> > used avro-maven-plugin to generate POJOs.
>>>>>>>>>> > I tried multiple combinations, all with no luck:
>>>>>>>>>> >
>>>>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>>>>> >
>>>>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>>>>>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>>>>>> > org.joda.time.DateTime
>>>>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > 3) Generic record
>>>>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>>>>> > convenience, but I am not using the generated POJO as the record.
>>>>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>>>>>>>>>> > to explicitly specify the TypeInfo with returns(...):
>>>>>>>>>> >
>>>>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>>>>>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > *Result*
>>>>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>>>>>> > The class is not a proper class. It is either abstract, an interface, or
>>>>>>>>>> > a primitive type.
>>>>>>>>>> >
>>>>>>>>>> > This looks like a bug.
>>>>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>>>>>>>> > as I am using a managed Flink I cannot update.
>>>>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>>>>>>> > anything bizarre. Am I?
>>>>>>>>>> >
>>>>>>>>>> > Any ideas?
>>>>>>>>>> > Am I missing something obvious?
>>>>>>>>>> >
>>>>>>>>>> > Cheers
>>>>>>>>>> > Lorenzo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>
>
>
