Re: Re: Re: How to read flinkSQL job state
Hi, Yifan.

Unfortunately, IIUC, we can currently get the key and value types only by reading the related SQL operator code. I think it would be useful to support SQL semantics in the State Processor API, but that would indeed take a lot of effort.

--
Best,
Hangxiang.
RE: Re: Re: How to read flinkSQL job state
Hi Hangxiang,

I still have one question about this problem: when using the DataStream API, I know the key and value types I use in state because I define the ValueStateDescriptor myself. But how can I get the ValueStateDescriptor in Flink SQL?

Thanks,
Yifan
RE: Re: Re: How to read flinkSQL job state
Hi Hangxiang,

We are using Flink 1.14; the state backend is EmbeddedRocksDBStateBackend and the checkpoint storage is filesystem. This is the checkpoint configuration from our running jobs:

Checkpointing Mode: Exactly Once
Checkpoint Storage: FileSystemCheckpointStorage
State Backend: EmbeddedRocksDBStateBackend
Interval: 10m 0s
Timeout: 20m 0s
Minimum Pause Between Checkpoints: 3m 0s
Maximum Concurrent Checkpoints: 1
Unaligned Checkpoints: Enabled
Aligned Checkpoint Timeout: 0ms
Persist Checkpoints Externally: Enabled (retain on cancellation)
Tolerable Failed Checkpoints: 5
Checkpoints With Finished Tasks: Disabled

Thanks,
Yifan
Re: Re: How to read flinkSQL job state
Hi, Yifan.

Which Flink version are you using? You are using the filesystem state backend instead of RocksDB, so your checkpoints may not be incremental, IIUC.

--
Best,
Hangxiang.
Re: Re: How to read flinkSQL job state
Hi, Yifan.

If you enable debug-level logging, you will see log lines like 'Generated hash xxx for node xxx'. I haven't found another way to get the operator id of SQL jobs; maybe I missed something, or we should export this information more directly. Unfortunately, there is no default state name for an operator, so currently we have to check the details of the operator code.

--
Best,
Hangxiang.
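For reference, the log-grepping step above can be sketched as follows. This is a minimal sketch against a fabricated log excerpt: the hash values, node names, and exact message layout are illustrative assumptions, so check the output of your own Flink version after enabling DEBUG for org.apache.flink.streaming.api.graph.StreamGraphHasherV2 in your log4j configuration.

```shell
# Fabricated JobManager log excerpt (format approximates the debug
# output of StreamGraphHasherV2.generateDeterministicHash).
cat > jobmanager.log <<'EOF'
DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 - Generated hash 'bc764cd8ddf7a0cff126f51c16239658' for node 'GroupAggregate' {id: 2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 - Generated hash 'ea632d67b7d595e5b851708ae9ad79d6' for node 'Source: kafka_orders' {id: 1}
EOF

# Pull out the operator-id / node-name pairs for the SQL job.
grep "Generated hash" jobmanager.log
```

The hex string in each matching line is the operator id you would pass to the State Processor API.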
RE: Re: How to read flinkSQL job state
Hi Hangxiang,

Thanks for your answer! We are using the RocksDB state backend with incremental checkpointing enabled, and it is the incremental size that keeps increasing. We didn't add any custom checkpoint configuration to our Flink SQL jobs. Where can I see the log from StreamGraphHasherV2.generateDeterministicHash? And is there a default state name?

Thanks,
Yifan
RE: Re: How to read flinkSQL job state
Hi Shammon,

We are using RocksDB, and the configuration is below:

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.snapshot-compression: true
execution.checkpointing.timeout: 6
state.backend: FILESYSTEM
state.backend.incremental: true
state.backend.local-recovery: true
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
state.backend.rocksdb.predefined-options: DEFAULT
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.num-retained: 3

Thanks,
Yifan
Re: How to read flinkSQL job state
Hi, Yifan,

Flink SQL & the Table API currently don't support reading state directly.

Best,
Ron
Re: How to read flinkSQL job state
Hi Yifan,

Besides reading the job state, I would like to know which state backend you are using. Can you share the state and checkpoint configuration for your job? You could first check those configuration items to confirm that they are correct.

Best,
Shammon FY
Re: How to read flinkSQL job state
Hi, Yifan.

I think the documentation[1] suggests converting the DataStream into a Table[2]. Then we could handle the state with the Table API & SQL.

Best,
Hang

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
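For reference, the DataStream-to-Table conversion suggested above can be sketched roughly like this. It is a minimal, untested sketch: the example stream is a placeholder standing in for what the State Processor API would actually produce, and the view/column names are made up.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StateAsTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder: in practice this stream would come from reading
        // keyed state with the State Processor API.
        DataStream<String> stateEntries = env.fromElements("key-a -> 1", "key-b -> 2");

        // Expose the stream as a table and inspect it with SQL.
        Table t = tEnv.fromDataStream(stateEntries).as("entry");
        tEnv.createTemporaryView("state_entries", t);
        tEnv.sqlQuery("SELECT entry FROM state_entries").execute().print();
    }
}
```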
Re: How to read flinkSQL job state
Hi, Yifan.

Unfortunately, the State Processor API currently only supports the DataStream API, but you can still use it to read your SQL job's state. The most difficult part is obtaining the operator id, which you can get from the log output of StreamGraphHasherV2.generateDeterministicHash, and the state name, which you can get from the operator's code.

BTW, about investigating why the checkpoint size keeps growing:
1. Which state backend are you using?
2. Have you enabled incremental checkpoints? Is the checkpoint size you mentioned the incremental size or the full size?
3. If it is the full size, did you evaluate whether it matches the theoretical size?

--
Best,
Hangxiang.
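For reference, reading a SQL operator's keyed state along the lines described above could look roughly like this. This is an untested sketch using the Flink 1.17-era SavepointReader API (Flink 1.14 has an older DataSet-based Savepoint API with a similar shape). The checkpoint path, the uid hash, the state name, and the key/value types are all illustrative assumptions: for a real SQL job you must recover the uid hash from the StreamGraphHasherV2 debug log and the state descriptor from the generated operator's code, and the actual key/value types may be internal row types rather than the simple types shown here.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadSqlState {

    // Key type String and value type Long are placeholder assumptions.
    static class ReaderFn extends KeyedStateReaderFunction<String, String> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // "someStateName" is a placeholder; the real name comes
            // from the operator's code.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("someStateName", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical retained-checkpoint path.
        SavepointReader reader = SavepointReader.read(env, "file:///tmp/checkpoints/chk-42");

        // Hypothetical uid hash taken from the "Generated hash ..."
        // debug log line of the SQL job.
        DataStream<String> entries = reader.readKeyedState(
                OperatorIdentifier.forUidHash("bc764cd8ddf7a0cff126f51c16239658"),
                new ReaderFn());

        entries.print();
        env.execute("read-sql-state");
    }
}
```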
Re: How to read flinkSQL job state
Hi Yifan,

AFAIK, if you want to query a job's state from outside Flink, you can use Queryable State[1]. Hope this helps.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/

Xiangyu
How to read flinkSQL job state
Hi team,

We are investigating why the checkpoint size of our FlinkSQL jobs keeps growing, and we want to look into the checkpoint files to find out what is causing the problem. I know we can use the State Processor API to read the state of jobs written with the DataStream API, but how can I read the state of jobs written with the Table API & SQL?

Thanks,
Yifan