Re: Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-23 Thread Lei Wang
Sorry, it was probably an observation mistake.

I export the metrics to Prometheus and query the results in Grafana;
the usage actually does not exceed the capacity.

Thanks,
Lei

On Fri, Apr 19, 2024 at 9:55 AM Hangxiang Yu  wrote:

> Hi, Lei.
> It's indeed a bit confusing. Could you share the related RocksDB log,
> which may contain more detailed info?
>
> On Fri, Apr 12, 2024 at 12:49 PM Lei Wang  wrote:
>
>>
>> I enable RocksDB native metrics and do some performance tuning.
>>
>> state.backend.rocksdb.block.cache-size is set to 128m, with 4 slots per
>> TaskManager.
>>
>> The observed result for one specific parallel slot:
>> state.backend.rocksdb.metrics.block-cache-capacity is about 14.5M
>> state.backend.rocksdb.metrics.block-cache-usage is about 24M
>>
>> I am a little confused: why is usage larger than capacity?
>>
>> Thanks,
>> Lei
>>
>
>
> --
> Best,
> Hangxiang.
>


Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-11 Thread Lei Wang
I enable RocksDB native metrics and do some performance tuning.

state.backend.rocksdb.block.cache-size is set to 128m, with 4 slots per
TaskManager.

The observed result for one specific parallel slot:
state.backend.rocksdb.metrics.block-cache-capacity is about 14.5M
state.backend.rocksdb.metrics.block-cache-usage is about 24M

I am a little confused: why is usage larger than capacity?

Thanks,
Lei
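
For reference, the two block-cache metrics discussed here are only reported
after being switched on explicitly; a minimal flink-conf.yaml sketch (both
options are plain booleans and disabled by default):

state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true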


Re: How to enable RocksDB native metrics?

2024-04-11 Thread Lei Wang
Thanks very much, it finally works.

On Thu, Apr 11, 2024 at 8:27 PM Zhanghao Chen 
wrote:

> Adding a space between -yD and the param should do the trick.
>
> Best,
> Zhanghao Chen
> ------
> *From:* Lei Wang 
> *Sent:* Thursday, April 11, 2024 19:40
> *To:* Zhanghao Chen 
> *Cc:* Biao Geng ; user 
> *Subject:* Re: How to enable RocksDB native metrics?
>
> Hi Zhanghao,
>
> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G
> -yqu default -p 8  -yDstate.backend.latency-track.keyed-state-enabled=true -c
> com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic
> dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092
>
> I tried it; it doesn't work. The error is:
> Could not get job jar and dependencies from JAR file: JAR file does not
> exist: -yDstate.backend.latency-track.keyed-state-enabled=true
>
> Thanks,
> Lei
>
> On Thu, Apr 11, 2024 at 5:19 PM Zhanghao Chen 
> wrote:
>
> Hi Lei,
>
> You are using the old-style CLI for YARN jobs, where "-yD" should be used
> instead of "-D".
> --
> *From:* Lei Wang 
> *Sent:* Thursday, April 11, 2024 12:39
> *To:* Biao Geng 
> *Cc:* user 
> *Subject:* Re: How to enable RocksDB native metrics?
>
> Hi Biao,
>
> I tried it; it doesn't work. The cmd is:
>
> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G
> -yqu default -p 8  -Dstate.backend.latency-track.keyed-state-enabled=true
> -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar
> --sourceTopic dwd_audio_record  --groupId clean_wl_ --sourceServers
>  x.x.x.x:9092
>
> It seems the -D param is ignored. Even if I change the param to a wrong
> spelling, the job is still submitted successfully.
> Any suggestions on this?
>
> Thanks,
> Lei
>
> On Mon, Apr 8, 2024 at 9:48 AM Biao Geng  wrote:
>
> Hi Lei,
> You can use the "-D" option in the command line to set configs for a
> specific job. E.g, `flink run-application -t
> yarn-application  -Djobmanager.memory.process.size=1024m  `.
> See
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/
> for more details.
>
> Best,
> Biao Geng
>
> Marco Villalobos wrote on Mon, Apr 8, 2024 at 09:22:
>
> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
>
> Configuration:
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics>
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>
> I want to enable it only for specific jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>
>
> I am using big state and want to do some performance tuning; how can I
> enable RocksDB native metrics?
>
> I am using Flink 1.14.4.
>
> Thanks,
> Lei
>
>
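
For reference, the submission command that finally worked should look like
this (identical to the command in the thread except for the space after
-yD):

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G
-yqu default -p 8 -yD state.backend.latency-track.keyed-state-enabled=true
-c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar
--sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers
x.x.x.x:9092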


Re: How to enable RocksDB native metrics?

2024-04-11 Thread Lei Wang
Hi Zhanghao,

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu
default -p 8  -yDstate.backend.latency-track.keyed-state-enabled=true -c
com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic
dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092

I tried it; it doesn't work. The error is:
Could not get job jar and dependencies from JAR file: JAR file does not
exist: -yDstate.backend.latency-track.keyed-state-enabled=true

Thanks,
Lei

On Thu, Apr 11, 2024 at 5:19 PM Zhanghao Chen 
wrote:

> Hi Lei,
>
> You are using the old-style CLI for YARN jobs, where "-yD" should be used
> instead of "-D".
> ------
> *From:* Lei Wang 
> *Sent:* Thursday, April 11, 2024 12:39
> *To:* Biao Geng 
> *Cc:* user 
> *Subject:* Re: How to enable RocksDB native metrics?
>
> Hi Biao,
>
> I tried it; it doesn't work. The cmd is:
>
> flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G
> -yqu default -p 8  -Dstate.backend.latency-track.keyed-state-enabled=true
> -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar
> --sourceTopic dwd_audio_record  --groupId clean_wl_ --sourceServers
>  x.x.x.x:9092
>
> It seems the -D param is ignored. Even if I change the param to a wrong
> spelling, the job is still submitted successfully.
> Any suggestions on this?
>
> Thanks,
> Lei
>
> On Mon, Apr 8, 2024 at 9:48 AM Biao Geng  wrote:
>
> Hi Lei,
> You can use the "-D" option in the command line to set configs for a
> specific job. E.g, `flink run-application -t
> yarn-application  -Djobmanager.memory.process.size=1024m  `.
> See
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/
> for more details.
>
> Best,
> Biao Geng
>
> Marco Villalobos wrote on Mon, Apr 8, 2024 at 09:22:
>
> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
>
> Configuration:
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics>
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>
> I want to enable it only for specific jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>
>
> I am using big state and want to do some performance tuning; how can I
> enable RocksDB native metrics?
>
> I am using Flink 1.14.4.
>
> Thanks,
> Lei
>
>


Re: Optimize exact deduplication for tens of billions data per day

2024-04-11 Thread Lei Wang
Hi Peter,

I tried it; this improved performance significantly, but I don't know
exactly why.
As far as I know, the number of keys in RocksDB doesn't decrease.

Is there any specific technical material about this?

Thanks,
Lei


On Fri, Mar 29, 2024 at 9:49 PM Lei Wang  wrote:

> Perhaps I can keyBy(Hash(originalKey) % 10)
> and then, in the KeyedProcessFunction, use MapState instead of ValueState:
>   MapState<OriginalKey, Boolean> mapState
>
> There would be about 10 original keys in each MapState.
>
> Hope this will help.
>
> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry 
> wrote:
>
>> Hi Lei,
>>
>> Have you tried to make the key smaller, and store a list of found keys as
>> a value?
>>
>> Let's make the operator key a hash of your original key, and store a list
>> of the full keys in the state. You can play with your hash length to
>> achieve the optimal number of keys.
>>
>> I hope this helps,
>> Peter
>>
>> On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:
>>
>>>
>>> Use RocksDBBackend to store whether the element appeared within the last
>>> one day,  here is the code:
>>>
>>> public class DedupFunction extends KeyedProcessFunction<String, String, String> {
>>>
>>>     // key/input/output types assumed to be String for illustration
>>>     private transient ValueState<Boolean> isExist;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<Boolean> desc =
>>>                 new ValueStateDescriptor<>("isExist", Types.BOOLEAN);
>>>         StateTtlConfig ttlConfig = StateTtlConfig
>>>                 .newBuilder(Time.hours(24))
>>>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>                 .build();
>>>         desc.enableTimeToLive(ttlConfig);
>>>         isExist = getRuntimeContext().getState(desc);
>>>     }
>>>
>>>     @Override
>>>     public void processElement(String in, Context ctx, Collector<String> out)
>>>             throws Exception {
>>>         if (isExist.value() == null) {
>>>             out.collect(in);
>>>             isExist.update(true);
>>>         }
>>>     }
>>> }
>>>
>>> Because the number of distinct keys is too large (about 10 billion per
>>> day), this operator is a performance bottleneck.
>>> How can I optimize the performance?
>>>
>>> Thanks,
>>> Lei
>>>
>>>
>>
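
For reference, a minimal sketch of Peter's suggestion (key by a hash bucket
of the original key and keep the full keys in MapState); stream is assumed
to be a DataStream<String>, and the bucket count and state name are
illustrative assumptions:

stream
    .keyBy(value -> Math.floorMod(value.hashCode(), 1_000_000))
    .process(new KeyedProcessFunction<Integer, String, String>() {

        private transient MapState<String, Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Boolean> desc =
                    new MapStateDescriptor<>("seenKeys", Types.STRING, Types.BOOLEAN);
            // the same 24h StateTtlConfig as in DedupFunction could be attached here
            seen = getRuntimeContext().getMapState(desc);
        }

        @Override
        public void processElement(String in, Context ctx, Collector<String> out)
                throws Exception {
            // the full original key is stored as the map key inside the bucket
            if (!seen.contains(in)) {
                out.collect(in);
                seen.put(in, true);
            }
        }
    });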


Re: How to enable RocksDB native metrics?

2024-04-10 Thread Lei Wang
Hi Biao,

I tried it; it doesn't work. The cmd is:

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu
default -p 8  -Dstate.backend.latency-track.keyed-state-enabled=true -c
com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic
dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092

It seems the -D param is ignored. Even if I change the param to a wrong
spelling, the job is still submitted successfully.
Any suggestions on this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:48 AM Biao Geng  wrote:

> Hi Lei,
> You can use the "-D" option in the command line to set configs for a
> specific job. E.g, `flink run-application -t
> yarn-application  -Djobmanager.memory.process.size=1024m  `.
> See
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/
> for more details.
>
> Best,
> Biao Geng
>
> Marco Villalobos wrote on Mon, Apr 8, 2024 at 09:22:
>
>> Hi Lei,
>>
>> Have you tried enabling these Flink configuration properties?
>>
>> Configuration:
>> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics>
>>
>> Sent from my iPhone
>>
>> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>>
>> I want to enable it only for specific jobs; how can I specify the
>> configurations on the cmd line when submitting a job?
>>
>> Thanks,
>> Lei
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>>
>>> Hi Lei,
>>>
>>> You can enable it by some configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>  (RocksDB Native Metrics)
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan 
>>> wrote:
>>>
>>>> Hi Lei,
>>>>
>>>> You can enable it by some configurations listed in:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>>  (RocksDB Native Metrics)
>>>>
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>>>
>>>>>
>>>>> I am using big state and want to do some performance tuning; how can
>>>>> I enable RocksDB native metrics?
>>>>>
>>>>> I am using Flink 1.14.4.
>>>>>
>>>>> Thanks,
>>>>> Lei
>>>>>
>>>>



Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
I can enable them by adding the options to flink-conf.yaml, and that works.

However, I don't want to edit the flink-conf.yaml file. I want to enable
the configurations on the cmd line when submitting a job, so that they only
apply to the job I submit. How can I do this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:22 AM Marco Villalobos 
wrote:

> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
>
> Configuration:
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics>
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>
> I want to enable it only for specific jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>  (RocksDB Native Metrics)
>>
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>>
>>> Hi Lei,
>>>
>>> You can enable it by some configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>  (RocksDB Native Metrics)
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>>
>>>>
>>>> I am using big state and want to do some performance tuning; how can I
>>>> enable RocksDB native metrics?
>>>>
>>>> I am using Flink 1.14.4.
>>>>
>>>> Thanks,
>>>> Lei
>>>>
>>>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
I want to enable it only for specific jobs; how can I specify the
configurations on the cmd line when submitting a job?

Thanks,
Lei

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:

> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>  (RocksDB Native Metrics)
>>
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>
>>>
>>> I am using big state and want to do some performance tuning; how can I
>>> enable RocksDB native metrics?
>>>
>>> I am using Flink 1.14.4.
>>>
>>> Thanks,
>>> Lei
>>>
>>


How to enable RocksDB native metrics?

2024-04-06 Thread Lei Wang
I am using big state and want to do some performance tuning; how can I
enable RocksDB native metrics?

I am using Flink 1.14.4.

Thanks,
Lei
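
For reference, a minimal flink-conf.yaml sketch of what enabling a few of
the RocksDB native metrics looks like (this particular selection of options
is illustrative; the full list is on the configuration page linked in the
replies):

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true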


Re: Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Lei Wang
Perhaps I can keyBy(Hash(originalKey) % 10)
and then, in the KeyedProcessFunction, use MapState instead of ValueState:
  MapState<OriginalKey, Boolean> mapState

There would be about 10 original keys in each MapState.

Hope this will help.

On Fri, Mar 29, 2024 at 9:24 PM Péter Váry 
wrote:

> Hi Lei,
>
> Have you tried to make the key smaller, and store a list of found keys as
> a value?
>
> Let's make the operator key a hash of your original key, and store a list
> of the full keys in the state. You can play with your hash length to
> achieve the optimal number of keys.
>
> I hope this helps,
> Peter
>
> On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:
>
>>
>> Use RocksDBBackend to store whether the element appeared within the last
>> one day,  here is the code:
>>
>> public class DedupFunction extends KeyedProcessFunction<String, String, String> {
>>
>>     private transient ValueState<Boolean> isExist;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         ValueStateDescriptor<Boolean> desc =
>>                 new ValueStateDescriptor<>("isExist", Types.BOOLEAN);
>>         StateTtlConfig ttlConfig = StateTtlConfig
>>                 .newBuilder(Time.hours(24))
>>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>                 .build();
>>         desc.enableTimeToLive(ttlConfig);
>>         isExist = getRuntimeContext().getState(desc);
>>     }
>>
>>     @Override
>>     public void processElement(String in, Context ctx, Collector<String> out)
>>             throws Exception {
>>         if (isExist.value() == null) {
>>             out.collect(in);
>>             isExist.update(true);
>>         }
>>     }
>> }
>>
>> Because the number of distinct keys is too large (about 10 billion per
>> day), this operator is a performance bottleneck.
>> How can I optimize the performance?
>>
>> Thanks,
>> Lei
>>
>>
>


Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Lei Wang
I use the RocksDB backend to store whether an element appeared within the
last day; here is the code:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// key/input/output types assumed to be String for illustration
public class DedupFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Boolean> isExist;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> desc =
                new ValueStateDescriptor<>("isExist", Types.BOOLEAN);
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        desc.enableTimeToLive(ttlConfig);
        isExist = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(String in, Context ctx, Collector<String> out)
            throws Exception {
        if (isExist.value() == null) {
            out.collect(in);
            isExist.update(true);
        }
    }
}

Because the number of distinct keys is too large (about 10 billion per
day), this operator is a performance bottleneck.
How can I optimize the performance?

Thanks,
Lei


TaskMgr Metaspace become bigger and bigger after submitting new jobs

2022-04-24 Thread Lei Wang
I start a standalone session on a single server with only one TaskManager.
The JVM metaspace becomes bigger after submitting a new job.
Even if I cancel the submitted job, the JVM metaspace does not decrease.

After about 15 submissions, the task manager was shut down because of an
OOM:

2022-04-24 21:39:29,856 ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal error
occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration
option should be increased. If the error persists (usually in cluster
after several job (re-)submissions) then there is probably a class loading
leak in user code or some of its dependencies which has to be investigated
and fixed. The task executor has to be shutdown...
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_322]
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
~[?:1.8.0_322]
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
~[?:1.8.0_322]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
~[?:1.8.0_322]
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
~[?:1.8.0_322]
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
~[?:1.8.0_322]
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
~[?:1.8.0_322]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_322]
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
~[?:1.8.0_322]


It seems the JVM does not unload the class metadata after a job is
cancelled. How can I solve this?

Thanks,
Lei
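
For reference, the configuration option named in the error message can be
raised in flink-conf.yaml; the value below is only an example (a later
thread in this archive mentions going from 256m to 512m):

taskmanager.memory.jvm-metaspace.size: 512m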


Re: How to filter out intermediate states and keep the final state when sinking real-time data to a database

2022-02-28 Thread Lei Wang
Thanks, that approach works.

What is the right way to take only the latest record within each window? I
tried the following directly, but it did not behave as expected:

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).timeWindow(Time.seconds(10)).reduce((a, b) -> b).addSink()

It actually emitted all the records in the window, in reverse order.

Thanks,

Wang Lei



On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com>
wrote:

> Attach a time window after the keyBy operator; if a window contains
> multiple records, take the latest one. As for the pressure on the
> database, that depends on the window size.
>
> > On Feb 25, 2022, at 6:45 PM, Lei Wang wrote:
> >
> > Scenario:
> > Data from Kafka is written directly into MySQL, in the following format:
> > order_id   status
> > There are only two fields; order_id is the primary key, and rows are
> > written with REPLACE (overwrite) semantics.
> >
> > For the same order_id, the status changes very frequently. To avoid
> > pressuring the database, we do not write every record, but we must
> > guarantee that the final status of each order_id is not lost; this
> > final status is not a fixed, known value.
> >
> > My approach is to keyBy orderId and check the time interval between two
> > consecutive records: if the interval is too small, skip the write. But
> > if the last two records of an order_id arrive too close together, the
> > final status is lost.
> >
> > Is there any other way to solve this?
> >
> > Thanks,
> > Wang Lei


Re: getKey is called twice after keyBy

2022-02-28 Thread Lei Wang
Thanks, understood.

One more question: after timeWindow I only want to keep the last record in
the window and emit it directly:

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).timeWindow(Time.seconds(10)).reduce((a, b) -> b).addSink(new SinkTest(1));

I tested the code above and it did not behave as expected: it actually
emitted all the records in the window in reverse order.

What is the right way to implement this?


On Tue, Mar 1, 2022 at 10:52 AM yidan zhao  wrote:

>
> getKey is called once during keyBy, and it should also be called once at
> the sink. keyBy is a hash partitioning, so the operators before and after
> it are not chained together; when the sink processes an incoming element,
> it needs the KeySelector to obtain the key.
>
> Lei Wang wrote on Tue, Mar 1, 2022 at 10:49:
>
> > After receiving data from Kafka I keyBy; SinkTest does nothing.
> >
> > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> >     @Override
> >     public String getKey(String value) throws Exception {
> >         System.out.println(value);
> >         return value;
> >     }
> > }).addSink(new SinkTest(1));
> >
> >
> > In my own test, every message I send is printed twice on the console,
> > i.e. System.out.println(value) is called twice.
> >
> > Why is that? Where else is getKey being called?
> >
> >
> > Thanks,
> >
> > Wang Lei


getKey is called twice after keyBy

2022-02-28 Thread Lei Wang
After receiving data from Kafka I keyBy; SinkTest does nothing.

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        System.out.println(value);
        return value;
    }
}).addSink(new SinkTest(1));


In my own test, every message I send is printed twice on the console, i.e.
System.out.println(value) is called twice.

Why is that? Where else is getKey being called?


Thanks,

Wang Lei


Kafka order info to MySQL discard middle status and guarantee final correctness

2022-02-27 Thread Lei Wang
We receive order messages from Kafka; every message has a status field. The
schema is just like this: orderId, status

The messages are inserted into MySQL.

For each order there are many statuses, and they change very frequently. In
order to reduce the load on the database, we can discard some intermediate
statuses, but we must guarantee that the final status is correct. In
addition, the final status is not a definite value.

How can I implement this?

Thanks,
Lei
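
For reference, a minimal sketch of the approach suggested in the parallel
Chinese-list thread (key by order id, window, and keep only the last status
seen per window); the Order POJO, the orders DataStream<Order>, and the
10-second window are assumptions:

DataStream<Order> latest = orders
    .keyBy(o -> o.orderId)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new ProcessWindowFunction<Order, Order, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Order> elements,
                            Collector<Order> out) {
            Order last = null;
            for (Order o : elements) {
                last = o;               // keep only the last status in this window
            }
            if (last != null) {
                out.collect(last);      // one write per order per window
            }
        }
    });

The database then sees at most one row per order per window, and the final
status is delayed by at most one window length.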


How to filter out intermediate states and keep the final state when sinking real-time data to a database

2022-02-25 Thread Lei Wang
Scenario:
Data from Kafka is written directly into MySQL, in the following format:
order_id   status
There are only two fields; order_id is the primary key, and rows are
written with REPLACE (overwrite) semantics.

For the same order_id, the status changes very frequently. To avoid
pressuring the database, we do not write every record, but we must
guarantee that the final status of each order_id is not lost; this final
status is not a fixed, known value.

My approach is to keyBy orderId and check the time interval between two
consecutive records: if the interval is too small, skip the write. But if
the last two records of an order_id arrive too close together, the final
status is lost.

Is there any other way to solve this?

Thanks,
Wang Lei


Re: High CPU when Flink uses Aliyun OSS for checkpoints

2021-10-19 Thread Lei Wang
It is indeed related to OSS; after I switched the checkpoint backend to
HDFS the phenomenon disappeared, but I still don't understand why.
The program enables incremental checkpoints, yet the checkpoint data size
shown in the Flink web UI keeps growing; it reached 1 GB within three days.


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> It shouldn't be related to OSS, which is just storage.
> As for the CPU, first check which thread or method class is consuming it.
>
>
>
> On 2021-10-08 16:34:47, "Lei Wang" wrote:
>
> The Flink job uses RocksDB as the state backend and Aliyun OSS as the
> final physical location of the checkpoint data.
> Our monitoring shows that the node CPU spikes periodically, and the
> interval exactly matches the job's checkpoint interval.
>
> What could be the cause? Could it be related to OSS?
>
> Thanks,
> Wang Lei


Checkpoint size increasing even i enable increasemental checkpoint

2021-10-11 Thread Lei Wang

The checkpointed data size keeps getting bigger and bigger, and the node
CPU is very high while the job is doing checkpointing.
But I have enabled incremental checkpointing:
env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and aliyun oss as checkpoint storage.


Any insight on this?

Thanks,

Lei


High CPU when Flink uses Aliyun OSS for checkpoints

2021-10-08 Thread Lei Wang
The Flink job uses RocksDB as the state backend and Aliyun OSS as the final
physical location of the checkpoint data.
Our monitoring shows that the node CPU spikes periodically, and the
interval exactly matches the job's checkpoint interval.

What could be the cause? Could it be related to OSS?

Thanks,
Wang Lei


Re: Task is always created state after submit a example job

2021-06-20 Thread Lei Wang
The JobManager UI shows enough slots, but the slots are not available.

After I added taskmanager.host: localhost to flink-conf.yaml, it works.

But I don't know why.

Thanks,
Lei


On Fri, Jun 18, 2021 at 6:07 PM Piotr Nowojski  wrote:

> Hi,
>
> I would start by looking at the JobManager and TaskManager logs. Check
> whether the TaskManagers connected/registered with the JobManager and, if
> so, whether there were any problems when submitting the job. It seems
> that either there are not enough slots, or the slots are actually not
> available.
>
> Best,
> Piotrek
>
> On Fri, Jun 18, 2021 at 05:53, Lei Wang wrote:
>
>> flink 1.11.2 on a single host.
>>
>> ./bin/start-cluster.sh and then
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
>> localhost --port 
>>
>> But on the jobmanager UI, the task is always in CREATED state. There are
>> available slots.
>>
>> Any insights on this?
>>
>> Thanks,
>> Lei
>>
>


Re: Task stays in SCHEDULED state after submitting a job

2021-06-18 Thread Lei Wang
Adding the following configuration to flink-conf.yaml fixes it, but I don't
know why.

taskmanager.host: localhost



On Fri, Jun 18, 2021 at 1:43 PM Lei Wang  wrote:

> flink-1.11.2
> Start with ./bin/start-cluster.sh, then
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
> localhost --port 
>
> But on the jobmanager page the task stays in SCHEDULED state, and after a
> while it fails with:
>
> 2021-06-18 13:34:26,683 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b228577f2)
> switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> But slot resources are available. The same steps work fine on other
> machines.
>
> Can anyone explain this?
>
> Thanks,
> Wang Lei


Task stays in SCHEDULED state after submitting a job

2021-06-17 Thread Lei Wang
flink-1.11.2
Start with ./bin/start-cluster.sh, then
./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
localhost --port 

But on the jobmanager page the task stays in SCHEDULED state, and after a
while it fails with:

2021-06-18 13:34:26,683 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b228577f2)
switched from SCHEDULED to FAILED on not deployed.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

But slot resources are available. The same steps work fine on other
machines.

Can anyone explain this?

Thanks,
Wang Lei


Task is always created state after submit a example job

2021-06-17 Thread Lei Wang
flink 1.11.2 on a single host.

./bin/start-cluster.sh and then

./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
localhost --port 

But on the jobmanager UI, the task is always in CREATED state. There are
available slots.

Any insights on this?

Thanks,
Lei


Task stays in SCHEDULED state after submitting a job to Flink

2021-06-17 Thread Lei Wang
Started in standalone mode on a single machine; after submitting a job:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.

There are plenty of slots.
I am using flink-1.11.2. It looks similar to
https://issues.apache.org/jira/browse/FLINK-19237, but I cannot understand
what that issue means.

Strangely, the same steps work fine on other servers. Can anyone explain
this?


Thanks,
Wang Lei


Re: How to do code isolation if muiltple jobs run on the same taskmanager process?

2021-03-11 Thread Lei Wang
Thanks Arvid,

If too many jobs run in the same TaskManager JVM, will that cause excessive
metaspace memory usage?


Thanks,
Lei

On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise  wrote:

> Hi Lei,
>
> each application has its own classloader, so each static constant exists
> multiple times (one per job) and there should be no interference. You
> could verify it by logging the value of the constant and seeing it
> yourself.
>
> Best,
>
> Arvid
>
> On Thu, Mar 11, 2021 at 7:11 AM Lei Wang  wrote:
>
>> Consider the following situation.
>>
>> Two jobs are built into the same jar and share some common code, for
>> example some static constant variables.
>> Currently they are running in the same task manager process.
>>
>> I killed job1, changed the static variable to another value, and then
>> submitted it again.
>> Will the other job see the new value of the static variable, or will it
>> still use the old value?
>>
>> Thanks,
>> Lei
>>
>>
>>
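
For reference, a minimal sketch of the verification Arvid suggests (log the
constant and its classloader from each job); the Constants class and FLAG
field are made-up names:

public final class Constants {
    public static final String FLAG = "v1";   // change this between submissions
}

// somewhere in each job, e.g. in a RichFunction's open():
System.out.println("FLAG = " + Constants.FLAG
        + ", loaded by " + Constants.class.getClassLoader());

Each job should print the value compiled into its own jar, loaded by its
own user-code classloader.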


When variables are passed between Flink operators via a base-class reference, can an operator cast them back to the subclass?

2021-03-11 Thread Lei Wang
Variables passed between Flink operators need to be serialized. If the
DataStream<> generic parameter is a base-class reference, do downstream
operators keep the complete subclass information so that a cast is
possible?


For example:

DataStream<BaseClass> stream = source.from(SubClass);

stream.keyBy(   ) {

    // Can the code here check the type and downcast?

    SubClass subObj = (SubClass) baseObj;

}


Thanks,

Wang Lei
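
For reference, a minimal sketch of the check described above (BaseClass and
SubClass are the pseudo-classes from the mail; getId is a made-up key
selector, and whether the concrete subclass survives depends on the
serializer Flink chooses for the declared BaseClass type):

DataStream<BaseClass> stream = ...;

stream.keyBy(BaseClass::getId)
      .map(base -> {
          if (base instanceof SubClass) {     // runtime class check before casting
              SubClass subObj = (SubClass) base;
              // subclass-specific handling here
          }
          return base;
      });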


How to do code isolation if muiltple jobs run on the same taskmanager process?

2021-03-10 Thread Lei Wang
Consider the following situation.

Two jobs are built into the same jar and share some common code, for
example some static constant variables.
Currently they are running in the same task manager process.

I killed job1, changed the static variable to another value, and then
submitted it again.
Will the other job see the new value of the static variable, or will it
still use the old value?

Thanks,
Lei


Problems running jobs in Flink standalone mode

2021-03-10 Thread Lei Wang
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size'

Following the advice in the log, I increased the metaspace size from 256m
to 512m, but the error still occurs.
From my observation, the problem tends to appear when a new job is
submitted; it rarely happens while jobs just keep running.
Flink is deployed in standalone mode: two machines, one TaskManager per
machine, 9 jobs in total, and all jobs are packaged into one jar of 42 MB.

My guess is that the metaspace is basically full during normal operation,
and submitting a new job re-initializes the symbol references for all the
classes in the jar, and the lack of space for that triggers the error. I
am not sure whether this is right.

I still have one more question: in standalone mode, different jobs
actually run in the same TaskManager process, i.e. in a single JVM. How is
code isolation achieved?
For example:

job1 and job2 are packaged into the same jar and both use a static
variable in the code.
If I kill job1, change the value of that static variable, and resubmit
job1, will the change take effect for job2?

Thanks,
Wang Lei


Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Lei Wang
I see there's a related issue
https://issues.apache.org/jira/browse/FLINK-21053 which is still open.

Does that mean a similar issue will still exist even if I upgrade to
1.12.2?

Thanks,
Lei

On Mon, Feb 8, 2021 at 3:54 PM Yang Wang  wrote:

> Maybe it is a known issue[1] that has already been resolved in 1.12.2
> (to be released soon).
> BTW, I think it is unrelated to the aliyun oss info logs.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20992
>
>
> Best,
> Yang
>
> Lei Wang wrote on Mon, Feb 8, 2021 at 2:22 PM:
>
>> Flink standalone HA.   Flink version 1.12.1
>>
>> 2021-02-08 13:57:50,550 ERROR
>> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
>> Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
>> process...
>> java.util.concurrent.RejectedExecutionException: Task
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
>> rejected from 
>> java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
>> at
>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>> ~[?:1.8.0_275]
>> at
>> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_275]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>>
>> Using aliyun oss as statebackend storage.
>> Before the ERROR, there's a lot of  info message like this:
>>
>> 2021-02-08 13:57:50,452 INFO
>>  org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
>> [Server]Unable to execute HTTP request: Not Found
>> [ErrorCode]: NoSuchKey
>> [RequestId]: 6020D2DEA1E11430349E8323
>>
>>
>> Any insight on this?
>>
>> Thanks,
>> Lei
>>
>


Jobmanager stopped because uncaught exception

2021-02-07 Thread Lei Wang
Flink standalone HA.   Flink version 1.12.1

2021-02-08 13:57:50,550 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
process...
java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
rejected from 
java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
~[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
~[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
~[?:1.8.0_275]
at
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
~[?:1.8.0_275]
at
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]

Using aliyun oss as statebackend storage.
Before the ERROR, there's a lot of  info message like this:

2021-02-08 13:57:50,452 INFO
 org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 6020D2DEA1E11430349E8323


Any insight on this?

Thanks,
Lei


Can Flink RocksDBStateBackend use Aliyun OSS as its storage?

2020-12-28 Thread Lei Wang
As the subject says, can I simply write:

env.setStateBackend(new RocksDBStateBackend("oss://", true));

Thanks,
Wang Lei


How to periodically update the value of a broadcast variable

2020-12-08 Thread Lei Wang
A Flink job reads a config file and broadcasts its content as a broadcast
variable.

If the config file is updated, how can the content of the broadcast
variable be updated as well?

Thanks,
Wang Lei
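
The thread has no reply, but a common pattern for this (a hedged sketch,
not from the mail) is to turn the config into its own stream that is
re-read periodically and shared via broadcast state; the file path, Event
type, mainStream/env variables, and 60-second interval are assumptions:

MapStateDescriptor<String, String> configDesc =
        new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

DataStream<String> configStream = env.addSource(new SourceFunction<String>() {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // re-read the config file and emit its content
            String content = new String(
                    Files.readAllBytes(Paths.get("/path/to/config")));
            ctx.collect(content);
            Thread.sleep(60_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});

mainStream
    .connect(configStream.broadcast(configDesc))
    .process(new BroadcastProcessFunction<Event, String, Event>() {
        @Override
        public void processElement(Event value, ReadOnlyContext ctx,
                                   Collector<Event> out) throws Exception {
            String config = ctx.getBroadcastState(configDesc).get("latest");
            // use the current config while processing the main stream
            out.collect(value);
        }

        @Override
        public void processBroadcastElement(String config, Context ctx,
                                            Collector<Event> out) throws Exception {
            // every re-read of the file replaces the broadcast value
            ctx.getBroadcastState(configDesc).put("latest", config);
        }
    });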


Is a Flink SinkFunction's close() guaranteed to be called when the job stops?

2020-11-24 Thread Lei Wang
I wrote my own SinkFunction that sinks to a database. It only executes the
database write after a certain number of records (100) have accumulated,
which I implement by buffering the pending records in a List.


public class SinkToJDBCWithJDBCStatementBatch extends
        RichSinkFunction<JDBCStatement> {

    private List<JDBCStatement> statementList = new
            ArrayList<JDBCStatement>();


    @Override
    public void close() throws Exception {
        // flush whatever is still buffered before shutting down
        writeToDatabase();
        this.statementList.clear();
        super.close();
        if (dataSource != null) {
            dataSource.close();
        }
    }

    @Override
    public void invoke(JDBCStatement statement, Context context) throws
            Exception {
        statementList.add(statement);
        if (statementList.size() < 100) {
            return;
        }
        writeToDatabase();
        this.statementList.clear();
    }

    public void writeToDatabase() {
        // .
    }
}

I want to confirm: is this close() method guaranteed to be called when the
job stops? What mechanism makes that happen?

Thanks,
Wang Lei
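
One thing the buffer above cannot survive is a failure between flushes. A
hedged sketch of a common safeguard (not from the mail): let the same sink
also implement CheckpointedFunction and flush in snapshotState(), so a job
restored from a checkpoint never loses rows that the checkpoint already
covered:

public class SinkToJDBCWithJDBCStatementBatch extends
        RichSinkFunction<JDBCStatement> implements CheckpointedFunction {

    // fields and invoke()/close() as in the class above

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush the buffer on every checkpoint
        writeToDatabase();
        statementList.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // nothing to restore: the buffer is emptied on every snapshot
    }
}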


Can Flink keyed state store generic or abstract types?

2020-11-16 Thread Lei Wang
The business logic is as follows:

Robot sensors report messages; the stream is first keyed by robotId, and
then a large number of rules are iterated over. Each rule stores one
previous object, implemented like this:

private transient MapState<String, RobotData> state;

for (Entry<String, IChargerTwoRecordRule> entry :
        RulesFactory.getChargerTwoRecordRules().entrySet()) {
    String faultName = entry.getKey();
    IChargerTwoRecordRule rule = entry.getValue();
    RobotData old = state.get(faultName);
    rule.handleLogMsg(old, current);
}

Now some rules need to store objects that cannot be represented by
RobotData. Is it possible to use something like generics or inheritance so
that the MapState value can store different types?


For example:

MapState<String, Object> state;

and then, depending on the rule, cast the Object to the concrete class.



Thanks,

Wang Lei
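
For reference, a minimal sketch of the Object-valued variant described
above; declaring the value type as Object makes Flink treat it as a generic
type (serialized with Kryo), and the descriptor name here is made up:

private transient MapState<String, Object> state;

@Override
public void open(Configuration parameters) {
    state = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("ruleState", String.class, Object.class));
}

// per rule: read the stored value and downcast to what this rule expects
Object old = state.get(faultName);
if (old instanceof RobotData) {
    rule.handleLogMsg((RobotData) old, current);
}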


Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 Thread Lei Wang
用 session windown 确实能满足功能:

robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);

按照这种写法, 我理解 window state 中只保存了最近的一条记录。


正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。



On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang  于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00 msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>
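
For reference, a minimal sketch of the per-key timer alternative (a
KeyedProcessFunction that re-arms a processing-time timer on every
message); the RobotMsg type and the 20-minute timeout are assumptions based
on the mail:

public class OfflineAlarmFunction
        extends KeyedProcessFunction<String, RobotMsg, String> {

    private static final long TIMEOUT_MS = 20 * 60 * 1000L;

    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Types.LONG));
    }

    @Override
    public void processElement(RobotMsg msg, Context ctx, Collector<String> out)
            throws Exception {
        Long oldTimer = timerState.value();
        if (oldTimer != null) {
            // a message arrived, so cancel the pending offline alarm
            ctx.timerService().deleteProcessingTimeTimer(oldTimer);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // no message for TIMEOUT_MS: raise the alarm for this robot
        out.collect("robot " + ctx.getCurrentKey() + " is offline");
        timerState.clear();
    }
}

Unlike the session window, this keeps only one Long per robot in state.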


How to raise an alarm when no message has been received for a certain period?

2020-11-10 Thread Lei Wang
Many edge robot devices (we call them robots) send messages to Kafka; if
no message is received for a certain period, we consider the robot
offline.

For example:
robot1   2020-11-11 12:00:00 msginfo
If no message from robot1 arrives in the following 20 mins, how can Flink
raise an alarm at 2020-11-11 12:10:00?

Flink is message-driven: if no message arrives, no operation is triggered.
How can an action be triggered without receiving any further messages?

I tried the example at https://juejin.im/post/6844904193052901384, but it
does not fit my scenario.

In that example all orders share a single timerService, and each firing
iterates over all the orders.
We must keyBy robotId.

Any advice would be appreciated.

Thanks,
Wang Lei