Re: flink cdc metrics question

2024-04-07 Thread Shawn Huang
Hi, Flink CDC currently does not provide a metric for the number of unconsumed
binlog records. You can use the currentFetchEventTimeLag metric [1] (the lag
between the event time of the fetched binlog records and the current time) to
judge the current consumption status.

[1]
https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java
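
For intuition, here is a minimal sketch of how such a lag gauge can be
registered on a Flink MetricGroup (a simplification; the class and method
names below are illustrative, see [1] for the actual implementation):

```
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class FetchLagMetric {

    // Event timestamp (ms) of the most recently fetched binlog record.
    private volatile long lastFetchedEventTimeMs = -1L;

    // Register the gauge once when the source reader starts.
    public void register(MetricGroup metricGroup) {
        metricGroup.gauge(
                "currentFetchEventTimeLag",
                (Gauge<Long>) () -> lastFetchedEventTimeMs < 0
                        ? -1L // nothing fetched yet
                        : System.currentTimeMillis() - lastFetchedEventTimeMs);
    }

    // Update on every fetched binlog record.
    public void recordFetchedEvent(long eventTimestampMs) {
        this.lastFetchedEventTimeMs = eventTimestampMs;
    }
}
```

A steadily growing value of this gauge means the job is falling behind the
binlog head, which is the closest available proxy for consumer lag.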

Best,
Shawn Huang


casel.chen wrote on Mon, Apr 8, 2024 at 12:01:

> Does Flink CDC expose any monitoring metrics?
> I would like to monitor the number of binlog records not yet consumed by a
> real-time Flink CDC job, similar to Kafka topic consumer-lag monitoring.
> I want to use this monitoring to prevent a slow-consuming Flink CDC job from
> being lapped (how can I get the maximum binlog record count?)


Re: Combining multiple stages into a multi-stage processing pipeline

2024-04-07 Thread Mark Petronic
Thank you Yunfeng. Your comments gave me some insight into how to use
consecutive windows. So I coded up a version that looks like this and
works well for me:

KafkaSource => Keyby => TumblingWindows => ProcessWindowFn => WindowAll =>
ProcessWindowFn => (here I will repeat the keyed and windowAll steps in
additional stages)

The missing connection for me was not understanding that I could connect
windows to windows in the same data stream. That understanding made all the
difference. So now the keyed tumbling windows for the 21 keys each
process N records per key and create a score over that data and output a
POJO containing the score and a List. Then the WindowAll gets
those 21 POJOs of N records and iterates over all 21 * N records to
calculate the overall score. Now that it has in hand the overall score and
the 21 keyed scores from the prior windows, it can compare each of the 21
scores to the overall score and conditionally out.collect() only the
List for the record sets below threshold. Then, subsequent stages
can rinse and repeat this process in one clean job graph.
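
In case it helps others, here is a minimal sketch of the window-to-window
chaining described above (my own simplification, with made-up types and
scoring logic rather than the actual job):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ConsecutiveWindowsSketch {

    // Stand-in for the per-key POJO carrying a score (hypothetical type).
    public static class KeyedScore {
        public String key;
        public double score;
        public KeyedScore() {}
        public KeyedScore(String key, double score) { this.key = key; this.score = score; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<KeyedScore> records = env.fromElements(
                new KeyedScore("a", 1.0), new KeyedScore("b", 5.0), new KeyedScore("a", 3.0));

        records
                // Per-key tumbling windows: score each key's records.
                .keyBy(r -> r.key)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new ProcessWindowFunction<KeyedScore, KeyedScore, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<KeyedScore> in, Collector<KeyedScore> out) {
                        double sum = 0;
                        int n = 0;
                        for (KeyedScore r : in) { sum += r.score; n++; }
                        out.collect(new KeyedScore(key, sum / n)); // per-key score
                    }
                })
                // WindowAll directly downstream: compute the overall score and
                // forward only the per-key results below it.
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new ProcessAllWindowFunction<KeyedScore, KeyedScore, TimeWindow>() {
                    @Override
                    public void process(Context ctx,
                                        Iterable<KeyedScore> in, Collector<KeyedScore> out) {
                        double sum = 0;
                        int n = 0;
                        for (KeyedScore r : in) { sum += r.score; n++; }
                        double overall = sum / n;
                        for (KeyedScore r : in) {
                            if (r.score < overall) out.collect(r);
                        }
                    }
                })
                .print();

        env.execute("consecutive-windows-sketch");
    }
}
```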

Thanks again for your thoughts. They really helped light the light bulb for
me :)
Mark


On Sat, Apr 6, 2024 at 11:24 PM Yunfeng Zhou 
wrote:

> Hi Mark,
>
> IMHO, your design of the Flink application is generally feasible. In
> Flink ML, I once came across a similar design in the ChiSqTest operator,
> where the input data is first aggregated to generate some results and
> then broadcast and connected with other result streams from the same
> input afterwards. You may refer to this algorithm for more details
> when designing your applications.
>
> https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java
>
> Besides, side outputs are typically used when you want to split an
> output stream into different categories. Given that the
> ProcessWindowFn before each SideOutput-x only has one downstream, it
> would be enough to directly pass the resulting DataStream to session
> windows instead of introducing side outputs.
>
> Best,
> Yunfeng
>
> On Sun, Apr 7, 2024 at 12:41 AM Mark Petronic 
> wrote:
> >
> > I am looking for some design advice for a new Flink application and I am
> relatively new to Flink - I have one, fairly straightforward Flink
> application in production so far.
> >
> > For this new application, I want to create a three-stage processing
> pipeline. Functionally, I am seeing this as ONE long datastream. But, I
> have to evaluate the STAGE-1 data in a special manner to then pass on that
> evaluation to STAGE-2 where it will do its own special evaluation using the
> STAGE-1 evaluation results to shape its evaluation. The same thing happens
> again in STAGE-3, using the STAGE-2 evaluation results. Finally, the end
> result is published to Kafka. The stages functionally look like this:
> >
> > STAGE-1
> > KafkaSource |=> Keyby => TumblingWindows1 => ProcessWindowFn => SideOutput-1 |=> SessionWindow1 => ProcessWindowFn => (SideOutput-2[WindowRecords], KafkaSink[EvalResult])
> >             |=> WindowAll => ProcessWindowFn => SideOutput-1 ^
> >
> > STAGE-2
> > SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn => SideOutput-3 => SessionWindow2 => ProcessWindowFn => (SideOutput-4[WindowRecords], KafkaSink[EvalResult])
> >
> > STAGE-3
> > SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn => SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink
> >
> > DESCRIPTION
> >
> > In STAGE-1, there are a fixed number of known keys so I will only see at
> most about 21 distinct keys and therefore up to 21 tumbling one-minute
> windows. I also need to aggregate all data in a global window to get an
> overall non-keyed result. I need to bring the 21 results from those 21
> tumbling windows AND the one global result into one place where I can
> compare each of the 21 windows results to the one global result. Based on
> this evaluation, only some of the 21 window results will survive that
> test. I want to then take the data records from those, say 3 surviving
> windows, and make them the "source" for STAGE-2 processing as well as
> publish some intermediate evaluation results to a KafkaSink. STAGE-2 will
> reprocess the same data records that the three STAGE-1 surviving windows
> processed, only keying them by different dimensions. I expect there to be
> around 4000 fairly small records per each of the 21 STAGE-1 windows so, in
> this example, I would be sending 4000 x 3 = 12000 records in SideOutput-2
> to form the new "source" datastream for STAGE-2.
> >
> > Where I am struggling is:
> >
> > Trying to figure out how to best connect the output of the 21 STAGE-1
> windows and the one WindowAll window's records into a single point (I propose
> SessionWindow1) to be able to compare each of the 21 per-key window results
> with the WindowAll non-keyed results.
> > The best way to connect together these multiple stages.
> >
> > Looking 

flink cdc metrics question

2024-04-07 Thread casel.chen
Does Flink CDC expose any monitoring metrics?
I would like to monitor the number of binlog records not yet consumed by a
real-time Flink CDC job, similar to Kafka topic consumer-lag monitoring.
I want to use this monitoring to prevent a slow-consuming Flink CDC job from
being lapped (how can I get the maximum binlog record count?)

Re: Impact on using clean code and serializing everything

2024-04-07 Thread Biao Geng
Hi Oscar,

I assume the "dependency" in your description refers to the custom fields
in the ProcessFunction's implementation. You are right that since the
ProcessFunction inherits the `Serializable` interface, we should make all
fields either serializable or transient.
As for performance, I have no data, but theoretically there should not be
much difference in most cases (in fact, maybe what you are wondering about
is the default serialization performance of the JDK). For a long-running
streaming job, the constructor or open() method is usually not on the
critical path for performance.
As a best practice, and for cleaner code: in Flink's abstraction, the open()
method is designed for one-time setup work. So it is usually better to mark
these fields as transient and initialize them in the open() method
(especially when we need to do some extra work like creating a DB connection).
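
Below is a minimal sketch of that pattern (my illustration, not the original
poster's code; the JDBC URL is a placeholder):

```
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichFunction extends ProcessFunction<String, String> {

    // transient: excluded from the serialized function shipped to task
    // managers; rebuilt independently on each parallel instance.
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One-time, per-task setup of the non-serializable dependency.
        connection = DriverManager.getConnection("jdbc:postgresql://db:5432/app");
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Use `connection` for lookups here; pass-through for brevity.
        out.collect(value);
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
```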

Hope it helps!
Best,
Biao Geng

Oscar Perez via user wrote on Thu, Apr 4, 2024 at 17:14:

> Hi,
>
> We would like to adhere to clean code and expose all dependencies in the
> constructor of the process functions
>
> In flink, however, all dependencies passed to process functions must be
> serializable. Another workaround is to instantiate these dependencies in
> the open method of the process function and declare this dependency
> transient
>
> I wonder how declaring all dependencies in the constructor and making
> them serializable would impact the performance of the job.
> Is this a wrong pattern to use? Has anybody run any experiments on the
> performance degradation of dependencies exposed in the constructor vs
> declaring them in the open method?
>
> Thanks!
> Oscar
>


Re: 退订 (Unsubscribe)

2024-04-07 Thread Biao Geng
Hi,

If you want to unsubscribe from the user-zh mailing list, please send an
email with any content to user-zh-unsubscr...@flink.apache.org.


Best,
Biao Geng


995626544 <995626...@qq.com.invalid> wrote on Sun, Apr 7, 2024 at 16:06:

> 退订 (Unsubscribe)
>
>
>
>
> 995626544
> 995626...@qq.com
>
>
>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Biao Geng
Hi Lei,
You can use the "-D" option on the command line to set configs for a
specific job. E.g., `flink run-application -t
yarn-application  -Djobmanager.memory.process.size=1024m  `.
See
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/
for more details.
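
For the RocksDB native metrics specifically, a per-job submission could look
like the following (a sketch: the option keys come from the "RocksDB Native
Metrics" section of the configuration docs, and the jar name is a
placeholder):

```
flink run-application -t yarn-application \
  -Dstate.backend.rocksdb.metrics.estimate-num-keys=true \
  -Dstate.backend.rocksdb.metrics.block-cache-usage=true \
  -Dstate.backend.rocksdb.metrics.cur-size-all-mem-tables=true \
  my-job.jar
```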

Best,
Biao Geng

Marco Villalobos wrote on Mon, Apr 8, 2024 at 09:22:

> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
>
> Configuration
> nightlies.apache.org
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>
> 
> I want to enable it only for specified jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>  (RocksDB Native Metrics)
>>
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>>
>>> Hi Lei,
>>>
>>> You can enable it by some configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>  (RocksDB Native Metrics)
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>>

 Using big state and want to do some performance tuning, how can I
 enable RocksDB native metrics?

 I am using Flink 1.14.4

 Thanks,
 Lei

>>>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
I can enable them by adding the options to flink-conf.yaml, and that works.

However, I don't want to edit the flink-conf.yaml file; I want to enable
the configurations when submitting a job on the cmd line so that they only
apply to the job I submit. How can I do this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:22 AM Marco Villalobos 
wrote:

> Hi Lei,
>
> Have you tried enabling these Flink configuration properties?
>
> Configuration
> nightlies.apache.org
>
> Sent from my iPhone
>
> On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:
>
> 
> I want to enable it only for specified jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>  (RocksDB Native Metrics)
>>
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>>
>>> Hi Lei,
>>>
>>> You can enable it by some configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>>  (RocksDB Native Metrics)
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>>

 Using big state and want to do some performance tuning, how can I
 enable RocksDB native metrics?

 I am using Flink 1.14.4

 Thanks,
 Lei

>>>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Marco Villalobos
Hi Lei,

Have you tried enabling these Flink configuration properties?

Configuration
nightlies.apache.org

Sent from my iPhone

On Apr 7, 2024, at 6:03 PM, Lei Wang  wrote:

> I want to enable it only for specified jobs; how can I specify the
> configurations on the cmd line when submitting a job?
>
> Thanks,
> Lei
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>> (RocksDB Native Metrics)
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>>
>>> Hi Lei,
>>>
>>> You can enable it by some configurations listed in:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>> (RocksDB Native Metrics)
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>>
>>>> Using big state and want to do some performance tuning, how can I
>>>> enable RocksDB native metrics?
>>>>
>>>> I am using Flink 1.14.4
>>>>
>>>> Thanks,
>>>> Lei





Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
I want to enable it only for specified jobs; how can I specify the
configurations on the cmd line when submitting a job?

Thanks,
Lei

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:

> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:
>
>> Hi Lei,
>>
>> You can enable it by some configurations listed in:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>>  (RocksDB Native Metrics)
>>
>>
>> Best,
>> Zakelly
>>
>> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>>
>>>
>>> Using big state and want to do some performance tuning, how can I enable
>>> RocksDB native metrics?
>>>
>>> I am using Flink 1.14.4
>>>
>>> Thanks,
>>> Lei
>>>
>>


Re: Debugging Kryo Fallback

2024-04-07 Thread Salva Alcántara
Thanks Yunfeng! That is more or less what I do now when I run into the
problem. This approach reports problems one at a time (an exception is
raised on the first problem encountered).

Instead of that, I think accumulating all the issues and presenting them
all at once would be more user friendly.
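
One way to get closer to that is to probe all the relevant classes up front
in a single run, before submitting the job. A minimal sketch (the probe class
and the type list are mine; TypeInformation and GenericTypeInfo are Flink's):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class KryoFallbackProbe {

    // Report whether Flink would treat this class as a generic (Kryo) type.
    public static void check(Class<?> clazz) {
        TypeInformation<?> info = TypeInformation.of(clazz);
        if (info instanceof GenericTypeInfo) {
            System.out.println(clazz.getName() + " -> falls back to Kryo");
        } else {
            System.out.println(clazz.getName() + " -> " + info);
        }
    }

    public static void main(String[] args) {
        // List every type your job passes between operators.
        check(String.class);
        // check(MyEvent.class);
        // check(MyEnrichedEvent.class);
    }
}
```

Note this only inspects the top-level type; a PojoTypeInfo can still contain
GenericTypeInfo fields, which you can walk recursively via getTypeAt() if you
need field-level detail.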

Regards,

Salva

On Sun, Apr 7, 2024 at 5:43 AM Yunfeng Zhou 
wrote:

> Hi Salva,
>
> According to the description of the configuration
> `pipeline.generic-types`, after setting this to false you should be
> able to find an UnsupportedOperationException in the Flink log describing
> the data types that fall back to Kryo. Then you should
> be able to look into your code finding out the usages of the certain
> data type and perform corresponding bug fixes. Other information
> provided in Flink's log, like those info-level logs in
> TypeExtractor@analyzePojo, might help reveal more details around the
> exception.
>
> Best,
> Yunfeng
>
> On Wed, Apr 3, 2024 at 4:19 PM Salva Alcántara 
> wrote:
> >
> > FYI Reposted in SO:
> > -
> https://stackoverflow.com/questions/78265380/how-to-debug-the-kryo-fallback-in-flink
> >
> > On Thu, Mar 28, 2024 at 7:24 AM Salva Alcántara 
> wrote:
> >>
> >> I wonder which is the simplest way of troubleshooting/debugging what
> causes the Kryo fallback.
> >>
> >> Detecting it is just a matter of adding this line to your job:
> >>
> >> ```
> >> env.getConfig().disableGenericTypes();
> >> ```
> >>
> >> or in more recent versions:
> >>
> >> ```
> >> pipeline.generic-types: false
> >>
> >> ```
> >>
> >> But once you detect the issue, what is the simplest way to debug it?
> You can of course add a breakpoint in:
> >> org.apache.flink.api.java.typeutils.TypeExtractor@analyzePojo
> >>
> >> but ideally there should be a simpler way to show all the problems
> encountered to the user without having to get that deep into the code.
> >>
> >> Thanks in advance,
> >>
> >> Salva
>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Zakelly Lan
Hi Lei,

You can enable it by some configurations listed in:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
 (RocksDB Native Metrics)
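
For example, a flink-conf.yaml fragment enabling a few of them could look like
this (a sketch; enable only what you need, since the docs note that native
metrics may degrade performance):

```
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
```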


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:

> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>
>>
>> Using big state and want to do some performance tuning, how can I enable
>> RocksDB native metrics?
>>
>> I am using Flink 1.14.4
>>
>> Thanks,
>> Lei
>>
>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread zbz-163
You can take a look at the documentation: [ 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
 ]

Thanks,
Zbz

> On Apr 7, 2024, at 13:41, Lei Wang wrote:
> 
> 
> Using big state and want to do some performance tuning, how can I enable
> RocksDB native metrics?
> 
> I am using Flink 1.14.4
> 
> Thanks,
> Lei