Hi Anitha,

As far as I can tell, the problem is with Avro itself. We upgraded the Avro version that Flink uses underneath in Flink 1.12.0: in 1.11.x we used Avro 1.8.2, while starting from 1.12.x we use Avro 1.10.0. Maybe that's the problem. You could try upgrading the Avro version in your program: just add a dependency on Avro 1.10. If I remember correctly, that should simply work.
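
Before and after changing the dependency, it may help to confirm which Avro jar the job actually loads, since an older copy can still win on the classpath. The following is a minimal sketch using only standard Java reflection; the class name `ClasspathVersionCheck` and its `describe` helper are made up for illustration, and the reported version comes from the jar manifest, so it can be `null` if the manifest does not declare one.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports where a class was loaded from and its declared version. */
final class ClasspathVersionCheck {

    static String describe(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // Classes loaded by the bootstrap loader have no CodeSource, so guard against null.
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            String location = (source == null || source.getLocation() == null)
                    ? "<unknown location>"
                    : source.getLocation().toString();
            // Implementation-Version is read from the jar manifest and may be absent (null).
            Package pkg = clazz.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            return className + " -> " + location + " (version: " + version + ")";
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return className + " could not be loaded from the classpath";
        }
    }

    public static void main(String[] args) {
        // org.apache.avro.Schema is Avro's core schema class; run this with the job's classpath.
        System.out.println(describe("org.apache.avro.Schema"));
    }
}
```

Run with the same classpath as the job, this prints the jar the class came from, which shows whether the intended Avro version or an older one was picked up.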

If that does not solve the problem, I'd look into which field fails to be deserialized.
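
As a hint for that investigation: in the stack trace quoted below, the exception is thrown from `ResolvingDecoder.readIndex`, which is where Avro reads the branch index of a union. In Avro's binary encoding a union value is prefixed with the zero-based index of its branch, written as a zig-zag varint. One plausible reading of "Index 3 out of bounds for length 2" is therefore that the bytes decoded to branch 3 while the schema's union (for example a nullable field) has only two branches, which usually means the bytes and the schema do not line up. The sketch below re-implements just that decoding step with plain Java; it is an illustration, not Avro's own code, and `UnionIndexSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: mimics how a union branch index is decoded from Avro binary data. */
final class UnionIndexSketch {

    /** Decodes one zig-zag varint (Avro's int/long encoding) starting at data[pos]. */
    static long readZigZagVarint(byte[] data, int pos) {
        long raw = 0;
        int shift = 0;
        while (true) {
            if (pos >= data.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            int b = data[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry the payload
            if ((b & 0x80) == 0) {
                break; // high bit clear: last byte of this varint
            }
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zig-zag
    }

    /** Returns the selected branch name, or a diagnostic if the index does not fit the union. */
    static String resolveBranch(byte[] data, List<String> unionBranches) {
        long index = readZigZagVarint(data, 0);
        if (index < 0 || index >= unionBranches.size()) {
            return "branch index " + index + " is out of range for a union of "
                    + unionBranches.size() + " types";
        }
        return unionBranches.get((int) index);
    }

    public static void main(String[] args) {
        // Assumed example: a nullable string field, i.e. the union ["null", "string"].
        List<String> nullableString = Arrays.asList("null", "string");
        System.out.println(resolveBranch(new byte[] {0x02}, nullableString)); // string
        System.out.println(resolveBranch(new byte[] {0x06}, nullableString)); // out of range
    }
}
```

Here the byte `0x02` decodes to index 1 and selects `string`, while `0x06` decodes to index 3, the same kind of out-of-range index that the exception reports.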

Best,

Dawid

On 13/04/2022 18:11, Anitha Thankappan wrote:
Hi Piotr,

*The code I wrote in 1.13.1:*

    public final class BigQuerySourceFunction extends RichSourceFunction<RowData>
            implements ResultTypeQueryable<RowData> {
        ....
        @Override
        public void open(Configuration parameters) throws Exception {
            deserializer.open(
                    RuntimeContextInitializationContextAdapters.deserializationAdapter(
                            getRuntimeContext()));
        }
        ......
    }

*This works fine and I got the required output:*

    +----+-------------+--------------------------------+
    | op |          id |                           name |
    +----+-------------+--------------------------------+
    | +I |           1 |                            ABC |
    | +I |           2 |                            XYZ |
    +----+-------------+--------------------------------+
    2 rows in set


*The same code with 1.11.0:*

       RuntimeContextInitializationContextAdapters cannot be resolved

*I rewrote the code as:*

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(
                () -> getRuntimeContext().getMetricGroup().addGroup("bigquery"));
    }

*But the result was:*

    Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
        at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

    Hey,

    Could you be more specific about how it is not working? Is it a
    compiler error saying that there is no such class as
    RuntimeContextInitializationContextAdapters? This class was
    introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this
    code and I also don't know where it's documented, but:
    a) maybe you should just mimic in reverse the changes done in the
    pull request for this issue [2]? `deserializer.open(() ->
    getRuntimeContext().getMetricGroup().addGroup("something"))`?
    b) RuntimeContextInitializationContextAdapters is an `@Internal`
    class that is not part of the public API, so even in 1.13.x you
    shouldn't be using it. You should probably just implement your
    own DeserializationSchema.InitializationContext.

    Best,
    Piotrek

    [1] https://issues.apache.org/jira/browse/FLINK-18363
    [2] https://github.com/apache/flink/pull/13844/files

    On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan
    <anitha.thankap...@quantiphi.com> wrote:


        Hi,

        I developed a Flink connector to read data from BigQuery. The
        rows read from BigQuery are in Avro format.
        I tried it with 1.13.1 and it works fine. But my requirement is
        1.11.0, and in that case the code:
        deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
        is not working.

        What could be the alternative for this in 1.11.0?

        Thanks and Regards,
        Anitha Thankappan

        This message contains information that may be privileged or
        confidential and is the property of the Quantiphi Inc and/or
        its affiliates. It is intended only for the person to whom
        it is addressed. If you are not the intended recipient, any
        review, dissemination, distribution, copying, storage or other
        use of all or any portion of this message is strictly
        prohibited. If you received this message in error, please
        immediately notify the sender by reply e-mail and delete this
        message in its entirety


