Re: Why RocksDB metrics cache-usage is larger than cache-capacity
Sorry, it was probably an observation mistake. I export the metrics to Prometheus and query the result in Grafana; the usage actually does not exceed the capacity.

Thanks,
Lei

On Fri, Apr 19, 2024 at 9:55 AM Hangxiang Yu wrote:
> Hi, Lei.
> It's indeed a bit confusing. Could you share the related RocksDB log, which may contain more detailed info?
>
> On Fri, Apr 12, 2024 at 12:49 PM Lei Wang wrote:
>>
>> I enabled RocksDB native metrics to do some performance tuning.
>>
>> state.backend.rocksdb.block.cache-size is set to 128m, with 4 slots for each TaskManager.
>>
>> The observed result for one specific parallel slot:
>> state.backend.rocksdb.metrics.block-cache-capacity is about 14.5M
>> state.backend.rocksdb.metrics.block-cache-usage is about 24M
>>
>> I am a little confused: why is usage larger than capacity?
>>
>> Thanks,
>> Lei
>
> --
> Best,
> Hangxiang.
Why RocksDB metrics cache-usage is larger than cache-capacity
I enabled RocksDB native metrics to do some performance tuning.

state.backend.rocksdb.block.cache-size is set to 128m, with 4 slots for each TaskManager.

The observed result for one specific parallel slot:
state.backend.rocksdb.metrics.block-cache-capacity is about 14.5M
state.backend.rocksdb.metrics.block-cache-usage is about 24M

I am a little confused: why is usage larger than capacity?

Thanks,
Lei
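For reference, the two metrics being compared are opt-in RocksDB native metrics. A minimal flink-conf.yaml sketch with the toggles in question (option names as in the Flink configuration docs; the 128m value is the one from this thread):

```yaml
# Block cache size, shared by the RocksDB instances of a slot-sharing group
state.backend.rocksdb.block.cache-size: 128m
# Expose the two native block-cache metrics compared above
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true
```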
Re: How to enable RocksDB native metrics?
Thanks very much, it finally works.

On Thu, Apr 11, 2024 at 8:27 PM Zhanghao Chen wrote:
> Adding a space between -yD and the param should do the trick.
>
> Best,
> Zhanghao Chen
> ------
> *From:* Lei Wang
> *Sent:* Thursday, April 11, 2024 19:40
> *To:* Zhanghao Chen
> *Cc:* Biao Geng; user
> *Subject:* Re: How to enable RocksDB native metrics?
>
> Hi Zhanghao,
>
> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 -yDstate.backend.latency-track.keyed-state-enabled=true -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092
>
> I tried it; it doesn't work. The error is:
> Could not get job jar and dependencies from JAR file: JAR file does not exist: -yDstate.backend.latency-track.keyed-state-enabled=true
>
> Thanks,
> Lei
>
> On Thu, Apr 11, 2024 at 5:19 PM Zhanghao Chen wrote:
>> Hi Lei,
>>
>> You are using the old-style CLI for YARN jobs, where "-yD" instead of "-D" should be used.
>> ------
>> *From:* Lei Wang
>> *Sent:* Thursday, April 11, 2024 12:39
>> *To:* Biao Geng
>> *Cc:* user
>> *Subject:* Re: How to enable RocksDB native metrics?
>>
>> Hi Biao,
>>
>> I tried it; it doesn't work. The cmd is:
>>
>> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 -Dstate.backend.latency-track.keyed-state-enabled=true -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092
>>
>> It seems the -D param is ignored. Even if I change the param to a wrong spelling, the job is submitted successfully.
>> Any suggestions on this?
>>
>> Thanks,
>> Lei
>>
>> On Mon, Apr 8, 2024 at 9:48 AM Biao Geng wrote:
>>> Hi Lei,
>>> You can use the "-D" option in the command line to set configs for a specific job. E.g., `flink run-application -t yarn-application -Djobmanager.memory.process.size=1024m`.
>>> See https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/ for more details.
>>>
>>> Best,
>>> Biao Geng
>>>
>>> Marco Villalobos wrote on Apr 8, 2024:
>>>> Hi Lei,
>>>>
>>>> Have you tried enabling these Flink configuration properties?
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Apr 7, 2024, at 6:03 PM, Lei Wang wrote:
>>>>> I want to enable it only for specified jobs; how can I specify the configurations on the cmd line when submitting a job?
>>>>>
>>>>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote:
>>>>>> Hi Lei,
>>>>>>
>>>>>> You can enable it with the configurations listed in:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>>>> (RocksDB Native Metrics)
>>>>>>
>>>>>> Best,
>>>>>> Zakelly
>>>>>>
>>>>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote:
>>>>>>> Using big state and want to do some performance tuning, how can I enable RocksDB native metrics?
>>>>>>>
>>>>>>> I am using Flink 1.14.4
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Lei
Re: How to enable RocksDB native metrics?
Hi Zhanghao,

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 -yDstate.backend.latency-track.keyed-state-enabled=true -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092

I tried it; it doesn't work. The error is:
Could not get job jar and dependencies from JAR file: JAR file does not exist: -yDstate.backend.latency-track.keyed-state-enabled=true

Thanks,
Lei

On Thu, Apr 11, 2024 at 5:19 PM Zhanghao Chen wrote:
> Hi Lei,
>
> You are using the old-style CLI for YARN jobs, where "-yD" instead of "-D" should be used.
> ------
> *From:* Lei Wang
> *Sent:* Thursday, April 11, 2024 12:39
> *To:* Biao Geng
> *Cc:* user
> *Subject:* Re: How to enable RocksDB native metrics?
>
> Hi Biao,
>
> I tried it; it doesn't work. The cmd is:
>
> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 -Dstate.backend.latency-track.keyed-state-enabled=true -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092
>
> It seems the -D param is ignored. Even if I change the param to a wrong spelling, the job is submitted successfully.
> Any suggestions on this?
>
> Thanks,
> Lei
>
> On Mon, Apr 8, 2024 at 9:48 AM Biao Geng wrote:
>> Hi Lei,
>> You can use the "-D" option in the command line to set configs for a specific job. E.g., `flink run-application -t yarn-application -Djobmanager.memory.process.size=1024m`.
>> See https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/ for more details.
>>
>> Best,
>> Biao Geng
>>
>> Marco Villalobos wrote on Apr 8, 2024:
>>> Hi Lei,
>>>
>>> Have you tried enabling these Flink configuration properties?
>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics
>>>
>>> Sent from my iPhone
>>>
>>> On Apr 7, 2024, at 6:03 PM, Lei Wang wrote:
>>>> I want to enable it only for specified jobs; how can I specify the configurations on the cmd line when submitting a job?
>>>>
>>>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote:
>>>>> Hi Lei,
>>>>>
>>>>> You can enable it with the configurations listed in:
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>>> (RocksDB Native Metrics)
>>>>>
>>>>> Best,
>>>>> Zakelly
>>>>>
>>>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote:
>>>>>> Using big state and want to do some performance tuning, how can I enable RocksDB native metrics?
>>>>>>
>>>>>> I am using Flink 1.14.4
>>>>>>
>>>>>> Thanks,
>>>>>> Lei
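Per the resolution later in this thread, the fix for the error above is a space between -yD and the property; without it the old-style YARN CLI treats the whole token as the jar path. A sketch of the corrected invocation (job name, topics, and addresses exactly as in the original command):

```shell
flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G \
  -yqu default -p 8 \
  -yD state.backend.latency-track.keyed-state-enabled=true \
  -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar \
  --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092
```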
Re: Optimize exact deduplication for tens of billions of records per day
Hi Peter,

I tried this; it improved performance significantly, but I don't know exactly why. As far as I know, the number of keys in RocksDB doesn't decrease. Is there any specific technical material about this?

Thanks,
Lei

On Fri, Mar 29, 2024 at 9:49 PM Lei Wang wrote:
> Perhaps I can keyBy(Hash(originalKey) % 10),
> then in the KeyProcessOperator use MapState instead of ValueState:
> MapState mapState
>
> There are about 10 original keys for each mapState.
>
> Hope this will help.
>
> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry wrote:
>> Hi Lei,
>>
>> Have you tried to make the key smaller and store a list of the found keys as the value?
>>
>> Make the operator key a hash of your original key, and store a list of the full keys in the state. You can play with your hash length to achieve the optimal number of keys.
>>
>> I hope this helps,
>> Peter
>>
>> On Fri, Mar 29, 2024, 09:08 Lei Wang wrote:
>>> Use RocksDBBackend to store whether the element appeared within the last day; here is the code:
>>>
>>> public class DedupFunction extends KeyedProcessFunction {
>>>
>>>     private ValueState isExist;
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor desc = new ...
>>>         StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..
>>>         desc.enableTimeToLive(ttlConfig);
>>>         isExist = getRuntimeContext().getState(desc);
>>>     }
>>>
>>>     public void processElement(IN in, ...) {
>>>         if (null == isExist.value()) {
>>>             out.collect(in);
>>>             isExist.update(true);
>>>         }
>>>     }
>>> }
>>>
>>> Because the number of distinct keys is too large (about 10 billion a day), there is a performance bottleneck in this operator.
>>> How can I optimize the performance?
>>>
>>> Thanks,
>>> Lei
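As a plain-Java illustration of Peter's suggestion (not Flink code): the operator key becomes a coarse hash bucket, and the full original keys are kept inside a per-bucket map, so the number of Flink state keys shrinks to the bucket count while dedup still happens on the full key. The class and method names are made up, and an in-memory HashMap stands in for Flink's MapState:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BucketedDedup {
    // Number of coarse buckets used as the operator key; mirrors the "% 10" above.
    static final int BUCKETS = 10;

    // Simulates one MapState per bucket key (bucket -> set of full keys seen).
    static final Map<Integer, Set<String>> state = new HashMap<>();

    static int bucketOf(String key) {
        // floorMod keeps the bucket non-negative even for negative hashCodes
        return Math.floorMod(key.hashCode(), BUCKETS);
    }

    /** Returns true only the first time a full key is seen. */
    static boolean firstSeen(String key) {
        return state.computeIfAbsent(bucketOf(key), b -> new HashSet<>()).add(key);
    }

    public static void main(String[] args) {
        System.out.println(firstSeen("order-1")); // true
        System.out.println(firstSeen("order-1")); // false: duplicate suppressed
        System.out.println(firstSeen("order-2")); // true
        // No matter how many distinct keys arrive, at most BUCKETS state entries exist.
        System.out.println(state.size() <= BUCKETS); // true
    }
}
```

In the real job the dedup set would live in MapState keyed by the bucket, with TTL applied as in the original DedupFunction.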
Re: How to enable RocksDB native metrics?
Hi Biao,

I tried it; it doesn't work. The cmd is:

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 -Dstate.backend.latency-track.keyed-state-enabled=true -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092

It seems the -D param is ignored. Even if I change the param to a wrong spelling, the job is submitted successfully.
Any suggestions on this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:48 AM Biao Geng wrote:
> Hi Lei,
> You can use the "-D" option in the command line to set configs for a specific job. E.g., `flink run-application -t yarn-application -Djobmanager.memory.process.size=1024m`.
> See https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/ for more details.
>
> Best,
> Biao Geng
>
> Marco Villalobos wrote on Apr 8, 2024:
>> Hi Lei,
>>
>> Have you tried enabling these Flink configuration properties?
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics
>>
>> Sent from my iPhone
>>
>> On Apr 7, 2024, at 6:03 PM, Lei Wang wrote:
>>> I want to enable it only for specified jobs; how can I specify the configurations on the cmd line when submitting a job?
>>>
>>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote:
>>>> Hi Lei,
>>>>
>>>> You can enable it with the configurations listed in:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>> (RocksDB Native Metrics)
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote:
>>>>> Using big state and want to do some performance tuning, how can I enable RocksDB native metrics?
>>>>>
>>>>> I am using Flink 1.14.4
>>>>>
>>>>> Thanks,
>>>>> Lei
Re: How to enable RocksDB native metrics?
I can enable them by adding them to flink-conf.yaml, and that works. However, I don't want to edit the flink-conf.yaml file; I want to enable the configurations when submitting a job on the cmd line, so that they only apply to the job I submit. I have no idea how to do this.

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:22 AM Marco Villalobos wrote:
> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang wrote:
>> I want to enable it only for specified jobs; how can I specify the configurations on the cmd line when submitting a job?
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote:
>>> Hi Lei,
>>>
>>> You can enable it with the configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>> (RocksDB Native Metrics)
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote:
>>>> Using big state and want to do some performance tuning, how can I enable RocksDB native metrics?
>>>>
>>>> I am using Flink 1.14.4
>>>>
>>>> Thanks,
>>>> Lei
Re: How to enable RocksDB native metrics?
I want to enable it only for specified jobs; how can I specify the configurations on the cmd line when submitting a job?

Thanks,
Lei

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan wrote:
> Hi Lei,
>
> You can enable it with the configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
> (RocksDB Native Metrics)
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote:
>> Using big state and want to do some performance tuning, how can I enable RocksDB native metrics?
>>
>> I am using Flink 1.14.4
>>
>> Thanks,
>> Lei
How to enable RocksDB native metrics?
I am using big state and want to do some performance tuning. How can I enable RocksDB native metrics?

I am using Flink 1.14.4

Thanks,
Lei
Re: Optimize exact deduplication for tens of billions of records per day
Perhaps I can keyBy(Hash(originalKey) % 10),
then in the KeyProcessOperator use MapState instead of ValueState:
MapState mapState

There are about 10 original keys for each mapState.

Hope this will help.

On Fri, Mar 29, 2024 at 9:24 PM Péter Váry wrote:
> Hi Lei,
>
> Have you tried to make the key smaller and store a list of the found keys as the value?
>
> Make the operator key a hash of your original key, and store a list of the full keys in the state. You can play with your hash length to achieve the optimal number of keys.
>
> I hope this helps,
> Peter
>
> On Fri, Mar 29, 2024, 09:08 Lei Wang wrote:
>> Use RocksDBBackend to store whether the element appeared within the last day; here is the code:
>>
>> public class DedupFunction extends KeyedProcessFunction {
>>
>>     private ValueState isExist;
>>
>>     public void open(Configuration parameters) throws Exception {
>>         ValueStateDescriptor desc = new ...
>>         StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..
>>         desc.enableTimeToLive(ttlConfig);
>>         isExist = getRuntimeContext().getState(desc);
>>     }
>>
>>     public void processElement(IN in, ...) {
>>         if (null == isExist.value()) {
>>             out.collect(in);
>>             isExist.update(true);
>>         }
>>     }
>> }
>>
>> Because the number of distinct keys is too large (about 10 billion a day), there is a performance bottleneck in this operator.
>> How can I optimize the performance?
>>
>> Thanks,
>> Lei
Optimize exact deduplication for tens of billions of records per day
Use RocksDBBackend to store whether the element appeared within the last day; here is the code:

public class DedupFunction extends KeyedProcessFunction {

    private ValueState isExist;

    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor desc = new ...
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..
        desc.enableTimeToLive(ttlConfig);
        isExist = getRuntimeContext().getState(desc);
    }

    public void processElement(IN in, ...) {
        if (null == isExist.value()) {
            out.collect(in);
            isExist.update(true);
        }
    }
}

Because the number of distinct keys is too large (about 10 billion a day), there is a performance bottleneck in this operator.
How can I optimize the performance?

Thanks,
Lei
TaskMgr metaspace becomes bigger and bigger after submitting new jobs
I start a standalone session on a single server with only one taskMgr. The JVM metaspace becomes bigger after submitting a new job, and even if I cancel the submitted job the JVM metaspace does not decrease. After submitting about 15 times, the task manager was shut down because of OOM:

2022-04-24 21:39:29,856 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_322]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756) ~[?:1.8.0_322]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_322]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:473) ~[?:1.8.0_322]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_322]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_322]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_322]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_322]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_322]

It seems the JVM does not unload the class info after a job is cancelled. How can I solve this?

Thanks,
Lei
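For reference, the knob named in the error message is a flink-conf.yaml option. A sketch of raising it (the 512m value is illustrative only; if a class-loading leak is the real cause, no size will be enough and the leak has to be fixed instead):

```yaml
# Raise the JVM metaspace ceiling for TaskManagers (Flink's default is 256m)
taskmanager.memory.jvm-metaspace.size: 512m
```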
Re: How to filter intermediate states and guarantee final consistency when writing real-time data to a database
Thanks, that approach works. But how should I take the latest record within each window? I tried the following directly, and it did not behave as expected:

env.addSource(consumer).keyBy(new KeySelector() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()

It actually output all the records in the window in reverse order.

Thanks,
Wang Lei

On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com wrote:
> Attach a time window after the keyBy operator and, if a window contains multiple records, take only the latest one. The pressure on the database then depends on the window size.
>
> On Feb 25, 2022, at 6:45 PM, Lei Wang wrote:
>> Scenario:
>> Data from Kafka goes directly into a MySQL database, in the following format:
>> order_id status
>> Only two fields; order_id is the primary key, and rows are written with REPLACE (overwrite) semantics.
>>
>> For the same order_id, the status changes very frequently. To avoid pressure on the database we do not write every record, but we must guarantee that the final status of each order_id is not lost, and that final status is not a definite value.
>>
>> My approach is to keyBy orderId and check the time interval between two records: if the interval is too small, skip the write. But if the last two records of an order_id arrive too close together, the final status is lost.
>>
>> Are there any other solutions?
>>
>> Thanks,
>> Wang Lei
Re: getKey is called twice after keyBy
Thanks, understood.

Another question: after timeWindow I only want to keep the last element in the window and output it directly:

env.addSource(consumer).keyBy(new KeySelector() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

I tested the code above and it did not behave as expected; it actually output all the records in the window in reverse order. What is the right way to implement this?

On Tue, Mar 1, 2022 at 10:52 AM yidan zhao wrote:
> It is called once during keyBy, and should be called once more at the sink. keyBy is a hash partitioning, so the stages before and after it are not chained together, and the sink needs the keySelector to obtain the key when processing each incoming element.
>
> Lei Wang wrote on Mar 1, 2022:
>> After receiving data from Kafka I keyBy it; SinkTest does nothing.
>>
>> env.addSource(consumer).keyBy(new KeySelector() {
>>     @Override
>>     public String getKey(String value) throws Exception {
>>         System.out.println(value);
>>         return value;
>>     }
>> }).addSink(new SinkTest(1));
>>
>> In my own test, the console prints twice for every message I send, i.e. System.out.println(value) is called twice.
>>
>> Why is that? Where else is the getKey function called?
>>
>> Thanks,
>> Wang Lei
getKey is called twice after keyBy
After receiving data from Kafka I keyBy it; SinkTest does nothing.

env.addSource(consumer).keyBy(new KeySelector() {
    @Override
    public String getKey(String value) throws Exception {
        System.out.println(value);
        return value;
    }
}).addSink(new SinkTest(1));

In my own test, the console prints twice for every message I send, i.e. System.out.println(value) is called twice.

Why is that? Where else is the getKey function called?

Thanks,
Wang Lei
Writing Kafka order info to MySQL: discard intermediate statuses and guarantee final correctness
I receive order messages from Kafka; every message has a status field. The schema is just like this:

orderId, status

The messages are inserted into MySQL. Each order goes through many statuses, and the status changes very frequently. In order to reduce the load on the database, we can discard some intermediate statuses, but we must guarantee the final status is correct. In addition, the final status is not a definite value.

How can I implement this?

Thanks,
Lei
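One common pattern for this (my suggestion, not something stated in the thread) is to coalesce by key and flush on a timer: keep only the latest status per order in a buffer and write the buffered values out periodically, so the final status always survives no matter how closely the last updates arrive. The buffering logic, stripped of Flink and with made-up class and method names, looks like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StatusCoalescer {
    // Latest status per orderId; LinkedHashMap keeps first-seen order for stable output.
    private final Map<String, String> latest = new LinkedHashMap<>();

    /** Record a status update; older statuses for the same order are overwritten. */
    void update(String orderId, String status) {
        latest.put(orderId, status);
    }

    /** Drain the buffer, returning only the last status seen per order. */
    Map<String, String> flush() {
        Map<String, String> out = new LinkedHashMap<>(latest);
        latest.clear();
        return out;
    }

    public static void main(String[] args) {
        StatusCoalescer c = new StatusCoalescer();
        c.update("o1", "CREATED");
        c.update("o1", "PAID");
        c.update("o2", "CREATED");
        c.update("o1", "SHIPPED");
        // Only the final status per order reaches the database.
        System.out.println(c.flush()); // {o1=SHIPPED, o2=CREATED}
    }
}
```

In a Flink job the same idea maps to keyed state holding the latest status plus a processing-time timer whose firing performs the database write; since the last update always overwrites the state, the final status cannot be lost.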
How to filter intermediate states and guarantee final consistency when writing real-time data to a database
Scenario:
Data from Kafka goes directly into a MySQL database, in the following format:

order_id status

Only two fields; order_id is the primary key, and rows are written with REPLACE (overwrite) semantics.

For the same order_id, the status changes very frequently. To avoid pressure on the database we do not write every record, but we must guarantee that the final status of each order_id is not lost, and that final status is not a definite value.

My approach is to keyBy orderId and check the time interval between two records: if the interval is too small, skip the write. But if the last two records of an order_id arrive too close together, the final status is lost.

Are there any other solutions?

Thanks,
Wang Lei
Re: High CPU when Flink uses Aliyun OSS as checkpoint storage
It is indeed related to OSS: after I switched the checkpoint backend to HDFS the phenomenon disappeared, although I don't understand why either.

The program enables incremental checkpoints, but the checkpoint data size shown in the Flink web UI keeps growing, reaching 1G within three days.

On Mon, Oct 18, 2021 at 10:44 AM Michael Ran wrote:
> It should have nothing to do with OSS, which is only storage. For the CPU, check which thread or method class is consuming it.
>
> On 2021-10-08 16:34:47, "Lei Wang" wrote:
>> The Flink program uses RocksDB as the stateBackend and Aliyun OSS as the final physical location of the checkpoint data. Our monitoring shows the node CPU rises periodically, and the period is exactly the program's checkpoint interval.
>>
>> What could be the cause? Could it be related to OSS?
>>
>> Thanks,
>> Wang Lei
Checkpoint size increasing even though I enabled incremental checkpointing
The checkpointed data size becomes bigger and bigger, and the node CPU is very high while the job is doing checkpointing. But I have enabled incremental checkpointing:

env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and Aliyun OSS as checkpoint storage.

Any insight on this?

Thanks,
Lei
High CPU when Flink uses Aliyun OSS as checkpoint storage
The Flink program uses RocksDB as the stateBackend and Aliyun OSS as the final physical location of the checkpoint data. Our monitoring shows the node CPU rises periodically, and the period is exactly the program's checkpoint interval.

What could be the cause? Could it be related to OSS?

Thanks,
Wang Lei
Re: Task is always in CREATED state after submitting an example job
There are enough slots on the JobManager UI, but the slots are not available. After I add

taskmanager.host: localhost

to flink-conf.yaml, it works. But I don't know why.

Thanks,
Lei

On Fri, Jun 18, 2021 at 6:07 PM Piotr Nowojski wrote:
> Hi,
>
> I would start by looking at the JobManager and TaskManager logs. Check whether the TaskManagers connected/registered with the JobManager and, if so, whether there were problems when submitting the job. It seems like either there are not enough slots, or the slots are actually not available.
>
> Best,
> Piotrek
>
> On Fri, Jun 18, 2021, Lei Wang wrote:
>> flink 1.11.2 on a single host.
>>
>> ./bin/start-cluster.sh and then
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port
>>
>> But on the JobManager UI, the task is always in CREATED state, even though there are available slots.
>>
>> Any insights on this?
>>
>> Thanks,
>> Lei
Re: Task stays in SCHEDULED state after job submission
Adding the following configuration to flink-conf.yaml fixes it, but I don't know why:

taskmanager.host: localhost

On Fri, Jun 18, 2021 at 1:43 PM Lei Wang wrote:
> flink-1.11.2
> Start with ./bin/start-cluster.sh, then
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port
>
> But on the jobmanager page the task stays in SCHEDULED state; after a while it fails with:
>
> 2021-06-18 13:34:26,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b228577f2) switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> But slot resources are available, and performing the same steps on other machines works fine.
>
> Can anyone help explain this?
>
> Thanks,
> Wang Lei
Task stays in SCHEDULED state after job submission
flink-1.11.2
Start with ./bin/start-cluster.sh, then

./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port

But on the jobmanager page the task stays in SCHEDULED state; after a while it fails with:

2021-06-18 13:34:26,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b228577f2) switched from SCHEDULED to FAILED on not deployed.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

But slot resources are available, and performing the same steps on other machines works fine.

Can anyone help explain this?

Thanks,
Wang Lei
Task is always in CREATED state after submitting an example job
flink 1.11.2 on a single host.

./bin/start-cluster.sh and then

./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port

But on the jobmanager UI, the task is always in CREATED state, even though there are available slots.

Any insights on this?

Thanks,
Lei
Flink task stays in SCHEDULED state after job submission
I start Flink in standalone mode on one machine. After submitting a job:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.

Slots are sufficient. I am using flink-1.11.2. It looks similar to https://issues.apache.org/jira/browse/FLINK-19237, but I don't understand what that issue means.

Strangely, performing the same steps on other servers does not hit this problem. Can anyone explain?

Thanks,
Wang Lei
Re: How to do code isolation if multiple jobs run in the same taskmanager process?
Thanks Arvid,

If too many jobs run in the same task manager JVM, will it cause too much metaspace memory occupation?

Thanks,
Lei

On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise wrote:
> Hi Lei,
>
> Each application has its own classloader, so each static constant exists multiple times (one per job). There should be no interference. You could verify it by logging the value of the constant and seeing for yourself.
>
> Best,
> Arvid
>
> On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote:
>> Consider the following situation.
>>
>> Two jobs are built in the same jar, so they share some common code, for example some static constant variables. Currently they are running in the same task manager process.
>>
>> I killed job1, changed the static variable to another value, and then submitted it again. Will the other job get the new value of the static variable, or still use the old value?
>>
>> Thanks,
>> Lei
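Arvid's point — each job gets its own classloader, and each loader holds an independent copy of every static field — can be demonstrated in plain Java. The sketch below imitates Flink's per-job user-code classloader with a loader that defines the class itself instead of delegating to its parent; all class and field names here are made up for the demo:

```java
import java.io.InputStream;

public class IsolationDemo {
    // The class whose static field we mutate; one copy exists per defining loader.
    public static class Holder {
        public static String value = "initial";
    }

    // Defines Holder itself rather than delegating, mimicking a per-job user-code classloader.
    static class ChildFirstLoader extends ClassLoader {
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Holder.class.getName())) {
                try (InputStream in = getResourceAsStream(name.replace('.', '/') + ".class")) {
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve); // everything else delegates normally
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> a = new ChildFirstLoader().loadClass(Holder.class.getName());
        Class<?> b = new ChildFirstLoader().loadClass(Holder.class.getName());
        // "Job 1" mutates its copy of the static field; "job 2"'s copy is untouched.
        a.getField("value").set(null, "changed-by-job-1");
        System.out.println(a.getField("value").get(null)); // changed-by-job-1
        System.out.println(b.getField("value").get(null)); // initial
        System.out.println(a == b); // false: two distinct Class objects
    }
}
```

This is also why a resubmitted job sees the new value of a "changed" static constant while other running jobs keep the old one: the new submission is loaded by a fresh classloader.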
When a DataStream is typed by a base class, can a downstream operator cast elements back to the subclass?
Variables passed between Flink operators need to be serialized. If the DataStream<> generic type is referenced by a base class, does a downstream operator still retain the full subclass information, allowing a forced cast?

For example:

DataStream stream = source.from(SubClass);
stream.keyBy( ) {
    // Can the code here check the type and cast?
    SubClass subObj = (SubClass) baseObj;
}

Thanks,
Wang Lei
How to do code isolation if multiple jobs run in the same taskmanager process?
Consider the following situation.

Two jobs are built in the same jar, so they share some common code, for example some static constant variables. Currently they are running in the same task manager process.

I killed job1, changed the static variable to another value, and then submitted it again. Will the other job get the new value of the static variable, or still use the old value?

Thanks,
Lei
A problem running jobs in Flink standalone mode
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size'

Following the advice in the log, I increased the metaspace size from 256M to 512M, but the error still occurs. From what I have observed, it tends to happen when a new job is submitted; jobs that have been running steadily rarely hit it.

My Flink cluster is deployed in standalone mode: just two machines, one TaskManager per machine, 9 jobs in total, all packaged into a single jar of 42M.

My guess is that during normal operation metaspace is already nearly full, and submitting a new job re-initializes the symbol references for all classes in the jar, leaving too little room and triggering the error. Is that guess correct?

One more question: in standalone mode different jobs actually run in the same TaskManager process, i.e. a single JVM. How is code isolation achieved? For example: job1 and job2 are packaged into the same jar and both use a static variable in the code. If I kill job1, change the value of that static variable, and resubmit job1, will job2 see the changed value?

Thanks, Lei Wang
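For reference, metaspace is configured per TaskManager process, and since all nine jobs here share two TaskManagers, the budget has to cover one set of loaded classes per running job (each job gets its own classloader). A flink-conf.yaml fragment; the size is purely illustrative, not a recommendation:

```yaml
# flink-conf.yaml -- TaskManager-wide setting; every job's classes share this budget
taskmanager.memory.jvm-metaspace.size: 1g
```

If raising the limit only delays the error, the second case from the log message (a classloader leak, e.g. classes pinned by lingering threads after a job is cancelled) becomes the more likely explanation.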
Re: Jobmanager stopped because uncaught exception
I see there's a related issue, https://issues.apache.org/jira/browse/FLINK-21053, which is still open. Does that mean a similar issue will still exist even if I upgrade to 1.12.2? Thanks, Lei

On Mon, Feb 8, 2021 at 3:54 PM Yang Wang wrote:
> Maybe it is a known issue[1] and has already been resolved in 1.12.2 (will release soon).
> BTW, I think it is unrelated to the Aliyun OSS info logs.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20992
>
> Best,
> Yang
>
> Lei Wang wrote on Mon, Feb 8, 2021 at 2:22 PM:
>> Flink standalone HA. Flink version 1.12.1
>>
>> 2021-02-08 13:57:50,550 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the process...
>> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
>>     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_275]
>>     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_275]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_275]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[?:1.8.0_275]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) ~[?:1.8.0_275]
>>     at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_275]
>>     at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>     at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_275]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_275]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>>
>> Using Aliyun OSS as state backend storage.
>> Before the ERROR, there are a lot of info messages like this:
>>
>> 2021-02-08 13:57:50,452 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
>> [ErrorCode]: NoSuchKey
>> [RequestId]: 6020D2DEA1E11430349E8323
>>
>> Any insight on this?
>>
>> Thanks,
>> Lei
Jobmanager stopped because uncaught exception
Flink standalone HA. Flink version 1.12.1

2021-02-08 13:57:50,550 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the process...
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) ~[?:1.8.0_275]
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_275]
    at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]

Using Aliyun OSS as state backend storage.
Before the ERROR, there are a lot of info messages like this:

2021-02-08 13:57:50,452 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 6020D2DEA1E11430349E8323

Any insight on this?

Thanks, Lei
Can Flink's RocksDBStateBackend use Aliyun OSS as its storage?
As the subject says — can I simply write the following?

    env.setStateBackend(new RocksDBStateBackend("oss://", true));

Thanks, Lei Wang
How to periodically refresh the value of a broadcast variable?
My Flink program reads a configuration file and broadcasts its contents as a broadcast variable. If the configuration file is updated, how can the contents of the broadcast variable be updated as well? Thanks, Lei Wang
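In Flink the usual answer is the broadcast state pattern: a dedicated source periodically re-reads the configuration and emits a new version, which a BroadcastProcessFunction stores and applies to the data stream. Setting the Flink wiring aside, the re-read loop such a source would run can be sketched in plain Java; the file format and names below are illustrative:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

public class ConfigPoller {

    private final Path file;
    private long lastModified = -1L;
    private final AtomicReference<String> current = new AtomicReference<>("");

    public ConfigPoller(Path file) { this.file = file; }

    /** Re-reads the file only when its modification time changed; true means "new version". */
    public boolean pollOnce() throws Exception {
        long mtime = Files.getLastModifiedTime(file).toMillis();
        if (mtime == lastModified) {
            return false;                      // unchanged: keep the old broadcast value
        }
        current.set(new String(Files.readAllBytes(file), "UTF-8"));
        lastModified = mtime;
        return true;                           // a real source would emit/broadcast here
    }

    public String get() { return current.get(); }

    /** Self-check: the first poll loads the file, the second sees no change. */
    public static boolean selfTest() {
        try {
            Path f = Files.createTempFile("cfg-demo", ".properties");
            Files.write(f, "interval=10".getBytes("UTF-8"));
            ConfigPoller p = new ConfigPoller(f);
            return p.pollOnce() && !p.pollOnce() && "interval=10".equals(p.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selfTest = " + selfTest());
    }
}
```

Inside a Flink source you would call `pollOnce()` in the run loop (with a sleep between polls) and collect the new value downstream whenever it returns true.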
Is a Flink SinkFunction's close() guaranteed to be called when the job stops?
I wrote my own SinkFunction that writes to a database, and it only performs the actual write once a certain number of records (100) has accumulated. I implemented this by buffering the pending records in a List:

    public class SinkToJDBCWithJDBCStatementBatch extends RichSinkFunction<JDBCStatement> {

        private List<JDBCStatement> statementList = new ArrayList<>();

        @Override
        public void close() throws Exception {
            writeToDatabase();
            this.statementList.clear();
            super.close();
            if (dataSource != null) {
                dataSource.close();
            }
        }

        @Override
        public void invoke(JDBCStatement statement, Context context) throws Exception {
            if (statementList.size() < 100) {
                statementList.add(statement);
                return;
            }
            writeToDatabase();
            this.statementList.clear();
        }

        public void writeToDatabase() {
            ...
        }
    }

I'd like to confirm: is this close() method guaranteed to be called when the job stops? What mechanism makes that happen?

Thanks, Lei Wang
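Two notes on the snippet above. First, as far as I know close() is invoked by the task lifecycle on normal completion and on cancellation, but not if the process is killed hard, so a flush-on-close design can still lose the tail of the buffer on a crash. Second, as written, invoke() drops the incoming record whenever the buffer is already full: it flushes and clears but never adds the new statement. A dependency-free sketch of the corrected buffering logic (all names are illustrative, `flusher` stands in for writeToDatabase()):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal batching buffer: flushes every `threshold` records and once more on close(). */
public class BatchBuffer<T> implements AutoCloseable {

    private final int threshold;
    private final Consumer<List<T>> flusher;   // stands in for writeToDatabase()
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    public void add(T record) {
        buffer.add(record);                    // always keep the incoming record first
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void close() {                      // flushes the remainder on shutdown
        flush();
    }
}
```

Adding the record before the size check guarantees each flushed batch contains exactly `threshold` records and nothing is silently discarded.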
Can Flink keyed state store generic or abstract types?
The business logic is as follows: robot sensors report messages; the stream is first keyed by robotId, and then a large number of rules are iterated. Each rule stores the previous object for its key, implemented like this:

    private transient MapState<String, RobotData> state;

    for (Entry<String, IChargerTwoRecordRule> entry : RulesFactory.getChargerTwoRecordRules().entrySet()) {
        String faultName = entry.getKey();
        IChargerTwoRecordRule rule = entry.getValue();
        RobotData old = state.get(faultName);
        rule.handleLogMsg(old, current);
    }

Now some rules need to store objects that cannot be represented as RobotData. Is it possible, using generics or inheritance, for the MapState value to hold different types of data? For example MapState<String, Object> state; and then, depending on the rule, cast the Object to the concrete class.

Thanks, Lei Wang
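The value type of a MapState can indeed be a common supertype, at the cost of Flink typically serializing such values via the generic (Kryo) fallback rather than a specialized serializer. The dispatch pattern itself, independent of Flink, can be sketched like this; `RuleState`, `ChargeStats`, and the field names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class TypedRuleStateDemo {

    /** Common supertype for everything a rule may store (mirrors MapState<String, RuleState>). */
    public interface RuleState {}

    public static class RobotData implements RuleState {
        public final int battery;
        public RobotData(int battery) { this.battery = battery; }
    }

    public static class ChargeStats implements RuleState {
        public final long sessions;
        public ChargeStats(long sessions) { this.sessions = sessions; }
    }

    private final Map<String, RuleState> state = new HashMap<>();

    public void put(String ruleName, RuleState value) { state.put(ruleName, value); }

    /** Typed read: each rule asks for the concrete class it knows it stored. */
    public <T extends RuleState> T get(String ruleName, Class<T> type) {
        return type.cast(state.get(ruleName)); // a mismatch fails fast with ClassCastException
    }
}
```

Passing the expected `Class<T>` to the getter keeps the unchecked cast in one place and makes a rule reading the wrong type fail immediately instead of corrupting state downstream.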
Re: Re: How to raise an alert when no message has been received for a certain time?
Using a session window does indeed cover the functionality:

    robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, y) -> y);

Written this way, I understand the window state keeps only the most recent record. Under normal conditions the robots keep reporting logs, which means this window will effectively be kept alive forever. I'm not sure whether that has a performance impact.

On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:
> This scenario does resemble session semantics, but using a session window still feels wrong:
> if the alert never fires, all the historical data stays in the window (that is, in state),
> even though only the latest record actually needs to be kept.
>
> At 2020-11-12 14:34:59, "Danny Chan" wrote:
> >This sounds like a session window requirement: the timeout is the session gap,
> >and the moment the session fires is when the alert logic runs.
> >
> >Lei Wang wrote on Wed, Nov 11, 2020 at 2:03 PM:
> >> There are many edge robot devices (we call them robots) sending messages to Kafka.
> >> If no message arrives for a certain amount of time, we consider the robot offline.
> >>
> >> For example: robot1 sends "2020-11-11 12:00:00 msginfo" and then nothing for 20 minutes.
> >> How can I make Flink raise an alert at 2020-11-11 12:10:00?
> >>
> >> Flink is message-driven: if no message arrives, nothing is triggered.
> >> How can an action be triggered precisely when no follow-up message arrives?
> >>
> >> I tried the example at https://juejin.im/post/6844904193052901384, but it doesn't fit my scenario:
> >> there, all orders effectively share a single timer service, and every firing iterates over all orders.
> >> We must keyBy robotId.
> >>
> >> Any advice would be much appreciated.
> >>
> >> Thanks,
> >> Lei Wang
How to raise an alert when no message has been received for a certain time?
There are many edge robot devices (we call them robots) sending messages to Kafka. If no message arrives for a certain amount of time, we consider the robot offline.

For example: robot1 sends "2020-11-11 12:00:00 msginfo" and then nothing for 20 minutes. How can I make Flink raise an alert at 2020-11-11 12:10:00?

Flink is message-driven: if no message arrives, nothing is triggered. How can an action be triggered precisely when no follow-up message arrives?

I tried the example at https://juejin.im/post/6844904193052901384, but it doesn't fit my scenario: there, all orders effectively share a single timer service, and every firing iterates over all orders. We must keyBy robotId.

Any advice would be much appreciated.

Thanks, Lei Wang
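The standard Flink answer here is a KeyedProcessFunction keyed by robotId that stores the last-seen time in keyed state and registers a processing-time timer per key, resetting it on every message. The bookkeeping that function would perform can be sketched without Flink; the class name and the 10-minute threshold below are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Tracks the last-seen time per key and reports keys silent for longer than `timeoutMs`. */
public class OfflineDetector {

    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public OfflineDetector(long timeoutMs) { this.timeoutMs = timeoutMs; }

    /** Called for every incoming message; in Flink this would also re-register the key's timer. */
    public void onMessage(String robotId, long nowMs) {
        lastSeen.put(robotId, nowMs);
    }

    /** Called when a timer fires; returns robots whose silence meets or exceeds the timeout. */
    public List<String> findOffline(long nowMs) {
        List<String> offline = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMs - e.getValue() >= timeoutMs) {
                offline.add(e.getKey());       // alert: this robot has gone quiet
            }
        }
        return offline;
    }
}
```

In the Flink version each key checks only itself when its own timer fires, so there is no global iteration over all robots, which addresses the objection to the shared-timer example from the linked post.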