Re: Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-18 Thread Hangxiang Yu
Hi, Lei. It's indeed a bit confusing. Could you share the related RocksDB log, which may contain more detailed info? On Fri, Apr 12, 2024 at 12:49 PM Lei Wang wrote: > > I enable RocksDB native metrics and do some performance tuning. > > state.backend.rocksdb.block.cache-size is set to 128m,4

Re: Flink job unable to restore from savepoint

2024-03-27 Thread Hangxiang Yu
Hi, Prashant. Which Flink version did you use? Did you modify your job logic or configuration? If so, could you share what changed? On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane wrote: > Hello, > > We have been facing this weird issue of not being able to restore from > savepoint,

Re: Advice on checkpoint interval best practices

2023-12-05 Thread Hangxiang Yu
Hi, Oscar. Just sharing my thoughts: Benefits of more frequent checkpoints: 1. shorter recovery time, as you mentioned (which also depends on how much data Flink has to roll back and reprocess); 2. lower end-to-end latency for checkpoint-bound sinks in exactly-once mode. Costs of more frequent checkpoints: 1.
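The two benefits above can be put into rough numbers. This is a back-of-the-envelope sketch, not Flink behaviour. It assumes that failures and record arrivals are spread uniformly over a checkpoint interval, and it ignores how long a checkpoint itself takes. The function name is hypothetical.

```python
def checkpoint_tradeoffs(interval_s: float, input_rate_per_s: float) -> dict:
    """Rough model of the checkpoint-interval trade-off.

    Assumptions: failures and record arrivals are uniform within an interval,
    and checkpoint duration is ignored.
    """
    return {
        # On average a failure lands halfway through an interval, so about
        # half an interval's worth of input has to be replayed on recovery.
        "expected_replayed_records": input_rate_per_s * interval_s / 2,
        # A checkpoint-bound exactly-once sink commits only when the next
        # checkpoint completes, so a record waits about half an interval.
        "expected_commit_delay_s": interval_s / 2,
        # Cost side: how often checkpoints are triggered.
        "checkpoints_per_hour": 3600 / interval_s,
    }


for interval in (10, 60, 300):
    print(interval, checkpoint_tradeoffs(interval, input_rate_per_s=1000))
```

Halving the interval halves both the replay amount and the commit delay, but it doubles the number of checkpoints per hour. That is the cost side of the trade-off.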

Re: Queryable state feature in latest version!!

2023-11-06 Thread Hangxiang Yu
Hi, Puneet. Queryable State has been deprecated in the latest version and will be removed in Flink 2.0. The interface and usage are frozen in 1.x, so you can still refer to the documentation of previous versions to use it. BTW, could you also share something about the scenarios in which you use it? That

Re: Checkpoints are not triggering when S3 is unavailable

2023-11-05 Thread Hangxiang Yu
Hi, do you mean that the checkpoint failure stops your job from running normally? What's your sink type? If it relies on completed checkpoints to commit, this is expected. On Tue, Oct 31, 2023 at 12:03 AM Evgeniy Lyutikov wrote: > Hi team! > I came across strange behavior in Flink 1.17.1.

Re: Clear the State Backends in Flink

2023-11-05 Thread Hangxiang Yu
Hi, Arjun. Do you mean clearing all entries stored in a user-defined state? IIUC, this can be done for operator state. For keyed state, users currently cannot do it, because every operation on keyed state is bound to a specific key. BTW, could you also share your business scenario? It could

Re: Cannot find metata file metadats in directory

2023-09-30 Thread Hangxiang Yu
Hi, how did you specify the checkpoint path you restored from? It seems that you are trying to restore from an incomplete or failed checkpoint. On Thu, Sep 28, 2023 at 6:09 PM rui chen wrote: > When we use 1.13.2,we have the following error: > FileNotFoundException: Cannot find metata file

Re: Flink Kafka offset commit issues

2023-09-30 Thread Hangxiang Yu
Hi, Elakiya. I think you could check: 1. the TaskManager log, to figure out whether the job is restoring from an existing checkpoint and which checkpoint path it restores from; 2. or the checkpoint ID when you restart your job (if not restoring from a checkpoint, it starts

Re: 1.17.1 - NPE during Interval join

2023-09-24 Thread Hangxiang Yu
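Hi, is this a SQL job or a DataStream job? Could you provide the key SQL or code that reproduces it? On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang wrote: > Hi, > > I ran into this problem while running locally on a single machine during development. Has anyone encountered and solved it? > > 2023-09-23 13:52:03.989 INFO > [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval > Join (19/20) >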

Re: Re: Re: How to read flinkSQL job state

2023-09-24 Thread Hangxiang Yu
Hi Hangxiang, > > I still have one question about this problem, when using datastream api I > know the key and value type I use in state because I > defined ValueStateDescriptor, but how can I get the ValueStateDescriptor in > flinksql? > > Thanks, > Yifan > > On 2023/

Re: Checkpoint jitter?

2023-09-13 Thread Hangxiang Yu
Hi, Matyas. Do you mean something like dynamically adjusting the checkpoint interval, or the frequency of uploading files, according to the pressure on the durable storage? On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás wrote: > Hey folks, > > Is it possible to add some sort of jitter to the

Re: Re: How to read flinkSQL job state

2023-09-07 Thread Hangxiang Yu
Hi, Yifan. Which Flink version are you using? IIUC, you are using the filesystem state backend instead of RocksDB, so your checkpoints may not be incremental. On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user wrote: > Hi Shammon, > > We are using RocksDB,and the configuration is below: >

Re: Re: How to read flinkSQL job state

2023-09-07 Thread Hangxiang Yu
> increasing. We didn't add any custom checkpoint configuration in flink sql > jobs, where can I see the log of > StreamGraphHasherV2.generateDeterministicHash? And is there a default state > name? > > Thanks, > Yifan > > On 2023/09/06 07:12:05 Hangxiang Yu wrote: >

Re: Flink RocksDB memory calculation in managed and unmanaged modes

2023-09-06 Thread Hangxiang Yu
Hi, this article https://flink-learning.org.cn/article/detail/c1db8bc157c72069979e411cd99714fd covers the theory behind, and gives worked examples of, the memory calculation for Flink RocksDB's write buffer and block cache. You could refer to it. On Fri, Sep 1, 2023 at 2:56 PM crazy <2463829...@qq.com.invalid> wrote: > Hi all, >flink1.13.5 >
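For the managed-memory case, the split between block cache and write buffers can be sketched as below. The two formulas are what I recall from Flink's RocksDBMemoryControllerUtils (calculateActualCacheCapacity and calculateWriteBufferManagerCapacity). Please verify them against the source of your Flink version. The ratio corresponds to state.backend.rocksdb.memory.write-buffer-ratio (default 0.5). The Python function name is hypothetical.

```python
def rocksdb_managed_memory_split(total_bytes: int, write_buffer_ratio: float = 0.5) -> tuple:
    """Split a managed-memory budget between RocksDB's block cache and its
    write buffer manager, using formulas recalled from Flink's
    RocksDBMemoryControllerUtils (verify against your version)."""
    # Capacity of the shared block cache; the write buffer manager charges
    # memtable memory against this same cache.
    cache_capacity = int((3 - write_buffer_ratio) * total_bytes / 3)
    # Limit handed to the write buffer manager (memtables).
    write_buffer_capacity = int(2 * total_bytes * write_buffer_ratio / 3)
    return cache_capacity, write_buffer_capacity


MiB = 1024 * 1024
cache, wb = rocksdb_managed_memory_split(256 * MiB)
print(f"block cache: {cache / MiB:.1f} MiB, write buffers: {wb / MiB:.1f} MiB")
```

Memtable memory is charged to the block cache, so in this model the cache accounts for both data blocks and write buffers. The numbers are therefore easier to read together than separately.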

Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. Unfortunately, the State Processor API currently only supports DataStream. But you can still use it to read your SQL job's state. The most difficult part is that you have to get the operator ID, which you can find in the log of StreamGraphHasherV2.generateDeterministicHash, and state

Re: Migrating Flink SQL job state across storage systems

2023-08-02 Thread Hangxiang Yu
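Hi, as I understand it, there are two ways:

1. Restore from one storage cluster and snapshot to another, i.e. set [1] to the HDFS address and [2] to the address of the new object store.
2. Keep stopping and starting the job on the HDFS cluster, but set the savepoint directory [3] to the object store.

As for the State Processor API, it is indeed hard to use with SQL jobs at the moment. You can only get information such as the uid from the logs, and you need to understand which state the SQL actually produces before you can use it. [1]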

Re: Check points are discarded with reason NULL

2023-07-24 Thread Hangxiang Yu
Hi, this exception is thrown because the number of checkpoint failures exceeded execution.checkpointing.tolerable-failed-checkpoints; see [1] for more details. The root cause of the checkpoint failures should be in your JM/TM logs. You could check or share them. [1]
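Here is a minimal sketch of how I understand the tolerance works: consecutive checkpoint failures are counted, a successful checkpoint resets the counter, and the job fails once the count exceeds the configured value. It is simplified in one way: every failure counts here, whereas Flink only counts certain failure reasons. The function name is hypothetical.

```python
from typing import List, Optional


def first_job_failure(checkpoint_ok: List[bool], tolerable_failed: int) -> Optional[int]:
    """Return the index of the checkpoint whose failure would fail the job,
    or None if the job survives. Simplified: every failure counts."""
    consecutive_failures = 0
    for i, ok in enumerate(checkpoint_ok):
        if ok:
            # A completed checkpoint resets the consecutive-failure counter.
            consecutive_failures = 0
        else:
            consecutive_failures += 1
            if consecutive_failures > tolerable_failed:
                return i
    return None
```

With tolerable_failed = 2, the job in this model survives two failures in a row and fails on the third.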

Re: MAx parallelism

2023-07-11 Thread Hangxiang Yu
Hi, if you are using HashMapStateBackend, there may be some noticeable overhead. With RocksDBStateBackend, I think the overhead is minor. Flink writes the key group as a prefix of the key to speed up rescaling, so the format looks like: key group | key len | key | ..
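To make the prefix overhead concrete, here is a minimal sketch. It assumes the following, which I recall from Flink's CompositeKeySerializationUtils: the key-group prefix takes 1 byte when there are at most 128 key groups and 2 bytes otherwise. The 4-byte length field and the function names are hypothetical stand-ins for the real key serialization.

```python
def key_group_prefix_bytes(max_parallelism: int) -> int:
    # Up to 128 key groups fit in a one-byte prefix; more need two bytes.
    return 2 if max_parallelism > 128 else 1


def composite_key(key_group: int, key: bytes, max_parallelism: int) -> bytes:
    """Illustrative layout: key group | key len | key.
    The 4-byte big-endian length field is a hypothetical choice."""
    prefix = key_group.to_bytes(key_group_prefix_bytes(max_parallelism), "big")
    return prefix + len(key).to_bytes(4, "big") + key


for max_p in (128, 32768):
    print(max_p, len(composite_key(5, b"user-42", max_p)))
```

In this model, raising max parallelism above 128 costs one extra byte per stored key. For small keys that is a noticeable relative increase; for large keys it is negligible.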

Re: the new state serializer can not be incompatible

2023-07-11 Thread Hangxiang Yu
10, 2023 at 12:23 PM 湘晗刚 <1016465...@qq.com> wrote: > Hi, > I am using a POJO type, > flink version is 1.10, > but the POJO type is the same as before. > What do you mean by saying “modify schema of kryo types”? > > ---Original--- > *From:* "Hangxiang Yu"

Re: the new state serializer can not be incompatible

2023-07-09 Thread Hangxiang Yu
Hi, could you share the detailed exception stack? Or did you modify any job logic or parameters? Currently, Flink only supports simple schema evolution (e.g. adding or removing fields of POJO types) for DataStream jobs [1]. Other modifications may cause this exception, for example: 1. modify some

Re: Disable hostname verification in Opensearch Connector

2023-06-24 Thread Hangxiang Yu
Hi, Eugenio. Maybe you could configure the RestClientBuilder manually and call setSSLHostnameVerifier? On Fri, Jun 23, 2023 at 4:06 PM Eugenio Marotti < ing.eugenio.maro...@gmail.com> wrote: > Hi, > > I’m currently using the Opensearch Connector for the Table API. For > testing I need to disable the

Re: table.exec.state.ttl not working as expected

2023-06-24 Thread Hangxiang Yu
Hi, neha. Could you share more information: 1. Which State Backend are you using? If it's RocksDB, is incremental checkpointing enabled? 2. Which specific operator is experiencing an increase in Checkpoint data size? (You can check the Checkpoint size changes of different subtasks

Re: Savepoint Failing - Flink 1.16.2 - Scala12

2023-06-24 Thread Hangxiang Yu
Hi, Shrihari. It seems related to https://issues.apache.org/jira/browse/FLINK-28758, which is currently unresolved. It should only occur with FlinkKafkaConsumer, so you could migrate to KafkaSource to avoid this issue. On Sat, Jun 24, 2023 at 2:46 AM Shrihari R wrote: > I am trying to stop the job by

Re: RocksDB State Backend GET returns null intermittently

2023-06-24 Thread Hangxiang Yu
Hi, Prabhu. This is a correctness issue. IIUC, it should not be related to the size of the block cache or write buffer, or to whether the bloom filter is enabled. Is your job a DataStream job? Does it contain a custom serializer? You could check or share the logic of the serializer, as this is

Re: CleanUpInRocksDbCompactFilter

2023-06-15 Thread Hangxiang Yu
Hi, Patricia. In my opinion, this parameter trades off read/write performance against storage space utilization (of course, smaller state also means better performance later on). I think the right value of longtimeNumberOfQueries depends on several factors, such as the

Re: Looking for latency metrics for each operator of a Flink job

2023-06-12 Thread Hangxiang Yu
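The [<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency metric should meet your needs, right? You can also configure different granularities: latency tracking is enabled via metrics.latency.interval, and the granularity is set with metrics.latency.granularity. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io On Mon, Jun 12, 2023 at 5:05 PM casel.chen wrote: > I'd like to collect latency metrics for data passing through each operator of a Flink job. Can the current open-source community version do this? -- Best, Hangxiang.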

Re: Why does ClosureCleaner ignore checkSerialization=false on recursion?

2023-06-11 Thread Hangxiang Yu
Hi, Logan. I guess your program uses an inner process function whose closure has checkSerialization set to true (this is actually the setting for most functions). When a closure object is created in Flink, it may hold references to other non-serializable objects, which may cause serialization

Re: changing serializer affects resuming from checkpoint

2023-06-11 Thread Hangxiang Yu
Backend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) > > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) > > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPar

Re: Flink on YARN: RocksDB memory overuse

2023-06-07 Thread Hangxiang Yu
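Hi, currently the memory used by RocksDB is not strictly limited; see this ticket: https://issues.apache.org/jira/browse/FLINK-15532 To find out where memory is going, you can start with some coarse-grained metrics: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics To pin down the detailed memory usage of RocksDB inside a single instance, you may need to use malloc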

Re: changing serializer affects resuming from checkpoint

2023-06-06 Thread Hangxiang Yu
Hi, Peng. Do these two jobs depend on each other? Could you share the specific logic of the two jobs, if convenient? Could you also share the failure message of the producer job? In my opinion, if the two jobs have no other association, as you said, the consumer job will fail due to

Re: Network Buffers

2023-06-06 Thread Hangxiang Yu
> I have a total of 6000 tasks with 16 TM , 4 cores each with > jobmanger/taskmanger.momry.process.size = 8 gb . > > > Thanks & Regards, > Pritam > > > > On Tue, Jun 6, 2023 at 9:02 AM Hangxiang Yu wrote: > >> Hi, Pritam. >> This error message

Re: Network Buffers

2023-06-05 Thread Hangxiang Yu
Hi, Pritam. This error message indicates that the current network buffer configuration is not sufficient for the current workload. > What is the meaning of this exception (The total number of network buffers > is currently set to 22773 of 32768 bytes each)? > This just provides some
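The numbers in the quoted message translate directly into a memory figure. This is a minimal sketch that assumes the 32768-byte buffer size in the message is the configured taskmanager.memory.segment-size. The function name is hypothetical.

```python
def network_buffer_memory_mib(num_buffers: int, segment_size_bytes: int = 32 * 1024) -> float:
    """Total memory held by the network buffer pool, in MiB."""
    return num_buffers * segment_size_bytes / (1024 * 1024)


print(network_buffer_memory_mib(22773))
```

22773 buffers of 32 KiB come to about 712 MiB of network memory. If the workload needs more buffers than that, the network memory pool has to grow. It is sized with the taskmanager.memory.network.fraction / .min / .max options.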

Re: High Start-Delay And Aligned Checkpointing Causing Timeout.

2023-06-05 Thread Hangxiang Yu
Hi, Pritam. I think the definition applies to both aligned and unaligned checkpoints: "The alignment duration, which is defined as the time between receiving the first and the last checkpoint barrier."
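Following that definition, the alignment duration of one subtask is just the spread of barrier arrival times over its input channels. This is a minimal sketch; the channel names and the function name are hypothetical.

```python
def alignment_duration_ms(barrier_arrival_ms: dict) -> int:
    """barrier_arrival_ms maps each input channel to the time (ms) at which
    this subtask received the checkpoint barrier on that channel."""
    arrivals = list(barrier_arrival_ms.values())
    # Time between receiving the first and the last barrier.
    return max(arrivals) - min(arrivals)


print(alignment_duration_ms({"in-0": 1000, "in-1": 1250, "in-2": 4000}))
```

One slow upstream channel ("in-2" here) is enough to stretch the alignment duration for the whole subtask.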

Re: RocksDB segfault on state restore

2023-06-01 Thread Hangxiang Yu
Hi, Gyula. It seems related to https://issues.apache.org/jira/browse/FLINK-23346. We also saw a core dump when using list state after triggering state migration together with the TTL compaction filter. Have you triggered schema evolution? It seems to be a bug in RocksDB list state combined with the TTL compaction

Re: Flink checkpoint timeout

2023-06-01 Thread Hangxiang Yu
Hi, Ivan. Could you provide more information: 1. Which operator subtask is stuck? Or is it random? 2. Could you share the stack or a flame graph of the stuck subtask? On Wed, May 31, 2023 at 12:45 PM Ethan T Yang wrote: > Hello all, > > We recently start to experience Checkpoint

Re: Re: Flink broadcast stream state cleanup policy not taking effect

2023-05-15 Thread Hangxiang Yu
> Or, when using a broadcast stream, is there any way to clean up the state manually? > > On 2023-05-15 11:28:54, "Hangxiang Yu" wrote: > >Hi, as far as I know, Operator State such as Broadcast state currently does not support TTL settings; see the description here > >< > https://nightlies.apache.org/fli

Re: Flink broadcast stream state cleanup policy not taking effect

2023-05-14 Thread Hangxiang Yu
Hi, as far as I know, Operator State such as Broadcast state currently does not support TTL settings; see the description of State TTL here. On Mon, May 15, 2023 at 11:05 AM lxk wrote: > Flink version: 1.14 >

Re: (no subject)

2023-05-14 Thread Hangxiang Yu
Hi, it's related to FLINK-11695, which is still unresolved. You could raise the corresponding HDFS limit to alleviate this problem. BTW, you could also check or share a few things before modifying the configuration: From the logic of your

Re: Flink Job Failure for version 1.16

2023-05-14 Thread Hangxiang Yu
Hi, I may have missed something, so could you share more? > I have recently migrated from 1.13.6 to 1.16.1, I can see there is a > performance degradation... Are you referring to a decrease in checkpoint performance when you mention the performance decline? It just happens when you upgrade from

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hangxiang Yu
Hi, I guess you used a fixed JOB_ID and configured the same checkpoint directory as before? And you may have started the job without the previous state? The new job knows nothing about the previous checkpoints, which is why it fails when it tries to create a new checkpoint whose metadata already exists. I'd like to suggest

Re: Failed to initialize delegation token receiver s3

2023-05-09 Thread Hangxiang Yu
Hi, this should be the confirmed issue FLINK-31839, which has been fixed in 1.17.1; see: https://issues.apache.org/jira/browse/FLINK-31839 On Sat, May 6, 2023 at 5:00 PM maker_d...@foxmail.com < maker_d...@foxmail.com> wrote: > flink version:flink-1.17.0 > k8s application mode > > Delegation tokens have already been disabled in flink-conf: >

Re: CheckpointedFunction and KeyedState

2023-05-05 Thread Hangxiang Yu
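Hi, initializing state in initializeState is OK, but try not to access KeyedState in initializeState or snapshotState. It is best to access it in the actual function, e.g. the FlatMap here. The reason is that KeyedState access is bound to the current key. Before each record is processed, the framework implicitly sets the current key for the actual function, which guarantees that every KeyedState operation targets a definite key-value pair. In initializeState and snapshotState, however, the framework does not implicitly set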
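A toy model of why keyed state belongs in the per-record function. ToyKeyedValueState and run_flatmap are hypothetical names, and this is not Flink's implementation. It only mimics the idea above: every access goes to the current key, and the framework sets that key before each record.

```python
class ToyKeyedValueState:
    """Hypothetical stand-in for a keyed ValueState: one value per key,
    always accessed for whatever key is *current*."""

    def __init__(self):
        self._values = {}
        self.current_key = None  # set by the "framework" before each record

    def _check_key(self):
        if self.current_key is None:
            raise RuntimeError("keyed state accessed without a current key")

    def value(self):
        self._check_key()
        return self._values.get(self.current_key)

    def update(self, v):
        self._check_key()
        self._values[self.current_key] = v


def run_flatmap(state, records):
    for key, amount in records:
        state.current_key = key  # what the framework does implicitly per record
        state.update((state.value() or 0) + amount)
```

Before any record arrives, this model has no current key at all, so an initializeState-time access fails. After processing, the current key is whichever key came last, so a snapshotState-time access would silently touch only that one key.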

Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-07 Thread Hangxiang Yu
Hi, Tony. "be detrimental to performance" means that the extra space overhead of the key-group field may affect performance. Flink writes the key group as a prefix of the key to speed up rescaling, so the format looks like: key group | key len | key | .. You
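Rescaling works on whole key groups, and max parallelism is the number of key groups. The sketch below shows how key groups map to subtasks. It uses the formulas I recall from Flink's KeyGroupRangeAssignment (please double-check them against the source). The crc32 hash is a stand-in: Flink applies a murmur hash to key.hashCode().

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    # Stand-in hash; Flink murmur-hashes key.hashCode() instead.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for_key_group(key_group: int, parallelism: int, max_parallelism: int) -> int:
    return key_group * parallelism // max_parallelism


def key_group_range(operator_index: int, parallelism: int, max_parallelism: int) -> range:
    # Contiguous range of key groups owned by one subtask.
    start = (operator_index * max_parallelism + parallelism - 1) // parallelism
    end = ((operator_index + 1) * max_parallelism - 1) // parallelism
    return range(start, end + 1)


for max_p in (128, 32768):
    print(max_p, [len(key_group_range(i, 3, max_p)) for i in range(3)])
```

With max parallelism 32768 and parallelism 3, each subtask owns roughly 10,922 key groups instead of roughly 43. The ranges stay contiguous, so rescaling still moves whole ranges.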

Re: Dear All, some questions about the State Processor API

2023-01-18 Thread Hangxiang Yu
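1. Yes, that is exactly what the State Processor API is mainly for. 2. As far as I can see, there is currently no good way to get the uid of a SQL job. One way is to enable debug logging and read it from the debug log of StreamGraphHasherV2#generateDeterministicHash. On Wed, Jan 18, 2023 at 2:39 PM ying lin wrote: > Flink version: 1.13.6 > Recently I've been doing a PoC on reading and writing RocksDB state via the State Processor API, and I have some questions: > > 1. If there is already a Kafka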

Re: Flink state recovery

2023-01-15 Thread Hangxiang Yu
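Hi, when restoring state, Flink currently only supports state migration for DataStream jobs, in two cases: adding or removing fields of POJO types, and changes that follow Avro's schema evolution rules [1]. For other types, or for SQL jobs, the community version does not yet support restoring from the old state, so the job can only be started without state. [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution On Thu, Jan 12, 2023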
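The POJO rules in [1] can be pictured with a small simulation: removed fields are dropped, new fields get the Java default for their type, and a field whose type changed is rejected. This is a toy model of those rules, not Flink's PojoSerializer. The schema representation (field name to Java type name) and the function name are hypothetical.

```python
# Java defaults for primitive field types; reference types default to null (None).
JAVA_DEFAULTS = {"int": 0, "long": 0, "double": 0.0, "boolean": False}


def migrate_pojo(old_schema: dict, old_values: dict, new_schema: dict) -> dict:
    """Simulate restoring a POJO written with old_schema into new_schema.
    Schemas map field name -> Java type name."""
    for name, java_type in new_schema.items():
        if name in old_schema and old_schema[name] != java_type:
            raise ValueError(f"field {name!r} changed type; not supported")
    return {
        # Existing fields keep their value, new fields get the Java default,
        # and fields missing from new_schema are simply dropped.
        name: old_values.get(name) if name in old_schema else JAVA_DEFAULTS.get(java_type)
        for name, java_type in new_schema.items()
    }


old = {"id": "long", "name": "String"}
new = {"id": "long", "count": "int", "tag": "String"}
print(migrate_pojo(old, {"id": 7, "name": "x"}, new))
```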

Re: Savepoint a failing job

2022-12-21 Thread Hangxiang Yu
Hi Tim. > Is the only solution to just use the DataStream API? As Martijn mentioned, if the execution plan has changed, it's difficult to reuse the original state for restoring. Only if you are dropping some operators can you use --allowNonRestoredState to restore without

Re: Parse checkpoint _metadata file

2022-12-21 Thread Hangxiang Yu
Hi, > Is there some way to deserialize the checkpoint _metadata file? You could use methods like SavepointLoader#loadSavepointMetadata in the State Processor API to load it. > If i try to process the file with regular expressions, then approximately 90% of S3 paths of objects are actually

Re: Rocksdb Incremental checkpoint

2022-12-19 Thread Hangxiang Yu
Hi, IIUC, numRetainedCheckpoints only influences the space overhead of the checkpoint directory, not the incremental size. RocksDB performs incremental checkpoints based on the shared directory, which always retains SST files as long as possible (they may come from the last checkpoint, or from
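Here is a minimal sketch of why the incremental size depends on which SST files are new, not on numRetainedCheckpoints. It simplifies things by assuming that a file already uploaded to the shared directory is reused by reference. The file names and the function name are hypothetical.

```python
def plan_incremental_checkpoint(current_sst: set, already_uploaded: set) -> tuple:
    """Return (files to upload, files reused by reference) for the next checkpoint."""
    return current_sst - already_uploaded, current_sst & already_uploaded


uploaded = {"000011.sst", "000012.sst", "000013.sst"}
# After a local compaction, RocksDB has replaced 000011/000012 with 000014.
local = {"000013.sst", "000014.sst"}
print(plan_incremental_checkpoint(local, uploaded))
```

Only the compaction output is uploaded. Retaining more checkpoints just keeps more old files referenced in the shared directory; it does not change what the next checkpoint uploads.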

Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-19 Thread Hangxiang Yu
ing duration? > > Thanks again, > Robin > > On Fri, Dec 16, 2022 at 13:48, Hangxiang Yu wrote: > >> Hi, Robin. >> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I >> guess your version of Flink was below 1.16 and you adapted the def

Re: Getting S3 client metrics from the flink-s3-fs-presto library

2022-12-18 Thread Hangxiang Yu
Hi, I think it's reasonable to support it. I'm thinking of taking it one step further: client-side metrics, or job-level metrics for the filesystem, could help us monitor the filesystem more precisely. Some metrics (like request rate, throughput, latency, retry count, etc.) are useful to monitor the

Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-16 Thread Hangxiang Yu
Hi, Robin. From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I guess your Flink version is below 1.16 and you kept the default value of 'state.backend.incremental'. In versions below 1.16, RocksDBStateBackend uses the savepoint format for its full snapshots [1]. So it will

Re: How to use the local repositories Jar instead of pulling remote snapshots when building modules?

2022-12-12 Thread Hangxiang Yu
Hi, hjw. I think [1] & [2] may help you. [1] https://stackoverflow.com/questions/16866978/maven-cant-find-my-local-artifacts [2] https://stackoverflow.com/questions/32571400/remote-repositories-prevents-maven-from-resolving-remote-parent On Fri, Dec 2, 2022 at 1:44 AM hjw wrote: > Hi, team. >

Re: Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-12 Thread Hangxiang Yu
Hi Alexis. IIUC, by default the new job should get a different job ID if you restore from a stopped job? Whether cleanup happens depends on the savepoint restore mode. Only in the case of failover does the job ID stay the same, and then everything in the checkpoint dir is claimed, as you said.

Re: Exceeded Checkpoint tolerable failure

2022-12-11 Thread Hangxiang Yu
ed images for reference. > > > > Regards, > Madan > > On Thursday, 8 December 2022 at 06:29:49 pm GMT-8, Hangxiang Yu < > master...@gmail.com> wrote: > > > Hi, Madan. > I think there is a root cause of the exception, could you share it ? > BTW, If you don't set

Re: Exceeded Checkpoint tolerable failure

2022-12-08 Thread Hangxiang Yu
Hi, Madan. I think there is a root cause for the exception; could you share it? BTW, if you haven't set a value for execution.checkpointing.tolerable-failed-checkpoints, I'd recommend setting it. It can prevent job restarts caused by recoverable temporary problems. [1]

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Hangxiang Yu
Hi, Lars. Could you check whether you have configured a lifecycle policy on Google Cloud Storage [1]? That is not recommended for storage used by Flink checkpoints. [1] https://cloud.google.com/storage/docs/lifecycle On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven wrote: > Hello, > We had an incident today

Re: Savepoint restore mode for the Kubernetes operator

2022-11-27 Thread Hangxiang Yu
Hi, Alexis. IIUC, there is no conflict between savepoint history and restore mode. Restore mode is about whether and how we manage the savepoint of the old job. Savepoint management in the operator only cares about the savepoint history of the new job. In other words, savepoint cleanup should not clean the

Re: Safe way to clear old checkpoint data

2022-11-27 Thread Hangxiang Yu
Hi, as Martijn mentioned, snapshot ownership in 1.15 is the best way. You say there are only 24000/10 references in a shared directory of a job. Is your case within the scope of [1]? If so, I think it works if you check the _metadata and find the files that are not referenced. And I suggest
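The check described above is a set difference: list the shared directory and subtract every file referenced by the _metadata of each retained checkpoint. This is a minimal sketch. How you extract the referenced paths from _metadata (e.g. via the State Processor API) is outside it, and the function name is hypothetical. Files that an in-progress checkpoint or another job might still reference must not be deleted, even if this check lists them.

```python
def unreferenced_files(shared_dir_listing: set, referenced_by_retained: list) -> set:
    """Files in the shared directory that no retained checkpoint references.
    referenced_by_retained holds one set of paths per retained _metadata."""
    referenced = set().union(*referenced_by_retained)
    return shared_dir_listing - referenced


listing = {"a.sst", "b.sst", "c.sst", "d.sst"}
print(unreferenced_files(listing, [{"a.sst", "b.sst"}, {"b.sst", "c.sst"}]))
```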

Re: Flink falls back on to kryo serializer for GenericTypes

2022-10-11 Thread Hangxiang Yu
Hi Sucheth. It's related to how you defined your generic types. If you are using complicated generic types, you may still need to give Flink some type hints, so what you tried may not be enough. Could you share your generic type? BTW, maybe you could refer to [1], which I think is similar to

Re: Flink SQL restoring from a checkpoint: aggregated values fluctuate

2022-10-10 Thread Hangxiang Yu
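Which value drops? Is it a particular metric? On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东 wrote: > Hi: > Everyone, when a Flink job restores from state, the aggregated values drop noticeably and only rise again after the checkpoint completes. Is there a good solution for this? -- Best, Hangxiang.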

Re: Window state size with global window and custom trigger

2022-10-09 Thread Hangxiang Yu
Hi, Alexis. I think you are right. It also applies to a global window with a custom trigger. If you apply a ReduceFunction or AggregateFunction, the window state is usually smaller than with a ProcessWindowFunction, because only the aggregated value is stored. This also holds for global windows. Of course,
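The size difference can be shown with two toy window-state holders. This is a sketch, not Flink's window operator. The class names are hypothetical: one buffers every element the way a ProcessWindowFunction window does, and the other keeps only a running aggregate the way a ReduceFunction does.

```python
class BufferingWindowState:
    """Like a ProcessWindowFunction: every element is kept until the window fires."""

    def __init__(self):
        self.elements = []

    def add(self, element):
        self.elements.append(element)

    def stored_entries(self) -> int:
        return len(self.elements)

    def fire(self, process_fn):
        return process_fn(self.elements)


class ReducingWindowState:
    """Like a ReduceFunction: only the running aggregate is kept."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.has_value = False
        self.acc = None

    def add(self, element):
        self.acc = self.reduce_fn(self.acc, element) if self.has_value else element
        self.has_value = True

    def stored_entries(self) -> int:
        return 1 if self.has_value else 0

    def fire(self):
        return self.acc


buffering, reducing = BufferingWindowState(), ReducingWindowState(lambda a, b: a + b)
for x in range(1, 1001):
    buffering.add(x)
    reducing.add(x)
print(buffering.stored_entries(), reducing.stored_entries())
```

Both produce the same sum when they fire, but the buffering variant stores 1000 entries and the reducing one stores a single value. In a global window that never purges, that gap keeps growing.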

Re: Difference between Checkpoint and Savepoint

2022-09-25 Thread Hangxiang Yu
Hi, > Regarding externalized checkpoint, is the checkpoint written to persistent storage only if the job is failed or suspended? What about cancelled or killed by the user? The checkpoint will be retained on cancellation and failure if you configure RETAIN_ON_CANCELLATION. > What information is

Re: How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Hangxiang Yu
Hi, I think maybe you could try to create a function that implements WithMasterCheckpointHook. These hooks are called by the checkpoint coordinator when triggering or restoring a checkpoint. You can find more details in [1]. [1]

Re: Flink upgrade path

2022-09-06 Thread Hangxiang Yu
Hi, Alexey. You could check state compatibility in the compatibility table [1]. The page describes how to upgrade and whether state is compatible across different versions. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/upgrading/#compatibility-table On Wed, Sep 7, 2022 at

Re: Out of memory in heap memory when working with state

2022-09-06 Thread Hangxiang Yu
Hi, lan. I guess you are using an old version of Flink. You could use RocksDBStateBackend [1] in the new version. It keeps state on local disk, so large state does not have to fit in memory. BTW, with the current internal mechanism, the state on external storage like S3 is

Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Thread Hangxiang Yu
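Flink guarantees exactly-once semantics for its own state. End-to-end exactly-once also requires the source and sink to cooperate, e.g. a replayable source plus an idempotent (or transactional) sink. Which version of Kafka are you using? On Fri, Aug 26, 2022 at 4:08 PM 杨扬 wrote: > Hello everyone! > We have a Flink job whose source and sink are both Kafka. > During a version upgrade (without modifying any code), we followed the commands in the official docs to create a savepoint and stop the job, then started the job from that savepoint. > We found that this does not join the data seamlessly across the stop and restart; some data is duplicated. > >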
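Here is a minimal sketch of why sink idempotence matters when data is replayed after a restore. It assumes that the restored job re-emits every record after the savepoint's source position, and that the old job had already written those records. It does not model the transactional alternative, a Kafka sink with exactly-once delivery read by consumers with isolation.level=read_committed. The function name is hypothetical.

```python
def sink_contents(records: list, replay_from: int, idempotent: bool) -> list:
    """records: (record_id, value) pairs written once by the old job.
    The restored job then re-emits records[replay_from:]."""
    emitted = records + records[replay_from:]
    if idempotent:
        table = {}
        for rid, value in emitted:
            table[rid] = value  # upsert by key: a replayed record overwrites itself
        return list(table.items())
    return list(emitted)  # plain append: replayed records show up twice


recs = [(1, "a"), (2, "b"), (3, "c")]
print(sink_contents(recs, 1, idempotent=False))
print(sink_contents(recs, 1, idempotent=True))
```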

Re: Question about Flink state initialization

2022-08-25 Thread Hangxiang Yu
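open is indeed called during initialization. When you say it is null on the first call, do you mean statAccumulator is null, or statAccumulator.value() is null? The latter can happen normally. The code here looks a bit off. Usually you fetch the value with the value() method and check it first, then update the ValueState with the update() method. On Fri, Aug 26, 2022 at 10:25 AM 曲洋 wrote: > Hello everyone, > > >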
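The value()/null-check/update() pattern can be shown with a toy stand-in for a single key's ValueState. ToyValueState and count_click are hypothetical names; in a real job the same steps happen on the Flink ValueState inside the keyed function.

```python
class ToyValueState:
    """Hypothetical stand-in for a ValueState<Long> of one key."""

    def __init__(self):
        self._value = None  # nothing written yet for this key

    def value(self):
        return self._value

    def update(self, new_value):
        self._value = new_value


def count_click(state: ToyValueState) -> None:
    current = state.value()
    if current is None:  # first record for this key: value() is null, which is normal
        current = 0
    state.update(current + 1)  # write back through update()


s = ToyValueState()
count_click(s)
count_click(s)
print(s.value())
```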

Re: Error on Flink automatic restart

2022-08-23 Thread Hangxiang Yu
Is this a DataStream job? Could you share the part that uses state? On Sat, Aug 20, 2022 at 3:35 PM Jason_H wrote: > Hello, I did modify the job, but it was started as a new job; the changes are large and it does not depend on the old job. > > Jason_H > hyb_he...@163.com > ---- Original message ---- > From: Michael Ran > Date: Aug 20, 2022 15:31 > To: tsreape...@gmail.com > Subject: Re: Error on Flink automatic restart

Re: Doubt about the RUNNING state of the job

2022-08-23 Thread Hangxiang Yu
Actually, every operator/subtask has its own status behind the 'RUNNING' status shown on the Flink UI. The status may be 'CREATED', 'DEPLOYING', 'INITIALIZING', 'RUNNING' and so on, as listed in [1]. Different operators/subtasks may have different statuses. After all

Re: Pojo state schema evolution not working correctly

2022-08-07 Thread Hangxiang Yu
Hi, IIUC, the conditions to reproduce it are: 1. using RocksDBStateBackend with the incremental strategy; 2. using ListState in the stateful operator; 3. enabling TTL with cleanupInRocksdbCompactFilter; 4. adding a field, which makes the job trigger schema evolution. Then the exception is thrown, right? As

Re: State compatibility when modifying Flink jobs

2022-07-31 Thread Hangxiang Yu
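Hello, the open-source version currently supports only limited changes for DataStream jobs, such as adding or removing fields [1]. Alibaba Cloud's Flink version adds three things: compatibility checks for modified SQL jobs, compatibility for most SQL operator changes, and faster state migration than the community version (it barely blocks job startup). Best, Hangxiang. [1]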

Re: Is there any plan to merge Blink Gemini into Flink?

2022-07-13 Thread Hangxiang Yu
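Hello, are you migrating from Alibaba Cloud's Blink product to Flink? For the migration process, see [1]. For an introduction to Gemini in "Realtime Compute for Apache Flink" and how to use it, see [2][3]. There are no plans to merge it into the community in the short term. [1] https://help.aliyun.com/document_detail/421043.html [2] https://help.aliyun.com/document_detail/414255.html [3] https://help.aliyun.com/document_detail/414256.html Best, Hangxiang. On Wed, Jul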

Re: Can FIFO compaction with RocksDB result in data loss?

2022-07-04 Thread Hangxiang Yu
Hi, Vishal. IIUC: 1. FIFO compaction drops the oldest data once the configured size limit is reached (all files stay in L0), so old data may be dropped without us knowing. That's why "it's basically a TTL compaction style and It is suited for keeping event log data with very low overhead (query log for example)". If it's the
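This is a minimal model of the FIFO behaviour described above. Whole SST files are deleted oldest-first once their total size exceeds a limit, which RocksDB configures as CompactionOptionsFIFO's max_table_files_size. The function and file names are hypothetical. The point is that nothing checks whether the dropped keys are still live state.

```python
from collections import deque


def fifo_compact(sst_files: deque, max_table_files_size: int) -> list:
    """sst_files holds (name, size_bytes) tuples, oldest first.
    Deletes the oldest files until the total size is within the limit and
    returns the names of the dropped files."""
    total = sum(size for _, size in sst_files)
    dropped = []
    while sst_files and total > max_table_files_size:
        name, size = sst_files.popleft()
        total -= size
        dropped.append(name)
    return dropped


files = deque([("000001.sst", 40), ("000002.sst", 40), ("000003.sst", 40)])
print(fifo_compact(files, 100))
```

In this model, a key written once and never rewritten simply disappears when its file ages out. For Flink state, that is data loss rather than TTL.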

Re: how to connect to the flink-state store and use it as cache to serve APIs.

2022-06-28 Thread Hangxiang Yu
Hi, laxmi. Currently there are two ways for users to access the state store: 1. Queryable State [1], which lets you access state at runtime; 2. the State Processor API [2], which lets you access state (snapshots) offline. But we have marked Queryable State as "Reaching End-of-Life". We are

Re: Spike in checkpoint start delay every 15 minutes

2022-06-17 Thread Hangxiang Yu
Are the "checkpointed size" and "checkpoint duration" of the 4th checkpoint bigger than the others? If so, I guess it's related to RocksDB flushes, which may delay the next checkpoint. Best, Hangxiang. On Fri, Jun 17, 2022 at 2:31 PM Hangxiang Yu wrote: > Is the 4th "checkp

Re: Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Hangxiang Yu
Hi, Jai. Could you share your checkpoint configuration (interval, min-pause, and so on) and the checkpoint details in the Flink UI? I guess the checkpoint delay may be related to when the last checkpoint completed, as you can see in the
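One way the previous checkpoint's completion time can push the next checkpoint back is the min pause. This is a simplified sketch, not Flink's exact scheduling logic: a checkpoint becomes due one interval after the previous trigger, but not before min-pause has elapsed since the previous completion. The function name is hypothetical.

```python
def earliest_next_trigger_ms(last_trigger_ms: int, last_completion_ms: int,
                             interval_ms: int, min_pause_ms: int) -> int:
    """Simplified: due one interval after the previous trigger, but no earlier
    than min-pause after the previous checkpoint completed."""
    return max(last_trigger_ms + interval_ms, last_completion_ms + min_pause_ms)


# Fast checkpoint: the interval dominates.
print(earliest_next_trigger_ms(0, 10_000, 60_000, 30_000))
# Slow checkpoint (e.g. a large flush): the min pause pushes the next one back.
print(earliest_next_trigger_ms(0, 50_000, 60_000, 30_000))
```

If every 4th checkpoint is unusually slow, this model predicts a periodic shift of the following trigger. That would be worth comparing against the UI timeline.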

Re: Re: Some question with Flink state

2022-05-23 Thread Hangxiang Yu
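processed. But when I use a parallelism greater than one, it seems that the same key is not assigned to the same parallel instance. Specifically, my program accumulates the items clicked by the same user. In the data, this item is already the second click, but its state in the program is empty, so the final accumulated result is 1, whereas the correct result should be 2. So I suspect that each operator instance has its own value-state. > > But when I use MapState, the problem no longer seems to appear. So I'd like to understand the reason. Or is there a way to ensure that all data for the same key is processed by the same task? > > lxk7...@163.com > > From: H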

Re: TolerableCheckpointFailureNumber not always applying

2022-05-23 Thread Hangxiang Yu
e.co > > > > On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu wrote: > >> Hi, Gaël Renoux. >> As you could see in [1], There are some descriptions about the config: >> "This only applies to the following failure reasons: IOException on the >> Job Mana

Re: Some question with Flink state

2022-05-23 Thread Hangxiang Yu
Hello, no state is shared between different parallel instances. BTW, questions in English can be sent to u...@flink.apache.org. Best, Hangxiang. On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com wrote: > > Hi everyone >I was used Flink keyed-state in my Project.But I found some questions >

Re: TolerableCheckpointFailureNumber not always applying

2022-05-23 Thread Hangxiang Yu
Hi, Gaël Renoux. As you can see in [1], the config is described as follows: "This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the

Re: Schema Evolution of POJOs fails on Field Removal

2022-05-18 Thread Hangxiang Yu
Hi, David. Removing a field from a POJO should work, as you said. But I think we need more information. Which version of Flink are you using? Did you make any other modifications? Could you share your code segments and the JM error log, if convenient? On Wed, May 18, 2022 at 9:07 PM David Jost

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Hangxiang Yu
Hi, James. I may not understand what the problem is. All checkpoints are stored at the address you set. IIUC, TMs write some checkpoint data to their local dir, upload it to that address, and then delete the local copy. The JM writes checkpoint metadata to the address and also does the

Re: Random incorrect checkpoint existence check

2022-04-25 Thread Hangxiang Yu
Hi, Chen-che, I think it may be similar to FLINK-12381. You could adopt the suggestion in the comments on that ticket, such as setting the job ID. I think you could also share your environment on the ticket to let us know more

Re: Can the reason for a job failure be seen in the web UI?

2022-04-22 Thread Hangxiang Yu
You should be able to see the root cause in the JobManager log? On Thu, Apr 21, 2022 at 5:54 PM weishishuo...@163.com wrote: > > I submitted a job that syncs data from PostgreSQL CDC to a MySQL JDBC sink. It failed after a while. When I click the job link, the web > UI shows the status FAILED, but the exception message is unclear > ``` > 2022-04-21 17:30:50 > org.apache.flink.runtime.JobException: Recovery is suppressed by >

Re: Savepoint and cancel questions

2022-04-22 Thread Hangxiang Yu
Hi, Dan. 1. Do you mean putting the option into the savepoint command? If so, I don't think it will work well. This option describes how checkpoints are cleaned up in different job statuses, e.g. FAILED/CANCELED, and the savepoint command cannot cover that. 2. Which Flink version do you use? I work on 1.14