Re: Reading KafkaSource state from a savepoint using the State Processor API

2023-05-24 Thread Hang Ruan
Hi, Charles,

I usually read the state in debug mode. I always set a breakpoint
at the return statement in `SavepointReader#read`.
Then I can find the state I need in the field `SavepointMetadataV2
savepointMetadata`.
Finally, I can deserialize the state bytes with
`KafkaPartitionSplitSerializer`.
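
To illustrate the last step, here is a minimal, JDK-only sketch of decoding one
`byte[]` element of the "SourceReaderState". The byte layout is an assumption
based on how `SimpleVersionedSerialization` wraps what
`KafkaPartitionSplitSerializer` writes (a 4-byte serializer version, a 4-byte
payload length, then topic/partition/offsets); please verify it against the
Flink version that produced your savepoint:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class KafkaSplitStateDecoder {

    // Decodes one byte[] element of the KafkaSource "SourceReaderState".
    // Assumed layout: 4-byte serializer version, 4-byte payload length,
    // then topic (modified UTF-8), partition (int), startingOffset (long),
    // stoppingOffset (long).
    public static String decode(byte[] stateBytes) throws IOException {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(stateBytes))) {
            int version = in.readInt();        // split serializer version
            int payloadLength = in.readInt();  // length of the split payload
            String topic = in.readUTF();
            int partition = in.readInt();
            long startingOffset = in.readLong();
            long stoppingOffset = in.readLong(); // typically -1 if unbounded
            return topic + "-" + partition + " @ offset " + startingOffset;
        }
    }
}
```

In a real reader you would first pull the `byte[]` elements out of the
savepoint (e.g. via `SavepointReader` and the operator's list state, as
above), then either feed each element to a helper like this or strip the
version/length header and pass the payload to
`KafkaPartitionSplitSerializer#deserialize` directly.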

Best,
Hang

Charles Tan  于2023年5月24日周三 06:27写道:

> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After running the Flink job and
> stopping it with a savepoint, I then write a few more records to the input
> topic. When the job is resumed from this savepoint, it reads records from
> the position where it left off, indicating that the job successfully used the
> savepoint to recover its position. When I inspect the savepoint file with
> the State Processor API, I can read the "SourceReaderState" from the
> savepoint. However, the state is read as a Java byte array and I can't
> decode it or make any sense of it. I want to be able to read the savepoint
> state to find out exactly how much progress (partition/offset) a job has
> made in case it fails or is stopped.
>
> Does anyone have any ideas on how I can deserialize the bytes from the Kafka
> source state or, more generally, how to read the Kafka source operator
> state from a savepoint?
>
> Here is the link to a GitHub repository that contains the Flink job that I
> was running, a savepoint file, and the code I was using to try to read the
> savepoint. (https://github.com/charles-tan/flink-state-processor-example)
>
> Thanks,
> Charles
>


Reading KafkaSource state from a savepoint using the State Processor API

2023-05-23 Thread Charles Tan
Hi everyone,

I have a few questions about reading KafkaSource state using the State
Processor API. I have a simple Flink application which reads from a Kafka
topic then produces to a different topic. After running the Flink job and
stopping it with a savepoint, I then write a few more records to the input
topic. When the job is resumed from this savepoint, it reads records from
the position where it left off, indicating that the job successfully used the
savepoint to recover its position. When I inspect the savepoint file with
the State Processor API, I can read the "SourceReaderState" from the
savepoint. However, the state is read as a Java byte array and I can't
decode it or make any sense of it. I want to be able to read the savepoint
state to find out exactly how much progress (partition/offset) a job has
made in case it fails or is stopped.

Does anyone have any ideas on how I can deserialize the bytes from the Kafka
source state or, more generally, how to read the Kafka source operator
state from a savepoint?

Here is the link to a GitHub repository that contains the Flink job that I
was running, a savepoint file, and the code I was using to try to read the
savepoint. (https://github.com/charles-tan/flink-state-processor-example)

Thanks,
Charles