Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17, posted by gongzhongqiang
Congratulations!
Thanks to all contributors.


Best,

Zhongqiang Gong

Qingsheng Ren wrote on Fri, May 17, 2024, 17:33:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real-time
> and batch data. It brings the simplicity and elegance of data
> integration via YAML for describing the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>


Re: Using per-window state in a ProcessWindowFunction

2024-04-12, posted by gongzhongqiang
Hello,

You can use globalState / windowState to fetch previously stored state and compute incrementally: windowState is scoped to a single key and window (and should be cleaned up in clear()), while globalState is kept per key across all windows.

The demo below should make this easier to understand:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // parallelism of 1
        env.setParallelism(1);
        // the source emits one Tuple2<String, Integer> per second
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(
                new SourceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        int xxxNum = 0;
                        int yyyNum = 0;
                        for (int i = 1; i < Integer.MAX_VALUE; i++) {
                            // there are only two names: XXX and YYY
                            String name = (0 == i % 2) ? "XXX" : "YYY";
                            // update the running totals of the two names
                            if (0 == i % 2) {
                                xxxNum++;
                            } else {
                                yyyNum++;
                            }
                            // use the current time as the timestamp
                            long timeStamp = System.currentTimeMillis();
                            // print the element and its timestamp so the output can be verified
                            System.out.println(String.format("source,%s, %s,XXX total : %d,YYY total : %d\n",
                                    name, time(timeStamp), xxxNum, yyyNum));
                            // emit one element, stamped with the timestamp
                            ctx.collectWithTimestamp(new Tuple2<>(name, 1), timeStamp);
                            // wait one second after each emission
                            Thread.sleep(1000);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });

        // split the stream into 5-second tumbling windows and apply a ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // key by field f0 of the Tuple2; the only keys in this example are XXX and YYY
                .keyBy(value -> value.f0)
                // 5-second tumbling windows
                .timeWindow(Time.seconds(5))
                // count each key's elements in the current window, then format key,
                // counts and window bounds into a string for the downstream operator
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // custom keyed state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // initialize the state; its name is myState
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    @Override
                    public void clear(Context context) throws Exception {
                        // per-window state must be cleaned up when the window is purged
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(
                                new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context,
                                        Iterable<Tuple2<String, Integer>> iterable,
                                        Collector<String> collector) throws Exception {
                        // fetch this key's myState from the state backend
                        KeyCount current = state.value();
                        // if myState has never been written, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable holds all of this key's elements in the current window;
                        // to keep things simple we only count them
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // update this key's running total
                        current.count += count;
                        // write the state back to the backend
                        state.update(current);
                        System.out.println("getRuntimeContext() == context :"
                                + (getRuntimeContext() == context));

                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(
                                new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(
                                new ValueStateDescriptor<>("myGlobalState", KeyCount.class));

                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        KeyCount globalValue = contextGlobalValueState.value();
                        if (globalValue == null) {
                            globalValue = new KeyCount();
                            globalValue.key = s;
                            globalValue.count = 0;
                        }
                        globalValue.count += count;
                        contextGlobalValueState.update(globalValue);

                        // emit a summary of this window for the key
                        collector.collect(String.format(
                                "window [%s - %s], key %s: %d in window, keyed total %d, windowState %d, globalState %d",
                                time(context.window().getStart()),
                                time(context.window().getEnd()),
                                s, count, current.count, windowValue.count, globalValue.count));
                    }
                });

        mainDataStream.print();
        env.execute("ProcessWindowFunction demo");
    }

    // format a millisecond timestamp as a readable time string
    private static String time(long timeStamp) {
        return new SimpleDateFormat("HH:mm:ss").format(new Date(timeStamp));
    }

    // POJO holding a key and its element count
    public static class KeyCount {
        public String key;
        public long count;
    }
}

Re: Completed Flink jobs disappear after a while

2024-04-09, posted by gongzhongqiang
Hello:

If you want to keep completed jobs around long-term, the History Server is recommended:
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#history-server
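A minimal flink-conf.yaml sketch (the archive directory is a placeholder; the option names are from the config page above):

# the JobManager writes archives of finished jobs here
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# the HistoryServer polls the same directory and serves it on its own UI
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.web.port: 8082
# poll interval for new archives, in milliseconds
historyserver.archive.fs.refresh-interval: 10000

Start it with bin/historyserver.sh start. The disappearance itself is expected: the regular web UI drops a finished job from Completed Jobs once jobstore.expiration-time (3600 seconds by default) has passed.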

Best,

Zhongqiang Gong

ha.fen...@aisino.com wrote on Tue, Apr 9, 2024, 10:39:

> In the web UI, finished jobs show up under Completed Jobs, but when I come back a
> little later the data is gone. Is there some configuration that deletes them automatically?
>


Re: Re: OOM when capturing a full MySQL snapshot

2024-04-09, posted by gongzhongqiang
Things you can try:

   - Increase the JM memory (as Shawn Huang suggested; see the flink-conf.yaml line in the sketch below)
   - Tune the batch-read (snapshot chunk) size during the snapshot phase to shrink the state, easing JM memory pressure during checkpoints (see the sketch below)
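A rough sketch, assuming the SQL mysql-cdc connector (the DDL is abbreviated and the value is only an example):

CREATE TABLE source_table ( ... ) WITH (
  'connector' = 'mysql-cdc',
  -- larger chunks mean fewer splits, so the SourceEnumerator keeps
  -- less split state on the JM (the default is 8096 rows per chunk)
  'scan.incremental.snapshot.chunk.size' = '32768',
  ...
);

and, for the first point, raising the overall JM memory in flink-conf.yaml:

jobmanager.memory.process.size: 4g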


Best,
Zhongqiang Gong

wyk wrote on Tue, Apr 9, 2024, 16:56:

>
> Yes, the number of splits is large: more than 17,000.
>
> The JM memory is currently 2g; after raising it to 4g the problem persists. I'm
> worried that if I keep increasing the JM memory it will just be wasted later during
> the incremental phase. I found the Flink heap-memory options on the official site,
> but I'm not familiar with them and don't know how to tune them properly. Could you
> take a look at the options in the table below and advise which one to adjust?
>
> The Flink documentation page is:
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/memory/mem_setup_jobmanager/
>
>
>
>
> Component       | Configuration option(s)                  | Description
> --------------- | ---------------------------------------- | -----------
> JVM Heap        | jobmanager.memory.heap.size              | JVM Heap memory size for job manager.
> Off-heap Memory | jobmanager.memory.off-heap.size          | Off-heap memory size for job manager. This option covers all off-heap memory usage including direct and native memory allocation.
> JVM metaspace   | jobmanager.memory.jvm-metaspace.size     | Metaspace size of the Flink JVM process.
> JVM Overhead    | jobmanager.memory.jvm-overhead.min,      | Native memory reserved for other JVM overhead: e.g. thread stacks, code cache, garbage collection space etc.; it is a capped fractionated component of the total process memory.
>                 | jobmanager.memory.jvm-overhead.max,      |
>                 | jobmanager.memory.jvm-overhead.fraction  |
>
>
>
>
> On 2024-04-09 11:28:57, "Shawn Huang" wrote:
>
>
> From the error message, the JM heap memory is insufficient; you can try increasing
> the JM memory. One possible cause is that the MySQL table produces a large number of
> splits during the full-snapshot phase, which makes the SourceEnumerator state large.
>
> Best,
> Shawn Huang
>
>
wyk wrote on Mon, Apr 8, 2024, 17:46:
>
>>
>>
>> Hi developers:
>> Flink version: 1.14.5
>> flink-cdc version: 2.2.0
>>
>> When using flink-cdc-mysql to capture a full snapshot, checkpoints are taken during
>> the full phase, and an OOM occurs during checkpointing. Is there any way to deal with this?
>> The exact error is shown in the attached text and the image below:
>>
>>
>>


Re: Custom data sources in 1.19

2024-03-28, posted by gongzhongqiang
Hello:

In Flink 1.19, SourceFunction is only marked as deprecated; it will be removed in a
future release. So, to keep your application compatible with future Flink versions,
you can reimplement those SourceFunction sources on top of the new Source API.
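For test data specifically, you usually do not have to implement the new Source interface yourself. A minimal sketch using the DataGeneratorSource that ships with the flink-connector-datagen dependency (the generator function, count and rate here are illustrative, not from the original mail):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // generate "event-0" ... "event-999" at 10 records per second
        GeneratorFunction<Long, String> generator = index -> "event-" + index;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(
                generator, 1000, RateLimiterStrategy.perSecond(10), Types.STRING);

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "data-gen");
        stream.print();
        env.execute("DataGeneratorSource demo");
    }
}

If you need full control, the FLIP-27 counterpart of a SourceFunction is a Source built from a SplitEnumerator and a SourceReader; the "Data Sources" page in the docs walks through implementing one.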

ha.fen...@aisino.com wrote on Thu, Mar 28, 2024, 14:18:

>
> I used to extend SourceFunction to implement some simple methods for auto-generating
> data. In 1.19 it is marked as deprecated, and it seems the Source interface should be
> used instead, which is completely different from the old SourceFunction. How should I
> build a custom data source for generating test data now?
>


Re: Unsubscribe

2024-03-21, posted by gongzhongqiang
Hi, scott

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe from the user-zh@flink.apache.org mailing list. See [1][2] for managing
your mailing-list subscriptions.

Best,
Zhongqiang Gong

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

己巳 wrote on Fri, Mar 22, 2024, 10:21:

> Unsubscribe


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-21, posted by gongzhongqiang
Congratulations! Thanks for the great work!


Best,
Zhongqiang Gong

Leonard Xu wrote on Wed, Mar 20, 2024, 21:36:

> Hi devs and users,
>
> We are thrilled to announce that the donation of Flink CDC as a
> sub-project of Apache Flink has completed. We invite you to explore the new
> resources available:
>
> - GitHub Repository: https://github.com/apache/flink-cdc
> - Flink CDC Documentation:
> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>
> After the Flink community accepted this donation [1], we completed the
> software copyright signing, code repo migration, code cleanup, website
> migration, CI migration, GitHub issues migration, etc.
> Here I am particularly grateful to Hang Ruan, Zhongqiang Gong, Qingsheng
> Ren, Jiabao Sun, LvYanquan, loserwang1024 and other contributors for their
> contributions and help during this process!
>
>
> For all previous contributors: the contribution process has slightly
> changed to align with the main Flink project. To report bugs or suggest new
> features, please open tickets in
> Apache Jira (https://issues.apache.org/jira). Note that we will no
> longer accept GitHub issues for these purposes.
>
>
> Welcome to explore the new repository and documentation. Your feedback and
> contributions are invaluable as we continue to improve Flink CDC.
>
> Thanks everyone for your support and happy exploring Flink CDC!
>
> Best,
> Leonard
> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18, posted by gongzhongqiang
Congrats! Thanks to everyone involved!

Best,
Zhongqiang Gong

Lincoln Lee wrote on Mon, Mar 18, 2024, 16:27:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.19.0, which is the first release for the Apache Flink 1.19 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> in this release:
>
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
> Best,
> Yun, Jing, Martijn and Lincoln
>


Re: Can the username/password be encrypted/hidden for the FlinkSQL JDBC connector?

2024-03-10, posted by gongzhongqiang
Hi, 东树
   Hiding sensitive information in the SQL has to be handled by an external big data platform.
For example, with StreamPark's variable management you maintain the configuration ahead of time and reference it when writing SQL; when the platform submits the job to Flink it parses the SQL and substitutes the variables (see the sketch below).
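As an illustration (the placeholder name mysql.pwd is hypothetical, and the exact placeholder syntax depends on the platform), the SQL you write then references a managed variable instead of the literal credentials:

CREATE TABLE wordcount_sink (
  word STRING,
  cnt BIGINT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/flink',
  'username' = 'root',
  -- resolved and substituted by the platform at submission time
  'password' = '${mysql.pwd}',
  'table-name' = 'wordcount_sink'
);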

Best,
Zhongqiang Gong

杨东树 wrote on Sun, Mar 10, 2024, 21:50:

> Hi all,
>    Given the security concerns around database usernames and passwords, when using the
> FlinkSQL JDBC connector, how can the database credentials be encrypted or hidden? For
> example, the password in the following typical SQL:
> CREATE TABLE wordcount_sink (
>  word String,
>  cnt BIGINT,
>  primary key (word) not enforced
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/flink',
>  'username' = 'root',
>  'password' = '123456',
>  'table-name' = 'wordcount_sink'
> );