Hi,
for 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
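
Something like this, roughly (a sketch against your case 1 snippet; I have not
run it, and the variable names are just the ones from your example):

// Same setup as case 1, but with Avro value reuse disabled, so the format
// hands out a fresh record per element instead of mutating a shared one.
AvroInputFormat<AccountEntries> inputFormat =
    new AvroInputFormat<>(new Path(sourceBasePath), AccountEntries.class);
inputFormat.setReuseAvroValue(false); // reuse is on by default
DataStream<AccountEntries> accountEntries = env
    .readFile(inputFormat, sourceBasePath,
        FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);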

Best,
Guowei


Lorenzo Nicora <lorenzo.nic...@gmail.com> 于2020年6月11日周四 下午5:02写道:

> Hi Arvid,
>
> thanks for the point about caching records. Gotcha!
>
> Sorry, I cannot share the full schema or generated code. It's 3rd-party
> IP and we signed a strict NDA... I think I can post snippets.
> The schema is heavily nested, including arrays of other record types.
> Types are primitives, or logical decimal and timestamp-millis. No unions.
>
> The conversions field is present in AccountEntries only (one of the nested
> records) and looks like this:
>
> private static final org.apache.avro.Conversion<?>[] conversions =
>     new org.apache.avro.Conversion<?>[] {
>     null,
>     null,
>     null,
>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>     null,
>     null,
>     null,
>     null,
>     null,
>     null,
>     null
> };
>
>
> Note that I have to generate the specific objects with the AVRO 1.9.2 Maven
> plugin.
> With 1.8.2-generated code it fails with the following exception, regardless
> of setting enableObjectReuse():
>
> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>
>
> Thanks for the help
> Lorenzo
>
>
> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Lorenzo,
>>
>> I'm glad that it worked out somehow, but I'd still like to understand
>> what went wrong, so it will work more smoothly for future users. I double
>> checked and we even test AvroSerializer with logical types, so I'm a bit
>> puzzled.
>>
>> Could you attach GlHeader, or at least show us what GlHeader#conversions
>> looks like? I want to exclude the possibility that the source generator
>> screwed up.
>>
>> Concerning object reuse: you need to treat all POJOs as immutable (I'm
>> assuming that's what you meant from your description), but you should also
>> never cache values, like this:
>>
>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>   Object lastElement;
>>
>>   public void flatMap(Object newElement, Collector<Object> out) {
>>     out.collect(lastElement);
>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>   }
>> }
>>
>> (excuse my ugly code)
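>>
>> The safe variant under object reuse would be to forward first and copy
>> before caching, roughly like this (just a sketch; I'm assuming the
>> Avro-generated builder of your GlHeader for the defensive copy):
>>
>> class ShiftElementsSafe implements FlatMapFunction<GlHeader, GlHeader> {
>>   GlHeader lastElement;
>>
>>   public void flatMap(GlHeader newElement, Collector<GlHeader> out) {
>>     if (lastElement != null) {
>>       out.collect(lastElement);
>>     }
>>     // take a defensive copy, so later reuse of newElement cannot corrupt it
>>     lastElement = GlHeader.newBuilder(newElement).build();
>>   }
>> }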
>>
>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> answering to your other questions
>>>
>>> Here is the stacktrace for case (1), when I try to read using
>>> specific records generated by the AVRO 1.8.2 plugin:
>>>
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>>
>>> I also tried generating the specific object with AVRO 1.9.2 (case 2),
>>> forcing it to use Joda time, but it still didn't work:
>>>
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>> ... 7 more
>>>
>>>
>>> But in the second case, it seems the failure happens when Flink tries to
>>> make a copy of the record.
>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
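>>>
>>> Concretely, enabling it looks roughly like this (a sketch, assuming the
>>> standard environment setup; the rest of the pipeline is unchanged from case 2):
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> // With object reuse enabled, Flink skips the defensive deep copy between
>>> // chained operators (the AvroSerializer#copy call in the stacktrace above).
>>> env.getConfig().enableObjectReuse();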
>>>
>>> I am not sure I understand all the implications of object reuse in Flink
>>> specifically.
>>> I am familiar with the general risk of mutable messages, and I always
>>> handle them as immutable even when they are POJOs, never mutating and
>>> forwarding the same record.
>>> I am not sure whether there are other implications in Flink.
>>>
>>> Many thanks
>>> Lorenzo
>>>
>>>
>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Lorenzo,
>>>>
>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>> stacktrace.
>>>>
>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>> retest with dateTimeLogicalType='Joda' set, but then you will probably
>>>> see the same issue as 1)
>>>>
>>>> 3) I'm surprised that this doesn't work either. There is a codepath
>>>> since 2016 for GenericRecord and it's covered in a test. From the error
>>>> description and the ticket, it looks like the issue is not the
>>>> AvroInputFormat, but the serializer. So it would probably work with a
>>>> different serializer (but that would cause back and forth type
>>>> transformation).
>>>>
>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <
>>>> lorenzo.nic...@gmail.com> wrote:
>>>>
>>>>> Thanks Timo,
>>>>>
>>>>> the stacktrace with the 1.9.2-generated specific record is the following:
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>> ... 7 more
>>>>>
>>>>>
>>>>> I reckon logical types might have been considered somewhat experimental
>>>>> since... ever. But, honestly, I've been using them in the Kafka/Java
>>>>> ecosystem as well as in Spark without too many problems.
>>>>>
>>>>> For my specific use case, the schema is given. Messages are produced
>>>>> by a 3rd party and we cannot change the schema (especially because it's a
>>>>> legit schema).
>>>>> I am desperately looking for a workaround.
>>>>>
>>>>> I had a similar issue with a Kafka source and AVRO records
>>>>> containing decimals and timestamps. Timestamps worked, but not decimals.
>>>>> I was able to work around the problem using GenericRecords.
>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>> AvroSerializer, and has no problem handling GenericRecords.
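>>>>>
>>>>> For completeness, the Kafka-side approach I mean is roughly this (a sketch
>>>>> only; the topic name and properties are placeholders, not our real setup):
>>>>>
>>>>> Schema schema = AccountEntries.getClassSchema();
>>>>> FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
>>>>>     "ledger-events", // hypothetical topic
>>>>>     AvroDeserializationSchema.forGeneric(schema),
>>>>>     kafkaProperties);
>>>>> DataStream<GenericRecord> records = env.addSource(consumer);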
>>>>>
>>>>> I honestly find it very confusing to have different ways of handling
>>>>> AVRO deserialization inside Flink's core components.
>>>>>
>>>>> Cheers
>>>>> Lorenzo
>>>>>
>>>>>
>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>>
>>>>>> Hi Lorenzo,
>>>>>>
>>>>>> as far as I know, we don't support Avro's logical types in Flink's
>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>
>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>
>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>> work, because specific records are handled as POJOs and those should
>>>>>> be able to also deal with logical types' classes through Kryo.
>>>>>>
>>>>>> Reg 3) Generic record
>>>>>>
>>>>>> It would be great if we could make this option possible. We could
>>>>>> include it in the next minor bugfix release.
>>>>>>
>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>
>>>>>>
>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>> > Files are written by an S3 sink Kafka Connect connector, but S3 is not
>>>>>> > the point here. I started by trying to ingest a static bunch of files
>>>>>> > from the local fs first, and I am having weird issues with AVRO
>>>>>> > deserialization.
>>>>>> >
>>>>>> > I have to say, the records contain logical types: timestamp-millis and
>>>>>> > decimals.
>>>>>> >
>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>> > used the avro-maven-plugin to generate POJOs.
>>>>>> > I tried multiple combinations, all with no luck.
>>>>>> >
>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>> >
>>>>>> > Path in = new Path(sourceBasePath);
>>>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>> >
>>>>>> > *Result*
>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>> >
>>>>>> >
>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>>>> >
>>>>>> > *Result*
>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>> >
>>>>>> >
>>>>>> > 3) Generic record
>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>> > convenience, but I am not using the generated POJO as the record type.
>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>>>>>> > to explicitly specify the TypeInfo with returns(...)
>>>>>> >
>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>> >
>>>>>> >
>>>>>> > *Result*
>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not
>>>>>> instantiable:
>>>>>> > The class is not a proper class. It is either abstract, an
>>>>>> interface, or
>>>>>> > a primitive type.
>>>>>> >
>>>>>> > This looks like a bug.
>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>>>> > as I am using a managed Flink I cannot update.
>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>>> > anything bizarre. Am I?
>>>>>> >
>>>>>> > Any ideas?
>>>>>> > Am I missing something obvious?
>>>>>> >
>>>>>> > Cheers
>>>>>> > Lorenzo
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
