Hi,

I wrote a test for case (1), but it did not throw any exception. I used
org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.

Best,
Guowei
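For reference, a minimal sketch of the kind of read test described above, assuming a local Avro data file at a hypothetical path; it uses a bounded one-shot read (PROCESS_ONCE) instead of the continuous readFile mode from the original report:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.generated.JodaTimeRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadJodaTimeRecords {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical path to an Avro data file containing JodaTimeRecord instances.
        String path = "/tmp/joda-time-records.avro";

        AvroInputFormat<JodaTimeRecord> inputFormat =
                new AvroInputFormat<>(new Path(path), JodaTimeRecord.class);

        // PROCESS_ONCE so the job terminates; a single pass is enough to
        // exercise the specific-record read path that fails in case (1).
        DataStream<JodaTimeRecord> records =
                env.readFile(inputFormat, path, FileProcessingMode.PROCESS_ONCE, 1000L);

        records.print();
        env.execute("Read Avro 1.8 specific records with Joda logical types");
    }
}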
On Thu, Jun 11, 2020 at 3:58 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong, so it will work more smoothly for future users. I double
> checked and we even test AvroSerializer with logical types, so I'm a bit
> puzzled.
>
> Could you attach GlHeader or at least show us what GlHeader#conversions
> looks like? I want to exclude the possibility that the source generator
> screwed up.
>
> Concerning object reuse: you need to treat all POJOs as immutable (I'm
> assuming that that's what you meant from your description), but you should
> also never cache values like this:
>
> class ShiftElements implements FlatMapFunction<Object, Object> {
>   Object lastElement;
>
>   @Override
>   public void flatMap(Object newElement, Collector<Object> out) {
>     out.collect(lastElement);
>     lastElement = newElement; // <- never cache with enableObjectReuse
>   }
> }
>
> (excuse my ugly code)
>
> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> answering your other questions.
>>
>> Here is the stack trace for case (1), when I try to read using specific
>> records generated by the Avro 1.8.2 plugin:
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>> For case (2), I also tried generating the specific records with Avro 1.9.2
>> but forcing them to use Joda time; it still didn't work:
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>   ... 7 more
>>
>> But in the second case, it seems the failure happens when Flink tries to
>> make a copy of the record. So I followed your suggestion of
>> enableObjectReuse() and *IT WORKS!*
>>
>> I am not sure I understand all the implications of object reuse in Flink
>> specifically. I am familiar with the general risk of mutable messages, and
>> I always handle them as immutable, even when they are POJOs: never mutating
>> and forwarding the same record. I am not sure whether there are other
>> implications in Flink.
>>
>> Many thanks
>> Lorenzo
>>
>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Lorenzo,
>>>
>>> 1) I'm surprised that this doesn't work. I'd like to see that stack trace.
>>>
>>> 2) This cannot work as is, because we bundle Avro 1.8.2. You could retest
>>> with dateTimeLogicalType='Joda' set, but then you will probably see the
>>> same issue as in 1).
>>>
>>> 3) I'm surprised that this doesn't work either. There has been a code path
>>> for GenericRecord since 2016 and it's covered in a test. From the error
>>> description and the ticket, it looks like the issue is not the
>>> AvroInputFormat but the serializer. So it would probably work with a
>>> different serializer (but that would cause back-and-forth type
>>> transformation).
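For reference, a minimal sketch of the object-reuse setting mentioned above; only the execution-config line is the actual change, the surrounding setup is assumed boilerplate. With object reuse enabled, chained operators hand records through without the per-record defensive copy (the AvroSerializer.copy call visible in the stack traces above), which appears to be why the failing deep copy of the Joda/Instant values is avoided:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSetup {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive copy between chained operators. With this enabled,
        // operators must neither cache nor mutate incoming records
        // (see the ShiftElements example above).
        env.getConfig().enableObjectReuse();

        // ... build and execute the pipeline as usual ...
    }
}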
>>>
>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Timo,
>>>>
>>>> the stack trace with the 1.9.2-generated specific records is the following:
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>   ... 7 more
>>>>
>>>> I reckon logical types might have been considered somewhat experimental
>>>> since... ever. But, honestly, I've been using them in the Kafka/Java
>>>> ecosystem as well as in Spark without too many problems.
>>>>
>>>> For my specific use case, the schema is given. Messages are produced by a
>>>> 3rd party and we cannot change the schema (especially because it's a
>>>> legitimate schema). I am desperately looking for a workaround.
>>>>
>>>> I had a similar issue with a Kafka source and AVRO records containing
>>>> decimals and timestamps. Timestamps worked but decimals did not. I was
>>>> able to work around the problem using GenericRecords. But the Kafka source
>>>> relies on AvroDeserializationSchema rather than AvroSerializer, and it has
>>>> no problem handling GenericRecords.
>>>>
>>>> I honestly find it very confusing to have different ways of handling AVRO
>>>> deserialization inside Flink core components.
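For context, a minimal sketch of the GenericRecord-based Kafka workaround described above, assuming the universal Kafka connector (flink-connector-kafka); the topic name, bootstrap servers, group id, and schema file are hypothetical placeholders:

import java.io.File;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaGenericRecordSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical: the writer schema, loaded from an .avsc file extracted from the data.
        Schema schema = new Schema.Parser().parse(new File("account-entries.avsc"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical
        props.setProperty("group.id", "avro-ingest");             // hypothetical

        // forGeneric(...) deserializes straight into GenericRecord, so the
        // specific-record conversion that trips over logical types is bypassed
        // (as noted above, the Kafka source handles GenericRecords fine).
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "account-entries",                                 // hypothetical topic
                AvroDeserializationSchema.forGeneric(schema),
                props);

        DataStream<GenericRecord> records = env.addSource(consumer);
        records.print();

        env.execute("Kafka GenericRecord ingestion");
    }
}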
>>>>
>>>> Cheers
>>>> Lorenzo
>>>>
>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> as far as I know, we don't support Avro's logical types in Flink's
>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>
>>>>> Regarding 2) Specific record generated with the Avro 1.9.2 plugin:
>>>>>
>>>>> Could you send us the full stack trace? I think this should actually
>>>>> work, because specific records are handled as POJOs and those should be
>>>>> able to deal with the logical types' classes through Kryo.
>>>>>
>>>>> Regarding 3) Generic record:
>>>>>
>>>>> It would be great if we could make this option possible. We could
>>>>> include it in the next minor release.
>>>>>
>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>> shortcomings such that Flink can properly support them as well.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>
>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>> > Files are written by a Kafka Connect S3 sink, but S3 is not the point
>>>>> > here. I started by trying to ingest a static bunch of files from the
>>>>> > local fs first, and I am having weird issues with AVRO deserialization.
>>>>> >
>>>>> > I have to say the records contain logical types: timestamp-millis and
>>>>> > decimals.
>>>>> >
>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>> > used avro-maven-plugin to generate POJOs.
>>>>> > I tried multiple combinations, all with no luck.
>>>>> >
>>>>> > 1) Specific record generated with the AVRO 1.8.2 plugin
>>>>> >
>>>>> > Path in = new Path(sourceBasePath);
>>>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in,
>>>>> >     AccountEntries.class);
>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>> >
>>>>> > *Result*
>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>> > org.joda.time.DateTime
>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>> >
>>>>> >
>>>>> > 2) Specific record generated with the AVRO 1.9.2 plugin
>>>>> > Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2
>>>>> >
>>>>> > *Result*
>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>> >
>>>>> >
>>>>> > 3) Generic record
>>>>> > I am getting the Schema from the generated specific record, for
>>>>> > convenience, but I am not using the generated POJO as the record type.
>>>>> > I also followed the suggestions in this Flink blog post
>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>> > to explicitly specify the TypeInfo with returns(...):
>>>>> >
>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in,
>>>>> >     GenericRecord.class);
>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>> >
>>>>> > *Result*
>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>> > The class is not a proper class. It is either abstract, an interface, or
>>>>> > a primitive type.
>>>>> >
>>>>> > This looks like a bug.
>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>>> > as I am using a managed Flink that I cannot update.
>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>> > anything bizarre. Am I?
>>>>> >
>>>>> > Any ideas?
>>>>> > Am I missing something obvious?
>>>>> >
>>>>> > Cheers
>>>>> > Lorenzo
>>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng