Handling non-transient exceptions

2022-04-14 Thread Jose Brandao
Hello,

I'm looking for some expertise on exception handling with checkpointing and 
streaming. Let's say some bad data flows into your Flink application and 
causes an exception you are not expecting. That exception will bubble up, 
eventually killing the respective task, and the app will not be able to 
progress. Eventually the topology will restart (if so configured) from the 
previous successful checkpoint/savepoint and will hit that broken message 
again, resulting in a loop.

If we don't know how to process a given message, we would like our topology to 
progress and sink that message into some sort of dead-letter Kafka topic.

We have seen some recommendations to use side outputs for that, but it looks 
like things can easily get messy that way. We would need to wrap all our 
operators in try-catch blocks and emit messages to a side output within the 
catch. Then we would need to aggregate all those side outputs and decide what 
to do with them. If we want to output exactly the inbound message that caused 
the exception, that requires some extra logic as well, since our operators 
have different output types. On top of that, it looks like the set of 
operators that allow side outputs is limited.

https://stackoverflow.com/questions/52411705/flink-whats-the-best-way-to-handle-exceptions-inside-flink-jobs

Is there a better way to do it? We would like to avoid having our topology get 
stuck because one message causes some unpredicted exception, and we would also 
like to be able to replay that message once we put a fix in place, hence the 
dead-letter topic idea.

Regards,
José Brandão
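
The wrapping approach described in the message above can be sketched in plain Python. This is only an illustration of the pattern, not Flink API: `DeadLetter`, `with_dead_letter` and `run` are hypothetical names, and in a real job the dead-letter list would presumably correspond to a side output routed to a Kafka sink. Wrapping each function once and always keeping the untouched inbound record is one way to sidestep the "different output types" concern.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class DeadLetter:
    """Hypothetical envelope: the untouched inbound record plus failure details."""
    raw_record: Any
    operator_name: str
    error: str


def with_dead_letter(operator_name: str, fn: Callable[[Any], Any]):
    """Wrap fn so that an exception yields a DeadLetter instead of propagating."""
    def wrapped(record: Any) -> Tuple[bool, Any]:
        try:
            return True, fn(record)
        except Exception as exc:  # deliberately broad: the bad data is unknown
            return False, DeadLetter(record, operator_name, repr(exc))
    return wrapped


def run(records: Iterable[Any], fn) -> Tuple[List[Any], List[DeadLetter]]:
    """Split results into the main output and the dead-letter output."""
    main, dead = [], []
    for record in records:
        ok, value = fn(record)
        (main if ok else dead).append(value)
    return main, dead


parse_int = with_dead_letter("parse_int", lambda s: int(s))
main_out, dead_out = run(["1", "2", "oops", "4"], parse_int)
print(main_out)
print([d.raw_record for d in dead_out])
```

Because every dead letter has the same shape regardless of the operator that produced it, the dead-letter outputs of several operators can be merged into a single stream before being written out for later replay.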





Re: How to debug Metaspace exception?

2022-04-14 Thread John Smith
Hi, so I have a dump file. What do I look for?

On Thu, Mar 31, 2022 at 3:28 PM John Smith  wrote:

> Ok so if there's a leak, if I manually stop the job and restart it from
> the UI multiple times, I won't see the issue because the classes
> are unloaded correctly?
>
>
> On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:
>
>>
>> The difference is that manually canceling the job stops the JobMaster,
>> but automatic failover keeps the JobMaster running. But looking on
>> TaskManager, it doesn't make much difference
>>
>>
>> On Mar 31, 2022, at 4:01 AM, John Smith wrote:
>>
>> Also if I manually cancel and restart the same job over and over is it
>> the same as if flink was restarting a job due to failure?
>>
>> I.e., when I click "Cancel Job" in the UI, is the job completely unloaded,
>> as opposed to when the job scheduler restarts a job for whatever reason?
>>
>> Like this I'll stop and restart the job a few times, or maybe I can trick
>> my job into failing and have the scheduler restart it. Ok let me think about
>> this...
>>
>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>
>>> So if I run the same jobs in my dev env will I still be able to see the
>>> similar dump?
>>>
>>> I think running the same job in dev should be reproducible, maybe you
>>> can have a try.
>>>
>>>  If not I would have to wait for a low-volume time to do it on
>>> production. Also, if I recall, the dump is as big as the JVM memory, right? So
>>> if I have 10GB configured for the JVM the dump will be a 10GB file?
>>>
>>> Yes, jmap will pause the JVM; the length of the pause depends on the size of
>>> the dump. You can use "jmap -dump:live" to dump only the reachable objects,
>>> which should keep the pause brief.
>>>
>>>
>>>
>>> On Mar 30, 2022, at 9:47 PM, John Smith wrote:
>>>
>>> I have 3 task managers (see config below). There is a total of 10 jobs
>>> with 25 slots being used.
>>> The jobs are 100% ETL, i.e., they load JSON, transform it and push it to
>>> JDBC; only 1 job of the 10 is pushing to an Apache Ignite cluster.
>>>
>>> For jmap: I know that it will pause the task manager. So if I run the
>>> same jobs in my dev env will I still be able to see a similar dump? I
>>> assume so. If not I would have to wait for a low-volume time to do it on
>>> production. Also, if I recall, the dump is as big as the JVM memory, right? So
>>> if I have 10GB configured for the JVM the dump will be a 10GB file?
>>>
>>>
>>> # Operating system has 16GB total.
>>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>>
>>> cluster.evenly-spread-out-slots: true
>>>
>>> taskmanager.memory.flink.size: 10240m
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>> taskmanager.numberOfTaskSlots: 16
>>> parallelism.default: 1
>>>
>>> high-availability: zookeeper
>>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>>> high-availability.zookeeper.quorum: ...
>>> high-availability.zookeeper.path.root: /flink_1_14
>>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>>
>>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>>
>>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>>
>>>> Hi, John
>>>>
>>>> Could you tell us your application scenario? Is it a Flink session
>>>> cluster with a lot of jobs?
>>>>
>>>> Maybe you can try to dump the memory with jmap and use tools such as
>>>> MAT to analyze whether there are abnormal classes and classloaders.
>>>>
>>>>
>>>> > On Mar 30, 2022, at 6:09 AM, John Smith wrote:
>>>> >
>>>> > Hi, running 1.14.4
>>>> >
>>>> > My task managers still fail with java.lang.OutOfMemoryError:
>>>> > Metaspace. The metaspace out-of-memory error has occurred. This can mean
>>>> > two things: either the job requires a larger size of JVM metaspace to load
>>>> > classes or there is a class loading leak.
>>>> >
>>>> > I have 2GB of metaspace configured:
>>>> > taskmanager.memory.jvm-metaspace.size: 2048m
>>>> >
>>>> > But the task nodes still fail.
>>>> >
>>>> > When looking at the UI metrics, the metaspace starts low. Now I see
>>>> > 85% usage. It seems to be a class loading leak at this point; how can we
>>>> > debug this issue?


>>>
>>


Re: Interrupt collect() function when reading from kafka topic (in pyflink)

2022-04-14 Thread Dian Fu
Hi Marjan,

The method `collect` is used to collect the content of a table. However,
`insert_statement` is an `INSERT INTO` statement, so there is no table to
collect from in your example. You could try the following code:
```
sql_statement = """
    SELECT window_start, window_end, COUNT(*) as how_any
    FROM TABLE(
        TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
    GROUP BY window_start, window_end
"""

with t_env.execute_sql(sql_statement).collect() as results:
    for res in results:
        print(res)
```
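
Once the loop body is reachable, the stopping condition from the original question (leave when a row contains '2023') is an ordinary `break`. The sketch below shows only that control flow and runs without Flink: `fake_window_results` is a hypothetical stand-in for the unbounded result iterator, and `contextlib.closing` plays the role of the `with` block, on the assumption that the real iterator is released the same way when the block exits.

```python
import itertools
from contextlib import closing


def fake_window_results():
    """Hypothetical stand-in for an unbounded stream of windowed rows."""
    for year in itertools.count(2020):
        yield (f"{year}-01-01 00:00:00", f"{year}-01-01 00:00:02", 1)


seen = []
# closing() calls the generator's close() when the block exits, whether the
# loop finished normally or was left early with break.
with closing(fake_window_results()) as results:
    for res in results:
        seen.append(res)
        if "2023" in str(res):
            break  # stop consuming; the with block then closes the iterator

print(len(seen))
```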

Regards,
Dian


On Thu, Apr 14, 2022 at 6:24 PM Marjan Jordanovski 
wrote:

> Hello,
>
> I have a simple source table (which is using kafka connector) that's
> reading and storing data from specific kafka topic. I also have print
> table:
>
>> t_env.execute_sql("""
>> CREATE TABLE print (
>> window_start TIMESTAMP(3),
>> window_end TIMESTAMP(3),
>> how_any BIGINT
>> ) WITH (
>> 'connector' = 'print'
>> )
>> """)
>
>
> insert_statement = """
>> INSERT INTO print (
>> SELECT window_start, window_end, COUNT(*) as how_any
>> FROM TABLE(
>> TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
>> GROUP BY window_start, window_end
>> )
>> """
>
> Now in order to print from print table, I used:
>
>> with t_env.execute_sql(insert_statement).collect() as results:
>>     for res in results:
>>         print(res)
>
>
> But the for loop here is useless, as the program never gets there. I am seeing
> outputs from the print table in my stdout, but the program runs indefinitely,
> because (I guess) all the outputs that I see are generated in the collect()
> process.
>
> Is there a way to interrupt collect() here? I wanted to stop the program
> when, for example, res contains the string '2023', but I couldn't, as the
> program never gets to the for loop.
> I tried with print() and wait() but got the same results (when reading from the
> Kafka topic, the program never gets to the for loop).
>
> Thank you,
> Marjan
>


Re: FW: Pyflink Kafka consumer error (version 1.14.4)

2022-04-14 Thread Dian Fu
Hi Harshit,

I think you could double check whether the version of
flink-sql-connector-kafka.jar
is also 1.14.4.
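
A `NoSuchMethodError` like the one quoted below is commonly a sign of mismatched library versions, which is why the jar version matters. As a rough sketch of the check, the helper below compares the version embedded in a jar file name with the PyFlink version. The naming scheme `<artifact>_<scala>-<version>.jar` is an assumption, and `jar_version` and `versions_match` are hypothetical helper names. Note that the jar in the question has no version in its name, so the file name alone cannot confirm anything there.

```python
import re
from typing import Optional

# Assumed naming scheme: <artifact>[_<scala>]-<version>.jar, for example
# flink-sql-connector-kafka_2.11-1.14.4.jar
JAR_VERSION = re.compile(r"-(\d+\.\d+\.\d+)\.jar$")


def jar_version(jar_name: str) -> Optional[str]:
    """Return the version embedded in a jar file name, or None if absent."""
    match = JAR_VERSION.search(jar_name)
    return match.group(1) if match else None


def versions_match(jar_name: str, pyflink_version: str) -> bool:
    """True only when the jar name carries exactly the given PyFlink version."""
    return jar_version(jar_name) == pyflink_version


print(versions_match("flink-sql-connector-kafka_2.11-1.14.4.jar", "1.14.4"))
print(versions_match("flink-sql-connector-kafka_2.11-1.13.6.jar", "1.14.4"))
print(jar_version("flink-sql-connector-kafka.jar"))
```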

Regards,
Dian

On Thu, Apr 14, 2022 at 7:51 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

>
>
>
>
> *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
> *Sent:* Thursday, April 14, 2022 4:04 PM
> *To:* user-i...@flink.apache.org
> *Cc:* harshit.varsh...@iktara.ai
> *Subject:* Pyflink Kafka consumer error (version 1.14.4)
>
>
>
> Dear Team,
>
>
>
> I am new to PyFlink and would appreciate your support with an issue I am
> facing. I am using PyFlink version 1.14.4 and the reference code from the
> PyFlink getting started pages.
>
> I am getting the following error when using the FlinkKafkaConsumer connector.
>
> : org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup;
>
>
>
> Below is my code for reference..
>
>
>
> def streaming_square_roots():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # the sql connector for kafka is used here as it's a fat jar and could
>     # avoid dependency issues
>     env.add_jars(
>         "file:///C:/Users/Admin/Desktop/test11/flink-sql-connector-kafka.jar")
>
>     deserialization_schema = SimpleStringSchema()
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='new-numbers',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': 'localhost:9092'})
>
>     ds = env.add_source(kafka_consumer)
>
>     ds.print()
>
>     # 4. create sink and emit result to sink
>     env.execute(job_name='streaming_square_roots')
>
>
> if __name__ == '__main__':
>     streaming_square_roots()
>
>
>
> Thanks and Regards,
>
> Harshit
>
>
>


FW: Pyflink Kafka consumer error (version 1.14.4)

2022-04-14 Thread harshit.varsh...@iktara.ai
 

 

From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai] 
Sent: Thursday, April 14, 2022 4:04 PM
To: user-i...@flink.apache.org
Cc: harshit.varsh...@iktara.ai
Subject: Pyflink Kafka consumer error (version 1.14.4)

 

Dear Team,

 

I am new to PyFlink and would appreciate your support with an issue I am
facing. I am using PyFlink version 1.14.4 and the reference code from the
PyFlink getting started pages.

I am getting the following error when using the FlinkKafkaConsumer connector.

: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup;

 

Below is my code for reference..

 

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer


def streaming_square_roots():
    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and could
    # avoid dependency issues
    env.add_jars(
        "file:///C:/Users/Admin/Desktop/test11/flink-sql-connector-kafka.jar")

    deserialization_schema = SimpleStringSchema()

    kafka_consumer = FlinkKafkaConsumer(
        topics='new-numbers',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092'})

    ds = env.add_source(kafka_consumer)

    ds.print()

    # 4. create sink and emit result to sink
    env.execute(job_name='streaming_square_roots')


if __name__ == '__main__':
    streaming_square_roots()

 

Thanks and Regards,

Harshit

 



Interrupt collect() function when reading from kafka topic (in pyflink)

2022-04-14 Thread Marjan Jordanovski
Hello,

I have a simple source table (which is using kafka connector) that's
reading and storing data from specific kafka topic. I also have print
table:

> t_env.execute_sql("""
> CREATE TABLE print (
> window_start TIMESTAMP(3),
> window_end TIMESTAMP(3),
> how_any BIGINT
> ) WITH (
> 'connector' = 'print'
> )
> """)


insert_statement = """
> INSERT INTO print (
> SELECT window_start, window_end, COUNT(*) as how_any
> FROM TABLE(
> TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
> GROUP BY window_start, window_end
> )
> """

Now in order to print from print table, I used:

> with t_env.execute_sql(insert_statement).collect() as results:
>     for res in results:
>         print(res)


But the for loop here is useless, as the program never gets there. I am seeing
outputs from the print table in my stdout, but the program runs indefinitely,
because (I guess) all the outputs that I see are generated in the collect()
process.

Is there a way to interrupt collect() here? I wanted to stop the program
when, for example, res contains the string '2023', but I couldn't, as the
program never gets to the for loop.
I tried with print() and wait() but got the same results (when reading from the
Kafka topic, the program never gets to the for loop).

Thank you,
Marjan


Re: Flink state migration from 1.9 to 1.13

2022-04-14 Thread Martijn Visser
Hi Qinghui,

If you're using SQL, please be aware that there are unfortunately no
application state compatibility guarantees. You can read more about this in
the documentation [1]. This is why the community has been working on
FLIP-190 to support this in future versions [2]

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#table-api--sql
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489

On Thu, 14 Apr 2022 at 11:20, XU Qinghui  wrote:

> Hello Yu'an,
>
> Thanks for the reply.
> I'm using the SQL API, not the `DataStream` API, in the job. So
> there's no `keyBy` call directly in our code, but we do have some `group
> by` clauses and joins in the SQL. (We are using the deprecated table planners
> both before and after the migration.)
> Do you know what could be the cause of the incompatibility?
>
> BR,
>
>
>
> On Thu, 14 Apr 2022 at 04:20, yu'an huang wrote:
>
>> Hi Qinghui,
>>
>> Did you use a different keyBy() for your KeyedCoProcessOperator? For
>> example, did you use a field name (keyBy("id")) in 1.9 and a
>> lambda (keyBy(e -> e.getId())) in 1.13? This would make the key serializer
>> incompatible.
>>
>> You may refer to this link for how to use Apache Flink's State Processor
>> API to modify savepoints:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>>
>>
>> On 14 Apr 2022, at 1:44 AM, XU Qinghui  wrote:
>>
>> The new key serializer (org.apache.flink.api.java.typeutils.runtime.
>> RowSerializer@896510d) must be compatible with the previous key
>> serializer
>>
>>
>>


RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-14 Thread Alexis Sarda-Espinosa
Hello,

There was a network issue in my environment and the job had to restart. After 
the job came back up, the logs showed a lot of lines like this:

RocksDBIncrementalRestoreOperation ... Starting to restore from state handle: 
...

Interestingly, those entries include information about sizes in bytes:

- 
445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/checkpoints//shared/18f95afa-dc66-467d-bd05-779895f24960',
 dataBytes=1328}
- privateState={MANIFEST-04=File State: 
file:/opt/flink/state/checkpoints//shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d
 [80921331 bytes]

I extracted a lot of that information and I can see that:

- If I sum all dataBytes from sharedState, that only accounts for a couple MB.
- Most of the state comes from privateState, specifically from the entries 
referring to MANIFEST File State; that accounts for almost 1.5GB.

I believe that is one of the files RocksDB uses internally, but is that related 
to managed state used by my functions? Or does that indicate size growth is 
elsewhere?
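
The size extraction described above can be sketched roughly as follows. The two regular expressions are assumptions derived only from the two log entries quoted in this message, and grouping by file kind (`.sst`, `MANIFEST`, other) is just one possible breakdown; real restore logs may contain other notations that this does not cover.

```python
import re
from collections import defaultdict

# Two size notations appear in the restore log entries quoted above:
#   <name>=ByteStreamStateHandle{... dataBytes=1328}
#   <name>=File State: <path> [80921331 bytes]
BYTE_STREAM = re.compile(
    r"(?P<name>[\w.-]+)=ByteStreamStateHandle\{.*?dataBytes=(?P<size>\d+)\}", re.S)
FILE_STATE = re.compile(
    r"(?P<name>[\w.-]+)=File State:.*?\[(?P<size>\d+) bytes\]", re.S)


def kind(name: str) -> str:
    """Classify an entry by its file name."""
    if name.endswith(".sst"):
        return "sst"
    if name.startswith("MANIFEST"):
        return "manifest"
    return "other"


def sum_sizes(log_text: str) -> dict:
    """Sum the reported byte sizes per file kind."""
    totals = defaultdict(int)
    for pattern in (BYTE_STREAM, FILE_STATE):
        for m in pattern.finditer(log_text):
            totals[kind(m.group("name"))] += int(m.group("size"))
    return dict(totals)


# The two entries quoted in the message, each joined onto one line.
sample = "\n".join([
    "445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/"
    "checkpoints//shared/18f95afa-dc66-467d-bd05-779895f24960', dataBytes=1328}",
    "privateState={MANIFEST-04=File State: file:/opt/flink/state/"
    "checkpoints//shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d [80921331 bytes]",
])
totals = sum_sizes(sample)
print(totals)
```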

Regards,
Alexis.

-----Original Message-----
From: Alexis Sarda-Espinosa
Sent: Tuesday, April 12, 2022 15:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: RE: RocksDB's state size discrepancy with what's seen with state 
processor API

Thanks for all the pointers. The UI does show combined state for a chain, but 
the only state descriptors inside that chain are the 3 I mentioned before. Its 
size is still increasing today, and duration is now around 30 seconds (I can't 
use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but 
I don't see anything weird. The counts go up and down depending on which one I 
load, but even the bigger ones have around 500-700 entries, which should only 
be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
windowedStream  = timestampedStream -> reinterpretAsKeyedStream -> 
window (SlidingEventTimeWindows)
windowedStream -> process1 -> sink1
windowedStream -> process2 -> sink2
windowedStream -> process3 -> map

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? 
I assume the window operators save some information in the state as well.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan
Sent: Tuesday, April 12, 2022 14:06
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

/shared folder contains keyed state that is shared among different checkpoints 
[1]. Most of state should be shared in your case since you're using keyed state 
and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is 
loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) 
and compare the numbers of objects in their states. To prevent the job from 
discarding the state, it can either be stopped for some time and then restarted 
from the latest checkpoint; or the number of retained checkpoints can be 
increased [2]. Copying isn't necessary.

Besides that, you can also check the state sizes of operators in the Flink Web UI 
(but not the sizes of individual states). If the operators are chained, their 
combined state size will be shown. To prevent this, you can disable chaining 
[3] (although this will have a performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint 
is subsumed). However, this is not guaranteed: if there is any problem during 
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
 wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> You suggest comparing counts of objects in different checkpoints, I assume 
> you mean copying my "checkpoints" folder at different times and comparing, 
> not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did 
> look at the folder in the running system, and I noticed that most of the 
> chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> corresponding to newer checkpoints. I would think this makes sense since the 
> configuration specifies

Re: Flink state migration from 1.9 to 1.13

2022-04-14 Thread XU Qinghui
Hello Yu'an,

Thanks for the reply.
I'm using the SQL API, not the `DataStream` API, in the job. So
there's no `keyBy` call directly in our code, but we do have some `group
by` clauses and joins in the SQL. (We are using the deprecated table planners
both before and after the migration.)
Do you know what could be the cause of the incompatibility?

BR,



On Thu, 14 Apr 2022 at 04:20, yu'an huang wrote:

> Hi Qinghui,
>
> Did you use a different keyBy() for your KeyedCoProcessOperator? For
> example, did you use a field name (keyBy("id")) in 1.9 and a
> lambda (keyBy(e -> e.getId())) in 1.13? This would make the key serializer
> incompatible.
>
> You may refer to this link for how to use Apache Flink's State Processor
> API to modify savepoints:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
>
> On 14 Apr 2022, at 1:44 AM, XU Qinghui  wrote:
>
> The new key serializer (org.apache.flink.api.java.typeutils.runtime.
> RowSerializer@896510d) must be compatible with the previous key serializer
>
>
>


Re: Avro deserialization issue

2022-04-14 Thread Dawid Wysakowicz

Hi Anitha,

As far as I can tell, the problem is with Avro itself. We upgraded the Avro 
version we use underneath in Flink 1.12.0. In 1.11.x we used Avro 1.8.2, 
while starting from 1.12.x we use Avro 1.10.0. Maybe that's the problem. 
You could try upgrading the Avro version in your program: just add a 
dependency on Avro 1.10. If I remember correctly, that should simply work.


If that does not solve the problem, I'd look into which field fails to 
be deserialized.


Best,

Dawid

On 13/04/2022 18:11, Anitha Thankappan wrote:

Hi Piotr,

*The code I wrote for 1.13.1:*

public final class BigQuerySourceFunction extends
        RichSourceFunction implements ResultTypeQueryable {

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
    }
    ..
}

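*This works fine and I got the required output:*

+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                            ABC |
| +I |           2 |                            XYZ |
+----+-------------+--------------------------------+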
2 rows in set


*With the same code on 1.11.0:*

   RuntimeContextInitializationContextAdapters cannot be resolved

*So I rewrote the code as:*

@Override
public void open(Configuration parameters) throws Exception {
    deserializer.open(() ->
            getRuntimeContext().getMetricGroup().addGroup("bigquery"));
}

*But the result was:*

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
    at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski  
wrote:


Hey,

Could you be more specific about how it is not working? A compiler
error that there is no such class as
RuntimeContextInitializationContextAdapters? This class has been
introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this
code and I also don't know where it's documented, but:
a) maybe you should just mimic in reverse the changes done in the
pull request from this issue [2]? `deserializer.open(() ->
getRuntimeContext().getMetricGroup().addGroup("something"))`?
b) RuntimeContextInitializationContextAdapters is an `@Internal`
class that is not part of the public API, so even in 1.13.x you
should not be using it. You should probably just implement your
own DeserializationSchema.InitializationContext.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18363
[2] https://github.com/apache/flink/pull/13844/files

On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan wrote:


Hi,

I developed a Flink connector to read data from BigQuery. The
rows read from BigQuery are in Avro format.
I tried it with 1.13.1 and it works fine. But my requirement is
1.11.0, and in that case the code:

deserializer.open(RuntimeCon