Hi Arvid,

thanks for the point about catching records. Gotcha!

Sorry I cannot share the full schema or generated code. It's a 3rd party IP
and we signed a meter-think NDA... I think I can post snippets.
The schema is heavily nested, including arrays of other record types
Types are primitives, or logical decimal and timestamp-millis. No union.

#conversion is in AccountEntries only (one of the nested records) and looks
like this:

private static final org.apache.avro.Conversion<?>[] conversions =
    new org.apache.avro.Conversion<?>[] {
    null,
    null,
    null,
    new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
    new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
    new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
    null,
    null,
    null,
    null,
    null,
    null,
    null
};


Note that I have to generate the specific object with AVRO 1.9.2 Maven
Plugin.
With 1.8.2 generated code it fails with the following exception, regardless
setting enableObjectReuse()

java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)


Thanks for the help
Lorenzo


On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:

> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong, so it will work more smoothly for future users. I double
> checked and we even test AvroSerializer with logical types, so I'm a bit
> puzzled.
>
> Could you attach GlHeader or at least show us how GlHeader#conversions look
> like? I want to exclude the possibility that the source generator screwed
> up.
>
> Concerning object reuse is that you need to treat all POJO as immutable
> (I'm assuming that that's what your meant from your description), but you
> should also never cache values like
> class ShiftElements extends MapFunction {
>   Object lastElement;
>
>   Object map(Object newElement, Collector out) {
>     out.collect(lastElement);
>     lastElement = newElement; // <- never cache with enableObjectReuse
>   }
> }
>
> (excuse my ugly code)
>
> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> answering to your other questions
>>
>> Here is the stacktrace of the case (1),  when I try to read using
>> specific records generated by the AVRO 1.8.2 plugin
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> I also tried generating the specific object with avro 1.9.2 (2)  but
>> forcing it to use Joda time but still didn't work
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>> org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at
>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>> ... 7 more
>>
>>
>> But in the second case, it seems the failure happens when Flink tries to
>> make a copy of the record.
>> So I followed your suggestion of enableObjectReuse() and* IT WORKS!*
>>
>> I am not sure I understand all implications of object reuse in Flink,
>> specifically.
>> I am familiar with the general risk of mutable messages, and I always
>> handle them as mutable even when they are POJO. Never mutating and
>> forwarding the same record.
>> Not sure whether there are other implications in Flink.
>>
>> Many thanks
>> Lorenzo
>>
>>
>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Lorenzo,
>>>
>>> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>>>
>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
>>> with dateTimeLogicalType='Joda' set, but then you will probably see the
>>> same issue as 1)
>>>
>>> 3) I'm surprised that this doesn't work either. There is a codepath
>>> since 2016 for GenericRecord and it's covered in a test. From the error
>>> description and the ticket, it looks like the issue is not the
>>> AvroInputFormat, but the serializer. So it would probably work with a
>>> different serializer (but that would cause back and forth type
>>> transformation).
>>>
>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Timo,
>>>>
>>>> the stacktrace with 1.9.2-generated specific file is the following
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>> Could not forward element to next operator
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>> at
>>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>>>> java.time.Instant: 2020-06-01T02:00:42.105Z
>>>> at
>>>> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>> at
>>>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>> at
>>>> org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at
>>>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>> ... 7 more
>>>>
>>>>
>>>> I reckon logical types might have been considered somehow experimental
>>>> since...ever. But, honestly, I've been using them in the Kafka/Java
>>>> ecosystem as well as in Spark without too many problems.
>>>>
>>>> For my specific use case, the schema is given. Messages are produced by
>>>> a 3rd party and we cannot change the schema (especially because it's a
>>>> legit schema).
>>>> I am desperately looking for a workaround.
>>>>
>>>> I  had a similar issue with a Kafka Source, and AVRO records containing
>>>> decimals and timestamps. Timestamps worked but not decimals.
>>>> I was able to work around the problem using GenericRecords.
>>>> But Kafka source relies on AvroDeserializationSchema rather than
>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>
>>>> I'm honestly finding very confusing having different ways of handling
>>>> AVRO deserialization inside Flink core components.
>>>>
>>>> Cheers
>>>> Lorenzo
>>>>
>>>>
>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> as far as I know we don't support Avro's logical times in Flink's
>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports
>>>>> the
>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>
>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>
>>>>> Could you send us the full stack trace? I think this should actually
>>>>> work, because specific records are handled as POJOs and those should
>>>>> be
>>>>> able to also deal with logical type's classes through Kryo.
>>>>>
>>>>> Reg 3) Generic record
>>>>>
>>>>> It would be great if we can make this option possible. We could
>>>>> include
>>>>> it in the next minor release fix.
>>>>>
>>>>> Sorry, for the bad user experience. But IMHO logical type are still
>>>>> experiemental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>> shortcomings such that Flink can properly support them as well.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1]
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>
>>>>>
>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>> > Files are written by an S3 Sink Kafka Connect but S3 is not the
>>>>> point
>>>>> > here. I started trying to ingest a static bunch of files from local
>>>>> fs
>>>>> > first and I am having weird issues with AVRO deserialization.
>>>>> >
>>>>> > I have to say, the records contain logical types, timestamps-ms and
>>>>> decimals
>>>>> >
>>>>> > To keep it simple, I extracted the AVRO schema from the data files
>>>>> and
>>>>> > used avro-maven-plugin to generate POJOs
>>>>> > I tried multiple combinations, all with no luck
>>>>> >
>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>> >
>>>>> > Path in = new Path(sourceBasePath);
>>>>> > AvroInputFormat<AccountEntries> inputFormat = new
>>>>> AvroInputFormat<>(in,
>>>>> > AccountEntries.class);
>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>> > .readFile(inputFormat, sourceBasePath,
>>>>> > FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>> >
>>>>> > *Result*
>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>> > org.joda.time.DateTime
>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>> >
>>>>> >
>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>>> >
>>>>> > *Result*
>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type
>>>>> java.time.Instant
>>>>> >
>>>>> >
>>>>> > 3) Generic record
>>>>> > I am getting the Schema from the generated specific record, for
>>>>> > convenience, but I am not using the generated POJO as record.
>>>>> > I also followed the suggestions in this Flink blog post
>>>>> > <
>>>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>>>>>
>>>>> > to explicitly specify the TypeInfo with returns(...)
>>>>> >
>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>> > AvroInputFormat<GenericRecord> inputFormat = new
>>>>> AvroInputFormat<>(in,
>>>>> > GenericRecord.class);
>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>> > .readFile(inputFormat, config.sourceFileSystemPath,
>>>>> > FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>> > .returns(new GenericRecordAvroTypeInfo(schema));
>>>>> >
>>>>> >
>>>>> > *Result*
>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not
>>>>> instantiable:
>>>>> > The class is not a proper class. It is either abstract, an
>>>>> interface, or
>>>>> > a primitive type.
>>>>> >
>>>>> > This looks like a bug.
>>>>> > I raised the ticket <
>>>>> https://issues.apache.org/jira/browse/FLINK-18223>
>>>>> > and I will try to submit a fix, but still do not solve my problem as
>>>>> I
>>>>> > am using a managed Flink I cannot update.
>>>>> > I cannot believe there is no workaround. I do not think I'm trying
>>>>> to do
>>>>> > anything bizarre. Am I?
>>>>> >
>>>>> > Any ideas?
>>>>> > Am I missing something obvious?
>>>>> >
>>>>> > Cheers
>>>>> > Lorenzo
>>>>>
>>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Reply via email to