Re: Avro deserialization issue

2022-04-14 Thread Dawid Wysakowicz

Hi Anitha,

As far as I can tell, the problem is with Avro itself. We upgraded the 
Avro version that Flink uses underneath in Flink 1.12.0. In 1.11.x we 
used Avro 1.8.2, while starting from 1.12.x we use Avro 1.10.0. Maybe 
that's the problem. You could try upgrading the Avro version in your 
program: just add a dependency on Avro 1.10. If I remember correctly, 
that should simply work.
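
Before and after changing the dependency, it may help to confirm which Avro 
jar the job actually loads. The following is only a sketch using plain JDK 
reflection; the class name ClasspathVersionCheck and the helper describe are 
made up for illustration, and the reported version can be "unknown" if the 
jar manifest carries no Implementation-Version entry.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClasspathVersionCheck {

    /**
     * Describes where a class was loaded from and which version its jar
     * manifest declares. Both values can legitimately be missing.
     */
    public static String describe(String className) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return className + ": not on the classpath";
        }
        // Implementation-Version comes from the jar manifest; null if absent.
        Package pkg = clazz.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        // The code source is null for classes loaded by the bootstrap loader.
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return className
                + ": version=" + (version == null ? "unknown" : version)
                + ", location=" + (location == null ? "unknown" : location);
    }

    public static void main(String[] args) {
        // org.apache.avro.Schema is Avro's schema class; run this with the
        // same classpath as the job to see which jar provides it.
        System.out.println(describe("org.apache.avro.Schema"));
    }
}
```

Calling describe from the open() method of the source function and logging 
the result would show what the task managers see, which can differ from the 
classpath in the IDE.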


If that does not solve the problem, I'd look into which field fails to 
be deserialized.
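
To make that concrete: in Avro's binary encoding, a union value starts with 
the zero-based branch index written as a zig-zag varint, and a string starts 
with its length written the same way. So "Index 3 out of bounds for length 2" 
in Symbol$Alternative.getSymbol means the decoder expected a two-branch union 
and read 3 where the branch index should be. The sketch below is plain Java 
without Avro and only illustrates that mechanism. The class name, the byte 
arrays, and the scenario of a non-nullable string being read as a nullable 
one are assumptions for illustration, not something established from your 
data.

```java
public class UnionIndexSketch {

    /** Decodes one zig-zag varint long starting at data[0], the way Avro's binary encoding stores longs. */
    public static long readZigZagLong(byte[] data) {
        long raw = 0;
        int shift = 0;
        for (byte b : data) {
            // The low 7 bits carry payload, least significant group first.
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                // Zig-zag decoding maps 0, 1, 2, 3, ... back to 0, -1, 1, -2, ...
                return (raw >>> 1) ^ -(raw & 1);
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    /** Mimics a reader that expects a union with branchCount branches at the start of data. */
    public static String readUnionBranch(byte[] data, int branchCount) {
        long index = readZigZagLong(data);
        if (index < 0 || index >= branchCount) {
            return "Index " + index + " out of bounds for length " + branchCount;
        }
        return "branch " + index;
    }

    public static void main(String[] args) {
        // Hypothetical writer side: a ["null","string"] union puts the branch index (1) first.
        byte[] nullableString = {0x02, 0x06, 'A', 'B', 'C'};
        // Hypothetical writer side: a non-nullable string "ABC" is a length prefix (3) plus the bytes.
        byte[] requiredString = {0x06, 'A', 'B', 'C'};

        System.out.println(readUnionBranch(nullableString, 2));
        System.out.println(readUnionBranch(requiredString, 2));
    }
}
```

This prints "branch 1" for the first array and "Index 3 out of bounds for 
length 2" for the second. If something similar is happening here, comparing 
the nullability and order of the fields in the schema BigQuery sends with 
the schema the deserializer derives from the table definition should point 
to the failing field.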


Best,

Dawid

On 13/04/2022 18:11, Anitha Thankappan wrote:

Hi Piotr,

*The code I wrote for 1.13.1:*

public final class BigQuerySourceFunction extends
        RichSourceFunction implements ResultTypeQueryable {

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
    }
    ..
}

*This works fine and I got the required output:*

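+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                            ABC |
| +I |           2 |                            XYZ |
+----+-------------+--------------------------------+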
2 rows in set


*The same code with 1.11.0 gives:*

   RuntimeContextInitializationContextAdapters cannot be resolved

*So I rewrote the code as:*

@Override
public void open(Configuration parameters) throws Exception {
    deserializer.open(() ->
            getRuntimeContext().getMetricGroup().addGroup("bigquery"));
}

*But the result was:*

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
    at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski wrote:


Hey,

Could you be more specific about how it is not working? Is it a
compiler error that there is no such class as
RuntimeContextInitializationContextAdapters? This class was
introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this
code and I also don't know where it's documented, but:
a) maybe you should just mimic in reverse the changes done in the
pull request from this issue [2]? `deserializer.open(() ->
getRuntimeContext().getMetricGroup().addGroup("something"))`?
b) RuntimeContextInitializationContextAdapters is an `@Internal`
class that is not part of the public API, so even in 1.13.x you
should not be using it. You should probably just implement your
own DeserializationSchema.InitializationContext.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18363
[2] https://github.com/apache/flink/pull/13844/files

On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan wrote:


Hi,

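I developed a Flink connector to read data from BigQuery. The
rows read from BigQuery are in Avro format.
I tried it with 1.13.1 and it works fine. But my requirement is
1.11.0, and in that case the code:
deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
is not working.

What could be the alternative for this in 1.11.0?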


Re: Avro deserialization issue

2022-04-13 Thread Anitha Thankappan
Hi Piotr,


*The code I wrote for 1.13.1:*

public final class BigQuerySourceFunction extends
        RichSourceFunction implements ResultTypeQueryable {

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
    }
    ..
}


*This works fine and I got the required output:*

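+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                            ABC |
| +I |           2 |                            XYZ |
+----+-------------+--------------------------------+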
2 rows in set



*The same code with 1.11.0 gives:*

   RuntimeContextInitializationContextAdapters cannot be resolved


*So I rewrote the code as:*

@Override
public void open(Configuration parameters) throws Exception {
    deserializer.open(() ->
            getRuntimeContext().getMetricGroup().addGroup("bigquery"));
}


*But the result was:*

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
    at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski wrote:

> Hey,
>
> Could you be more specific about how it is not working? Is it a compiler
> error that there is no such class as
> RuntimeContextInitializationContextAdapters? This class was introduced in
> Flink 1.12 in FLINK-18363 [1]. I don't know this code and I also don't
> know where it's documented, but:
> a) maybe you should just mimic in reverse the changes done in the pull
> request from this issue [2]? `deserializer.open(() ->
> getRuntimeContext().getMetricGroup().addGroup("something"))`?
> b) RuntimeContextInitializationContextAdapters is an `@Internal` class
> that is not part of the public API, so even in 1.13.x you should not be
> using it. You should probably just implement your own
> DeserializationSchema.InitializationContext.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18363
> [2] https://github.com/apache/flink/pull/13844/files
>
> On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan <
> anitha.thankap...@quantiphi.com> wrote:
>
>>
>> Hi,
>>
>> I developed a Flink connector to read data from BigQuery. The rows read
>> from BigQuery are in Avro format.
>> I tried it with 1.13.1 and it works fine. But my requirement is 1.11.0,
>> and in that case the code:
>> deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
>> is not working.
>>
>> What could be the alternative for this in 1.11.0?
>>
>> Thanks and Regards,
>> Anitha Thankappan
>>
>> *This message contains information that may be privileged or confidential
>> and is the property of the Quantiphi Inc and/or its affiliates**. It is
>> intended only for the person to whom it is addressed. **If you are not
>> the intended recipient, any review, dissemination, distribution, copying,
>> storage or other use of all or any portion of this message is strictly
>> prohibited. If you received this message in error, please immediately
>> notify the sender by reply e-mail and delete this message in its *
>> *entirety*
>>
>


Re: Avro deserialization issue

2022-04-13 Thread Piotr Nowojski
Hey,

Could you be more specific about how it is not working? Is it a compiler
error that there is no such class as
RuntimeContextInitializationContextAdapters? This class was introduced in
Flink 1.12 in FLINK-18363 [1]. I don't know this code and I also don't
know where it's documented, but:
a) maybe you should just mimic in reverse the changes done in the pull
request from this issue [2]? `deserializer.open(() ->
getRuntimeContext().getMetricGroup().addGroup("something"))`?
b) RuntimeContextInitializationContextAdapters is an `@Internal` class that
is not part of the public API, so even in 1.13.x you should not be using
it. You should probably just implement your own
DeserializationSchema.InitializationContext.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18363
[2] https://github.com/apache/flink/pull/13844/files

On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan wrote:

>
> Hi,
>
> I developed a Flink connector to read data from BigQuery. The rows read
> from BigQuery are in Avro format.
> I tried it with 1.13.1 and it works fine. But my requirement is 1.11.0,
> and in that case the code:
> deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
> is not working.
>
> What could be the alternative for this in 1.11.0?
>
> Thanks and Regards,
> Anitha Thankappan
>
>