Hi Piotr,

*The code I wrote for 1.13.1:*

public final class BigQuerySourceFunction extends RichSourceFunction<RowData>
        implements ResultTypeQueryable<RowData> {
    ....
    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
    }
    ......
}


*This works fine and I got the required output:*

+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                            ABC |
| +I |           2 |                            XYZ |
+----+-------------+--------------------------------+
2 rows in set



*With 1.11.0, the same code gives:*

   RuntimeContextInitializationContextAdapters cannot be resolved


*So I rewrote the code as:*

@Override
public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("bigquery"));
}
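
For suggestion (b), a dedicated `DeserializationSchema.InitializationContext` rather than the lambda, a minimal sketch could look like the following. It assumes the 1.11 interface has a single `getMetricGroup()` method (which is what lets the lambda form compile). To keep the example runnable without Flink on the classpath, `MetricGroup` and `InitializationContext` are local stand-ins for the Flink types, and `BigQueryInitializationContext` and `RecordingMetricGroup` are hypothetical names used only for illustration.

```java
final class InitializationContextSketch {

    /** Stand-in for org.apache.flink.metrics.MetricGroup; only addGroup is modeled. */
    interface MetricGroup {
        MetricGroup addGroup(String name);
    }

    /**
     * Stand-in for DeserializationSchema.InitializationContext, assuming the
     * 1.11 shape with a single getMetricGroup() method.
     */
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }

    /** Hypothetical named replacement for the lambda passed to deserializer.open(...). */
    static final class BigQueryInitializationContext implements InitializationContext {
        private final MetricGroup operatorGroup;
        private final String groupName;

        BigQueryInitializationContext(MetricGroup operatorGroup, String groupName) {
            this.operatorGroup = operatorGroup;
            this.groupName = groupName;
        }

        @Override
        public MetricGroup getMetricGroup() {
            // Same effect as: () -> getRuntimeContext().getMetricGroup().addGroup("bigquery")
            return operatorGroup.addGroup(groupName);
        }
    }

    /** Toy metric group that only records the path of the groups created under it. */
    static final class RecordingMetricGroup implements MetricGroup {
        final String path;

        RecordingMetricGroup(String path) {
            this.path = path;
        }

        @Override
        public MetricGroup addGroup(String name) {
            return new RecordingMetricGroup(path + "." + name);
        }
    }

    public static void main(String[] args) {
        InitializationContext context =
                new BigQueryInitializationContext(new RecordingMetricGroup("operator"), "bigquery");
        RecordingMetricGroup group = (RecordingMetricGroup) context.getMetricGroup();
        System.out.println(group.path); // operator.bigquery
    }
}
```

In the real source function the stand-ins would be replaced by the Flink types, and the context would be built in `open(...)` from `getRuntimeContext().getMetricGroup()`. This only changes how the context is supplied; it behaves the same as the lambda.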


*But the result was:*

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
    at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)
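
The innermost frames (`ResolvingDecoder.readIndex`, `Symbol$Alternative.getSymbol`) suggest that a union branch index was read from the data and did not fit a two-branch union. One way this can happen is a mismatch between the schema the bytes were written with and the schema used to read them. Whether that is the cause here is not confirmed. The sketch below illustrates the mechanism with plain JDK code and no Avro classes: it assumes a value written as a plain Avro string is read as a `["null", "string"]` union, so the string's length prefix gets interpreted as the branch index. `UnionIndexSketch`, `readZigZagLong` and `selectBranch` are hypothetical names.

```java
final class UnionIndexSketch {

    /** Decodes one zigzag varint, the Avro binary encoding for int, long and union indexes. */
    static long readZigZagLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        for (int pos = offset; pos < buf.length; pos++) {
            int b = buf[pos] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                // Undo the zigzag mapping: 0, 1, 2, 3, ... -> 0, -1, 1, -2, ...
                return (raw >>> 1) ^ -(raw & 1);
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    /** Treats the first varint in the data as a union branch index, as a union reader would. */
    static String selectBranch(String[] branches, byte[] data) {
        int index = (int) readZigZagLong(data, 0);
        return branches[index]; // throws ArrayIndexOutOfBoundsException if index is out of range
    }

    public static void main(String[] args) {
        // Assumed reader-side union for a nullable string column.
        String[] readerUnion = {"null", "string"};
        // "ABC" written as a plain (non-union) string: length 3 as zigzag varint 0x06, then UTF-8 bytes.
        byte[] writtenAsPlainString = {0x06, 'A', 'B', 'C'};

        System.out.println("decoded index = " + readZigZagLong(writtenAsPlainString, 0));
        try {
            selectBranch(readerUnion, writtenAsPlainString);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("branch lookup failed: " + e.getMessage());
        }
    }
}
```

If this is what is happening, comparing the Avro schema of the BigQuery read stream with the schema the deserializer derives from the row type (in particular which fields are nullable) would be a reasonable first check.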


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hey,
>
> Could you be more specific about how it is not working? A compiler error
> that there is no such class as RuntimeContextInitializationContextAdapters?
> This class has been introduced in Flink 1.12 in FLINK-18363 [1]. I don't
> know this code and I also don't know where it's documented, but:
> a) maybe you should just mimic in reverse the changes done in the pull
> request from this issue [2]? `deserializer.open(() ->
> getRuntimeContext().getMetricGroup().addGroup("something"))`?
> b) RuntimeContextInitializationContextAdapters is an `@Internal` class that
> is not part of the Public API, so even in 1.13.x you should not be using it.
> You should probably just implement your
> own DeserializationSchema.InitializationContext.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18363
> [2] https://github.com/apache/flink/pull/13844/files
>
> On Mon, Apr 11, 2022 at 15:42, Anitha Thankappan <
> anitha.thankap...@quantiphi.com> wrote:
>
>>
>> Hi,
>>
>> I developed a Flink connector to read data from BigQuery. The rows read
>> from BigQuery are in Avro format.
>> I tried it with 1.13.1 and it works fine. But my requirement is 1.11.0, and
>> in that case the code:
>> deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()))
>> is not working.
>>
>> What could be the alternative for this in 1.11.0?
>>
>> Thanks and Regards,
>> Anitha Thankappan
>>
>> *This message contains information that may be privileged or confidential
>> and is the property of the Quantiphi Inc and/or its affiliates**. It is
>> intended only for the person to whom it is addressed. **If you are not
>> the intended recipient, any review, dissemination, distribution, copying,
>> storage or other use of all or any portion of this message is strictly
>> prohibited. If you received this message in error, please immediately
>> notify the sender by reply e-mail and delete this message in its *
>> *entirety*
>>
>

