Re: Avro deserialization issue
Hi Anitha,

As far as I can tell, the problem is with Avro itself. We upgraded the Avro version that Flink uses underneath in Flink 1.12.0: in 1.11.x we used Avro 1.8.2, while starting from 1.12.x we use Avro 1.10.0. Maybe that's the problem. You could try upgrading the Avro version in your program: just add a dependency on Avro 1.10. If I remember correctly, that should simply work.

If that does not solve the problem, I'd look into which field fails to be deserialized.

Best,
Dawid

On 13/04/2022 18:11, Anitha Thankappan wrote:
> Hi Piotr,
>
> *The code I wrote in 1.13.1:*
>
>     public final class BigQuerySourceFunction extends RichSourceFunction implements ResultTypeQueryable {
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
>         }
>         ..
>     }
>
> *This works fine and I got the required output:*
>
>     +----+----+------+
>     | op | id | name |
>     +----+----+------+
>     | +I |  1 |  ABC |
>     | +I |  2 |  XYZ |
>     +----+----+------+
>     2 rows in set
>
> *With the same code in 1.11.0,* RuntimeContextInitializationContextAdapters cannot be resolved.
>
> *I rewrote the code as:*
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("bigquery"));
>     }
>
> *But the result was:*
>
>     Caused by: java.io.IOException: Failed to deserialize Avro record.
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>         at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>     Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
>         at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>         at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
>         at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
>         at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
>         at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)
>
> Thanks and Regards,
> Anitha Thankappan
>
> On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski wrote:
>> Hey,
>>
>> Could you be more specific about how it is not working? A compiler error that there is no such class as RuntimeContextInitializationContextAdapters? This class was introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this code and I also don't know where it's documented, but:
>> a) Maybe you should just mimic in reverse the changes done in the pull request from this issue [2]? `deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("something"))`?
>> b) RuntimeContextInitializationContextAdapters is an `@Internal` class that is not part of the public API, so even in 1.13.x you should not be using it. You should probably just implement your own DeserializationSchema.InitializationContext.
>>
>> Best,
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18363
>> [2] https://github.com/apache/flink/pull/13844/files
>>
>> On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan wrote:
>>> Hi,
>>>
>>> I developed a Flink connector to read data from BigQuery. The rows read from BigQuery are in Avro format. I tried it with 1.13.1 and it's working fine. But my requirement is 1.11.0, in that case the code:
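Before changing dependencies as suggested at the top of this message, it can help to confirm which Avro jar the job actually loads at runtime. The following is a minimal sketch using only the JDK; the class name `AvroClasspathProbe` and the method `describe` are hypothetical helpers, not part of Flink or Avro, and the reported version may be `null` if the jar manifest does not carry an Implementation-Version entry.

```java
import java.security.CodeSource;

public class AvroClasspathProbe {

    /** Describes where a class was loaded from, or reports that it is absent. */
    public static String describe(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Package pkg = clazz.getPackage();
            // Implementation-Version is read from the jar manifest and may be null.
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (JDK classes) have no code source.
            String location = (source == null) ? "(no code source)" : String.valueOf(source.getLocation());
            return className + " version=" + version + " location=" + location;
        } catch (ClassNotFoundException e) {
            return className + " not found on classpath";
        }
    }

    public static void main(String[] args) {
        // org.apache.avro.Schema is a core Avro class; run this with the job's classpath.
        System.out.println(describe("org.apache.avro.Schema"));
    }
}
```

Calling `describe` from inside the source function's `open` method, rather than from a standalone `main`, would report the jar seen by the task's own class loader, which may differ from the one on the client.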
Re: Avro deserialization issue
Hi Piotr,

*The code I wrote in 1.13.1:*

    public final class BigQuerySourceFunction extends RichSourceFunction implements ResultTypeQueryable {
        @Override
        public void open(Configuration parameters) throws Exception {
            deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
        }
        ..
    }

*This works fine and I got the required output:*

    +----+----+------+
    | op | id | name |
    +----+----+------+
    | +I |  1 |  ABC |
    | +I |  2 |  XYZ |
    +----+----+------+
    2 rows in set

*With the same code in 1.11.0,* RuntimeContextInitializationContextAdapters cannot be resolved.

*I rewrote the code as:*

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("bigquery"));
    }

*But the result was:*

    Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
        at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)

Thanks and Regards,
Anitha Thankappan

On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski wrote:
> Hey,
>
> Could you be more specific about how it is not working? A compiler error that there is no such class as RuntimeContextInitializationContextAdapters? This class was introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this code and I also don't know where it's documented, but:
> a) Maybe you should just mimic in reverse the changes done in the pull request from this issue [2]? `deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("something"))`?
> b) RuntimeContextInitializationContextAdapters is an `@Internal` class that is not part of the public API, so even in 1.13.x you should not be using it. You should probably just implement your own DeserializationSchema.InitializationContext.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18363
> [2] https://github.com/apache/flink/pull/13844/files
>
> On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan <anitha.thankap...@quantiphi.com> wrote:
>> Hi,
>>
>> I developed a Flink connector to read data from BigQuery. The rows read from BigQuery are in Avro format. I tried it with 1.13.1 and it's working fine. But my requirement is 1.11.0, and in that case the code:
>>
>>     deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()))
>>
>> is not working.
>>
>> What could be the alternative for this in 1.11.0?
>>
>> Thanks and Regards,
>> Anitha Thankappan
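One plausible reading of "Index 3 out of bounds for length 2" in the trace above, offered as an assumption rather than a diagnosis: the frames `ResolvingDecoder.readIndex` and `Symbol$Alternative.getSymbol` suggest the decoder read a union branch index of 3 while the schema's union has only two branches. Avro's binary encoding writes a union as a zigzag-encoded variable-length long holding the branch position, followed by the value. The sketch below reproduces that arithmetic with the JDK only; `UnionIndexSketch` and its methods are hypothetical and do not use the Avro library.

```java
public class UnionIndexSketch {

    /** Decodes one zigzag-encoded variable-length long starting at the given offset. */
    public static long readZigZagLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int pos = offset;
        while (true) {
            int b = buf[pos++] & 0xff;
            // The low 7 bits carry data; the high bit signals that more bytes follow.
            raw |= (long) (b & 0x7f) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        // Undo the zigzag mapping (0, -1, 1, -2, ... were written as 0, 1, 2, 3, ...).
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Picks the union branch named by the leading index; throws if the index is out of range. */
    public static String resolveBranch(String[] branches, byte[] data) {
        int index = (int) readZigZagLong(data, 0);
        return branches[index];
    }

    public static void main(String[] args) {
        String[] nullableString = {"null", "string"};
        // 0x02 decodes to index 1, a valid branch of a two-branch union.
        System.out.println(resolveBranch(nullableString, new byte[] {0x02}));
        try {
            // 0x06 decodes to index 3, which a two-branch union cannot satisfy.
            resolveBranch(nullableString, new byte[] {0x06});
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("out of bounds: " + e.getMessage());
        }
    }
}
```

If this reading is right, the byte being interpreted as a union index was probably not written as one, which would point to the reader and writer disagreeing about the schema or about the position in the byte stream.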
Re: Avro deserialization issue
Hey,

Could you be more specific about how it is not working? A compiler error that there is no such class as RuntimeContextInitializationContextAdapters? This class was introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this code and I also don't know where it's documented, but:
a) Maybe you should just mimic in reverse the changes done in the pull request from this issue [2]? `deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("something"))`?
b) RuntimeContextInitializationContextAdapters is an `@Internal` class that is not part of the public API, so even in 1.13.x you should not be using it. You should probably just implement your own DeserializationSchema.InitializationContext.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18363
[2] https://github.com/apache/flink/pull/13844/files

On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan wrote:
> Hi,
>
> I developed a Flink connector to read data from BigQuery. The rows read from BigQuery are in Avro format. I tried it with 1.13.1 and it's working fine. But my requirement is 1.11.0, and in that case the code:
>
>     deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()))
>
> is not working.
>
> What could be the alternative for this in 1.11.0?
>
> Thanks and Regards,
> Anitha Thankappan
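To illustrate option (b) above, here is a minimal sketch of a named context class replacing the lambda. It is deliberately self-contained: `MetricGroup` and `InitializationContext` below are simplified local stand-ins, not Flink's interfaces, and `path()` is a hypothetical method added only so the result can be printed. In a real connector the class would implement Flink's `DeserializationSchema.InitializationContext`, whose exact set of methods should be checked against the Javadoc of the Flink version in use.

```java
public class InitializationContextSketch {

    // Local stand-in, NOT Flink's MetricGroup: models only addGroup(String).
    interface MetricGroup {
        MetricGroup addGroup(String name);

        // Hypothetical accessor for this demo only.
        String path();
    }

    // Local stand-in with the single-method shape that the lambda in the thread relies on.
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }

    static final class SimpleMetricGroup implements MetricGroup {
        private final String path;

        SimpleMetricGroup(String path) {
            this.path = path;
        }

        @Override
        public MetricGroup addGroup(String name) {
            return new SimpleMetricGroup(path + "." + name);
        }

        @Override
        public String path() {
            return path;
        }
    }

    // A named implementation: if the interface gains methods in another version,
    // the compiler points at this one class instead of at every lambda call site.
    static final class BigQueryInitializationContext implements InitializationContext {
        private final MetricGroup parent;

        BigQueryInitializationContext(MetricGroup parent) {
            this.parent = parent;
        }

        @Override
        public MetricGroup getMetricGroup() {
            return parent.addGroup("bigquery");
        }
    }

    /** Builds the context over a root group named "task" and returns the resulting group path. */
    public static String demo() {
        InitializationContext context = new BigQueryInitializationContext(new SimpleMetricGroup("task"));
        return context.getMetricGroup().path();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the source function from the thread, the equivalent call would presumably pass `getRuntimeContext().getMetricGroup()` as the parent group and hand the resulting context to `deserializer.open(...)`.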