Sorry, forget my last mail; that was half-finished.

Here is the real one:

Hi Lorenzo,

if you still have time to investigate:

Your stack trace shows that all expected code paths have been taken.
The conversions are there, although they look different from the ones here;
that can be attributed to the Avro upgrade.

Could you put a breakpoint on SpecificDatumReader.readField, so that you
can inspect the conversion for the timestamp field? You probably want to
make it conditional on f.name().equals("<your field name>").
The expected flow is that the record has a conversion that returns the joda
time instead of the long, so datum should be the converted joda time.

@Override
protected void readField(Object r, Schema.Field f, Object oldDatum,
                         ResolvingDecoder in, Object state)
    throws IOException {
  if (r instanceof SpecificRecordBase) {
    Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());

    Object datum;
    if (conversion != null) {
      datum = readWithConversion(
          oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
    } else {
      datum = readWithoutConversion(oldDatum, f.schema(), in);
    }

    getData().setField(r, f.name(), f.pos(), datum);

  } else {
    super.readField(r, f, oldDatum, in, state);
  }
}
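
To make the expected flow concrete, here is a minimal sketch of what the
breakpoint should reveal. It does not use Avro at all: Header, its conversions
list and this readField are hypothetical stand-ins that only mimic the branch
above, and java.time.Instant stands in for the joda DateTime. If the conversion
at the field's position is null, the raw long reaches put() and the cast fails,
which is the kind of ClassCastException seen in your stack trace.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins only: none of these types are Avro classes.
class ConversionFlowSketch {

  /** Mimics a generated record: field 0 is a timestamp and put() casts like generated code. */
  static class Header {
    final List<Function<Long, Object>> conversions; // one slot per field, null = no conversion
    Instant created;

    Header(List<Function<Long, Object>> conversions) {
      this.conversions = conversions;
    }

    Function<Long, Object> getConversion(int pos) {
      return conversions.get(pos);
    }

    void put(int pos, Object value) {
      if (pos == 0) {
        created = (Instant) value; // ClassCastException if value is still a Long
      }
    }
  }

  /** Mimics the branch in readField: convert if a conversion exists, else pass the raw value. */
  static void readField(Header r, int pos, long rawMillis) {
    Function<Long, Object> conversion = r.getConversion(pos);
    Object datum;
    if (conversion != null) {
      datum = conversion.apply(rawMillis);
    } else {
      datum = rawMillis; // boxed Long, no conversion applied
    }
    r.put(pos, datum);
  }

  public static void main(String[] args) {
    Function<Long, Object> toInstant = Instant::ofEpochMilli;

    Header good = new Header(Arrays.asList(toInstant));
    readField(good, 0, 1590976842326L);
    System.out.println("with conversion: " + good.created);

    Header bad = new Header(Arrays.asList((Function<Long, Object>) null));
    try {
      readField(bad, 0, 1590976842326L);
    } catch (ClassCastException e) {
      System.out.println("without conversion: " + e.getMessage());
    }
  }
}
```

So if the debugger shows conversion == null for the timestamp field, or datum
is still a Long after the call, that is where the flow deviates.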


On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Lorenzo,
>
> if you still have time to investigate.
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there although they look different than here, but that can
> be attributed to the avro upgrade.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>                          ResolvingDecoder in, Object state)
>     throws IOException {
>   if (r instanceof SpecificRecordBase) {
>     Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
>
>     Object datum;
>     if (conversion != null) {
>       datum = readWithConversion(
>           oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>     } else {
>       datum = readWithoutConversion(oldDatum, f.schema(), in);
>     }
>
>     getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
>     super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>>
>> Thanks Guowei,
>>
>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>> does not solve the problem.
>>
>> 12:02:59,314 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> ----------------------------
>>
>> Summarising, the only working combination seems to be:
>>
>>    - Use AVRO 1.9.2 code generation, setting
>>      dateTimeLogicalTypeImplementation = joda
>>    - Enable object reuse (being careful about the implications)
>>
>> Using AVRO 1.8.2 code generation does not work with any of the other
>> workarounds.
>> Using generic objects does not work because of a bug in AvroSerializer
>> <https://issues.apache.org/jira/browse/FLINK-18223>, and GenericRecords
>> also bring a number of other problems.
>>
>> I am not very comfortable with using AVRO objects generated with a
>> different AVRO version than the one supported by Flink.
>> I am going to map AVRO records into hand-written POJOs immediately after
>> the ingestion to reduce the chances of further issues. I reckon this is very
>> empirical, but that's what the workaround looks like to me :)
>>
>> Lorenzo
>>
>> P.S. I want to give a massive thank-you to this community. So far it has been
>> one of the most responsive and helpful I have ever interacted with.
>>
>> On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> for 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote on Thu, Jun 11, 2020 at 5:02 PM:
>>>
>>>> Hi Arvid,
>>>>
>>>> thanks for the point about caching records. Gotcha!
>>>>
>>>> Sorry, I cannot share the full schema or generated code. It's 3rd
>>>> party IP and we signed a meter-thick NDA... I think I can post snippets.
>>>> The schema is heavily nested, including arrays of other record types.
>>>> Types are primitives, or logical decimal and timestamp-millis. No union.
>>>>
>>>> #conversion is in AccountEntries only (one of the nested records) and
>>>> looks like this:
>>>>
>>>> private static final org.apache.avro.Conversion<?>[] conversions =
>>>>     new org.apache.avro.Conversion<?>[] {
>>>>     null,
>>>>     null,
>>>>     null,
>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>>>     null,
>>>>     null,
>>>>     null,
>>>>     null,
>>>>     null,
>>>>     null,
>>>>     null
>>>> };
>>>>
>>>>
>>>> Note that I have to generate the specific object with the AVRO 1.9.2 Maven
>>>> plugin.
>>>> With 1.8.2-generated code it fails with the following exception,
>>>> regardless of whether enableObjectReuse() is set:
>>>>
>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>
>>>>
>>>> Thanks for the help
>>>> Lorenzo
>>>>
>>>>
>>>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> I'm glad that it worked out somehow, but I'd still like to understand
>>>>> what went wrong, so it will work more smoothly for future users. I double
>>>>> checked and we even test AvroSerializer with logical types, so I'm a bit
>>>>> puzzled.
>>>>>
>>>>> Could you attach GlHeader or at least show us what GlHeader#conversions
>>>>> looks like? I want to exclude the possibility that the source generator
>>>>> screwed up.
>>>>>
>>>>> Concerning object reuse: you need to treat all POJOs as immutable
>>>>> (I'm assuming that's what you meant from your description),
>>>>> but you should also never cache values, like here:
>>>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>>>   private Object lastElement;
>>>>>
>>>>>   @Override
>>>>>   public void flatMap(Object newElement, Collector<Object> out) {
>>>>>     if (lastElement != null) {
>>>>>       out.collect(lastElement);
>>>>>     }
>>>>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>>>>   }
>>>>> }
>>>>>
>>>>> (excuse my ugly code)
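
As a rough illustration of why that caching is dangerous, here is a small
sketch that does not use Flink. Event and both methods are hypothetical
stand-ins; the only assumption is that, with object reuse, the same mutable
instance may be handed to the function again with new contents.

```java
// Hypothetical stand-ins: this does not use Flink, it only mimics one reused mutable instance.
class ObjectReuseSketch {

  static class Event {
    long id;

    Event(long id) {
      this.id = id;
    }

    Event copy() {
      return new Event(id);
    }
  }

  /** Returns the id seen through a cached reference after the instance has been reused. */
  static long cachedByReference() {
    Event reused = new Event(1); // the single instance that keeps being reused
    Event last = reused;         // caching the reference, as in ShiftElements
    reused.id = 2;               // the next record is written into the same instance
    return last.id;              // the cached "previous" element now shows the new contents
  }

  /** Same sequence, but the cached value is a defensive copy. */
  static long cachedByCopy() {
    Event reused = new Event(1);
    Event last = reused.copy();
    reused.id = 2;
    return last.id;
  }

  public static void main(String[] args) {
    System.out.println("cached reference sees id " + cachedByReference()); // prints 2
    System.out.println("cached copy sees id " + cachedByCopy());           // prints 1
  }
}
```

If a function really has to remember an element across calls, copying it
first, as in cachedByCopy, avoids the problem at the cost of the allocation
that object reuse was meant to save.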
>>>>>
>>>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <
>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> answering your other questions.
>>>>>>
>>>>>> Here is the stacktrace of case (1), when I try to read using
>>>>>> specific records generated by the AVRO 1.8.2 plugin:
>>>>>>
>>>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>>>
>>>>>>
>>>>>> I also tried generating the specific object with Avro 1.9.2 (2), but
>>>>>> forcing it to use Joda time, and it still didn't work:
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>> ... 7 more
>>>>>>
>>>>>>
>>>>>> But in the second case, it seems the failure happens when Flink tries
>>>>>> to make a copy of the record.
>>>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>>>
>>>>>> I am not sure I understand all implications of object reuse in Flink,
>>>>>> specifically.
>>>>>> I am familiar with the general risk of mutable messages, and I always
>>>>>> handle them as mutable even when they are POJO. Never mutating and
>>>>>> forwarding the same record.
>>>>>> Not sure whether there are other implications in Flink.
>>>>>>
>>>>>> Many thanks
>>>>>> Lorenzo
>>>>>>
>>>>>>
>>>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lorenzo,
>>>>>>>
>>>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>>>> stacktrace.
>>>>>>>
>>>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>>>> probably see the same issue as 1)
>>>>>>>
>>>>>>> 3) I'm surprised that this doesn't work either. There is a codepath
>>>>>>> since 2016 for GenericRecord and it's covered in a test. From the error
>>>>>>> description and the ticket, it looks like the issue is not the
>>>>>>> AvroInputFormat, but the serializer. So it would probably work with
>>>>>>> a different serializer (but that would cause back and forth type
>>>>>>> transformation).
>>>>>>>
>>>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <
>>>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Timo,
>>>>>>>>
>>>>>>>> the stacktrace with 1.9.2-generated specific file is the following
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>> I reckon logical types might have been considered somehow
>>>>>>>> experimental since...ever. But, honestly, I've been using them in the
>>>>>>>> Kafka/Java ecosystem as well as in Spark without too many problems.
>>>>>>>>
>>>>>>>> For my specific use case, the schema is given. Messages are
>>>>>>>> produced by a 3rd party and we cannot change the schema (especially
>>>>>>>> because it's a legit schema).
>>>>>>>> I am desperately looking for a workaround.
>>>>>>>>
>>>>>>>> I had a similar issue with a Kafka Source and AVRO records
>>>>>>>> containing decimals and timestamps. Timestamps worked but not decimals.
>>>>>>>> I was able to work around the problem using GenericRecords.
>>>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>>>>
>>>>>>>> I honestly find it very confusing to have different ways of
>>>>>>>> handling AVRO deserialization inside Flink core components.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Lorenzo
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Lorenzo,
>>>>>>>>>
>>>>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>>>>
>>>>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>>>
>>>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>>>> work, because specific records are handled as POJOs and those should be
>>>>>>>>> able to also deal with logical types' classes through Kryo.
>>>>>>>>>
>>>>>>>>> Reg 3) Generic record
>>>>>>>>>
>>>>>>>>> It would be great if we could make this option possible. We could include
>>>>>>>>> a fix in the next minor release.
>>>>>>>>>
>>>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>>>> > Files are written by an S3 Sink Kafka Connect but S3 is not the point
>>>>>>>>> > here. I started trying to ingest a static bunch of files from local fs
>>>>>>>>> > first and I am having weird issues with AVRO deserialization.
>>>>>>>>> >
>>>>>>>>> > I have to say, the records contain logical types, timestamps-ms and decimals
>>>>>>>>> >
>>>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>>>>> > used avro-maven-plugin to generate POJOs
>>>>>>>>> > I tried multiple combinations, all with no luck
>>>>>>>>> >
>>>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>>>> >
>>>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>>>>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>>>>> > org.joda.time.DateTime
>>>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > 3) Generic record
>>>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>>>> > convenience, but I am not using the generated POJO as record.
>>>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>>>>>>>>> > to explicitly specify the TypeInfo with returns(...)
>>>>>>>>> >
>>>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>>>>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > *Result*
>>>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>>>>> > The class is not a proper class. It is either abstract, an interface, or
>>>>>>>>> > a primitive type.
>>>>>>>>> >
>>>>>>>>> > This looks like a bug.
>>>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>>>> > and I will try to submit a fix, but that still does not solve my problem, as I
>>>>>>>>> > am using a managed Flink I cannot update.
>>>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>>>>>> > anything bizarre. Am I?
>>>>>>>>> >
>>>>>>>>> > Any ideas?
>>>>>>>>> > Am I missing something obvious?
>>>>>>>>> >
>>>>>>>>> > Cheers
>>>>>>>>> > Lorenzo
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>
>>>>>>> <https://www.ververica.com/>
>>>>>>>
>>>>>>> Follow us @VervericaData
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>> Conference
>>>>>>>
>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>
>>>>>>> --
>>>>>>> Ververica GmbH
>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>>> Ji (Toni) Cheng
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>
>


