Re:Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-21 文章 Xuyang
Hi, 
> 那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
对,具体可以参考下这个内部实现的算子[1]


> 新的sink 
> v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
>  - context.timestamp()得到sink延迟呢?
应该是可以的,就是可能因为各tm的机器时间会有略微差异的情况,不会特别准,但是应该也够用了。


[1] 
https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314




--

Best!
Xuyang





在 2024-02-21 15:17:49,"casel.chen"  写道:
>感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
>我看新的sink 
>v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
> - context.timestamp()得到sink延迟呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2024-02-21 09:41:37,"Xuyang"  写道:
>>Hi, chen. 
>>可以试一下在sink function的invoke函数中使用:
>>
>>
>>@Override
>>public void invoke(RowData row, Context context) throws Exception {
>>context.currentProcessingTime(); 
>>context.currentWatermark(); 
>>...
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>
>>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>>
>>>Best,
>>>Feng
>>>
>>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>>
 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?


 public class XxxSinkFunction extends RichSinkFunction implements
 CheckpointedFunction, CheckpointListener {


 @Override
 public synchronized void invoke(RowData rowData, Context context)
 throws IOException {
//  这里想从rowData中获取event time和watermark值,如何实现呢?
 }
 }


 例如source table如下定义


 CREATE TEMPORARY TABLE source_table(
   username varchar,
   click_url varchar,
   eventtime varchar,

   ts AS TO_TIMESTAMP(eventtime),
   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
 ) with (
   'connector'='kafka',
   ...

 );


 CREATE TEMPORARY TABLE sink_table(
   username varchar,
   click_url varchar,
   eventtime varchar
 ) with (
   'connector'='xxx',
   ...
 );
 insert into sink_table select username,click_url,eventtime from
 source_table;


Re: 退订

2024-02-21 文章 Zhanghao Chen
请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 来取消订阅来自 
user-zh@flink.apache.org
邮件组的邮件。

Best,
Zhanghao Chen

From: 曹明勤 
Sent: Thursday, February 22, 2024 9:42
To: user-zh@flink.apache.org 
Subject: 退订

退订


退订

2024-02-21 文章 曹明勤
退订

flink sql作业如何统计端到端延迟

2024-02-20 文章 casel.chen
flink sql作业从kafka消费mysql过来的canal 
json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time - 
update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?

Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
我看新的sink 
v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
 - context.timestamp()得到sink延迟呢?














在 2024-02-21 09:41:37,"Xuyang"  写道:
>Hi, chen. 
>可以试一下在sink function的invoke函数中使用:
>
>
>@Override
>public void invoke(RowData row, Context context) throws Exception {
>context.currentProcessingTime(); 
>context.currentWatermark(); 
>...
>}
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>
>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>> @Override
>>> public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>> }
>>> }
>>>
>>>
>>> 例如source table如下定义
>>>
>>>
>>> CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;


Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Xuyang
Hi, chen. 
可以试一下在sink function的invoke函数中使用:


@Override
public void invoke(RowData row, Context context) throws Exception {
context.currentProcessingTime(); 
context.currentWatermark(); 
...
}







--

Best!
Xuyang





在 2024-02-20 19:38:44,"Feng Jin"  写道:
>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>
>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>> @Override
>> public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>> }
>> }
>>
>>
>> 例如source table如下定义
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;


Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Feng Jin
我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:

> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>
>
> public class XxxSinkFunction extends RichSinkFunction implements
> CheckpointedFunction, CheckpointListener {
>
>
> @Override
> public synchronized void invoke(RowData rowData, Context context)
> throws IOException {
>//  这里想从rowData中获取event time和watermark值,如何实现呢?
> }
> }
>
>
> 例如source table如下定义
>
>
> CREATE TEMPORARY TABLE source_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar,
>
>   ts AS TO_TIMESTAMP(eventtime),
>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
> ) with (
>   'connector'='kafka',
>   ...
>
> );
>
>
> CREATE TEMPORARY TABLE sink_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar
> ) with (
>   'connector'='xxx',
>   ...
> );
> insert into sink_table select username,click_url,eventtime from
> source_table;


退订

2024-02-20 文章 任香帅
退订

flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?


public class XxxSinkFunction extends RichSinkFunction implements 
CheckpointedFunction, CheckpointListener {


@Override
public synchronized void invoke(RowData rowData, Context context) throws 
IOException {
   //  这里想从rowData中获取event time和watermark值,如何实现呢?
}
}


例如source table如下定义


CREATE TEMPORARY TABLE source_table(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
) with (
  'connector'='kafka',
  ...

);


CREATE TEMPORARY TABLE sink_table(
  username varchar,
  click_url varchar,
  eventtime varchar
) with (
  'connector'='xxx',
  ...
);
insert into sink_table select username,click_url,eventtime from source_table;

退订

2024-02-19 文章 曹明勤
退订

flink作业实时数据质量监控告警要如何实现?

2024-02-08 文章 casel.chen
我们在使用flink搭建实时数仓,想知道flink作业是如何做数据质量监控告警的?包括数据及时性、完整性、一致性、准确性等
调研了spark streaming有amazon deequ和apache 
griffin框架来实现,想知道flink作业有没有类似的DQC框架?最好是对原有作业无侵入或者少侵入。
如果没有的话,实时数据质量这块一般是如何实现的呢?
如果每个生产作业都要单独配置一个DQC作业是不是代价太高了?有没有通过metrics暴露数据质量信息的呢?


下面是deequ使用的示例,检查每个微批数据是否满足规则要求。我们也有类似的数据质量检查需求


VerificationSuite().onData(df)
  .addCheck(Check(CheckLevel.Error, "this a unit test")
.hasSize(_ == 5) // 判断数据量是否是5条
.isComplete("id") // 判断该列是否全部不为空
.isUnique("id") // 判断该字段是否是唯一
.isComplete("productName") // 判断该字段全部不为空
.isContainedIn("priority", Array("high", "low")) // 该字段仅仅包含这两个字段
.isNonNegative("numViews") //该字段不包含负数
.containsURL("description", _ >= 0.5) // 包含url的记录是否超过0.5
.hasApproxQuantile("numViews", 0.5, _ <= 10)
  )
  .run()

[ANNOUNCE] Apache flink-connector-kafka v3.1.0 released

2024-02-07 文章 Martijn Visser
The Apache Flink community is very happy to announce the release of
Apache flink-connector-kafka v3.1.0. This release is compatible with
Apache Flink 1.17 and 1.18.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353135

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Release Manager


flink cdc整库同步大小表造成数据倾斜问题

2024-02-06 文章 casel.chen
使用flink cdc 3.0 
yaml作业进行mysql到doris整库同步时发现有数据倾斜发生,大的TM要处理180G数据,小的TM只有30G数据,上游有的大表流量很大,而小表几乎没有流量,有什么办法可以避免发生数据倾斜问题么?

Re: Flink任务链接信息审计获取

2024-02-03 文章 Feng Jin
我理解应该是平台统一配置在 flink-conf.yaml  即可, 不需要用户单独配置相关参数.


Best,
Feng

On Sun, Feb 4, 2024 at 11:19 AM 阿华田  wrote:

> 看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难
>
>
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2024年02月2日 19:38,Feng Jin 写道:
> hi,
>
> 可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
> Source 和 Sink 拿到血缘信息。
>
> [1]
>
> https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java
>
> Best,
> Feng
>
>
> On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:
>
>
>
>
> 打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
> ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


回复: Flink任务链接信息审计获取

2024-02-03 文章 阿华田
看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月2日 19:38,Feng Jin 写道:
hi,

可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:



打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




回复: Flink任务链接信息审计获取

2024-02-03 文章 阿华田
好的 感谢 我研究一下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月2日 19:38,Feng Jin 写道:
hi,

可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:



打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




Re: Flink任务链接信息审计获取

2024-02-02 文章 Feng Jin
hi,

可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:

>
>
> 打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
> ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Flink任务链接信息审计获取

2024-02-02 文章 阿华田


打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
 ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
我试了下,当我显示的设置env.setRuntimeMode(RuntimeExecutionMode.BATCH); 
就不会进行checkpoint了,否则是可以。

> 2024年2月2日 17:20,ha.fen...@aisino.com 写道:
> 
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>env.setStateBackend(new FsStateBackend("file:\\d:\\abc"));
> 
> 
> 发件人: jinzhuguang
> 发送时间: 2024-02-02 17:16
> 收件人: user-zh
> 主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
> 你是在batch模式下手动开启了checkpoint吗
> 
>> 2024年2月2日 17:11,ha.fen...@aisino.com 写道:
>> 
>> 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。
>> 
>> 发件人: jinzhuguang
>> 发送时间: 2024-02-02 16:47
>> 收件人: user-zh
>> 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
>> Flink 1.16.0
>> 
>> 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。
>> 
>> interface StatefulSinkWriter extends 
>> SinkWriter {
>>   /**
>>* @return The writer's state.
>>* @throws IOException if fail to snapshot writer's state.
>>*/
>>   List snapshotState(long checkpointId) throws IOException;
>>   }
>> 
>> 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。
>> 
>> 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?
>> 
>> 恳请各位大佬赐教
> 



Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 tanjialiang
按我的理解,streaming模式去读是允许checkpoint的(具体看各个connector的checkpoint逻辑),batch模式是一个阶段一个阶段的跑的,上一个task跑完的结果会放到磁盘等待下一个task拉取,task失败了就重新拉取上一个task的结果重新跑(没有看源码里具体实现细节,纯属个人的猜测,有懂行的大佬们可以纠正)


 回复的原邮件 
| 发件人 | ha.fen...@aisino.com |
| 发送日期 | 2024年2月2日 17:21 |
| 收件人 | user-zh |
| 主题 | Re: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job 
restart的情况避免从头读取数据 |
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("file:\\d:\\abc"));


发件人: jinzhuguang
发送时间: 2024-02-02 17:16
收件人: user-zh
主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
你是在batch模式下手动开启了checkpoint吗

2024年2月2日 17:11,ha.fen...@aisino.com 写道:

今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。

发件人: jinzhuguang
发送时间: 2024-02-02 16:47
收件人: user-zh
主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
Flink 1.16.0

我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。

interface StatefulSinkWriter extends SinkWriter {
/**
* @return The writer's state.
* @throws IOException if fail to snapshot writer's state.
*/
List snapshotState(long checkpointId) throws IOException;
}

然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。

如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?

恳请各位大佬赐教



Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
你是在batch模式下手动开启了checkpoint吗

> 2024年2月2日 17:11,ha.fen...@aisino.com 写道:
> 
> 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。
> 
> 发件人: jinzhuguang
> 发送时间: 2024-02-02 16:47
> 收件人: user-zh
> 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
> Flink 1.16.0
> 
> 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。
> 
> interface StatefulSinkWriter extends SinkWriter 
> {
>/**
> * @return The writer's state.
> * @throws IOException if fail to snapshot writer's state.
> */
>List snapshotState(long checkpointId) throws IOException;
>}
> 
> 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。
> 
> 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?
> 
> 恳请各位大佬赐教



Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
Flink 1.16.0

我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。

interface StatefulSinkWriter extends SinkWriter {
/**
 * @return The writer's state.
 * @throws IOException if fail to snapshot writer's state.
 */
List snapshotState(long checkpointId) throws IOException;
}

然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。

如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?

恳请各位大佬赐教


[ANNOUNCE] Community over Code EU 2024 Travel Assistance Applications now open!

2024-01-27 文章 Martijn Visser
Hi everyone,

The Apache Software Foundation is organizing another Community over Code
event, where a wide variety of speakers will be speaking. You can find all
the details at https://eu.communityovercode.org/

Within the ASF, there is a so-called Travel Assistance Committee (TAC).
This committee exists to help those that would like to attend Community
over Code events, but are unable to do so for financial reasons. I'm hoping
that we'll have a wide variety of Flink community members over there!

All the details and more information can be found in the message below.

Best regards,

Martijn Visser

-- Forwarded message -
From: Christofer Dutz 
Date: Sat, Jan 27, 2024 at 5:31 AM
Subject: Community over Code EU 2024 Travel Assistance Applications now
open!


Hi @,

The Travel Assistance Committee (TAC) are pleased to announce that
travel assistance applications for Community over Code EU 2024 are now
open!

We will be supporting Community over Code EU 2024, Bratislava,
Slovakia, June 3th - 5th, 2024.

TAC exists to help those that would like to attend Community over Code
events, but are unable to do so for financial reasons. For more info
on this years applications and qualifying criteria, please visit the
TAC website at < https://tac.apache.org/ >. Applications are already
open on https://tac-apply.apache.org/, so don't delay!

The Apache Travel Assistance Committee will only be accepting
applications from those people that are able to attend the full event.

Important: Applications close on Friday, March 1st, 2024.

Applicants have until the the closing date above to submit their
applications (which should contain as much supporting material as
required to efficiently and accurately process their request), this
will enable TAC to announce successful applications shortly
afterwards.

As usual, TAC expects to deal with a range of applications from a
diverse range of backgrounds; therefore, we encourage (as always)
anyone thinking about sending in an application to do so ASAP.

We look forward to greeting many of you in Bratislava, Slovakia in June,
2024!

Kind Regards,

Chris

(On behalf of the Travel Assistance Committee)

When replying, please reply to travel-assista...@apache.org


Re:回复: flink ui 算子数据展示一直loading...

2024-01-25 文章 Xuyang
Hi, 
手动curl 有问题的metric的接口,出来的数据正常吗? JM log里有发现什么异常么?



--

Best!
Xuyang





在 2024-01-26 11:51:53,"阿华田"  写道:
>这个维度都排查了  都正常
>
>
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2024年01月23日 21:57,Feng Jin 写道:
>可以尝试着下面几种方式确认下原因:
>
>
>1.
>
>打开浏览器开发者模式,看是否因为请求某个接口卡住
>2.
>
>查看下 JobManager 的 GC 情况,是否频繁 FullGC
>3.
>
>查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.
>
>
>Best,
>Feng
>
>
>On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:
>
>
>
>如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
>阿华田
>a15733178...@163.com
>
>
>签名由 网易邮箱大师  定制
>
>


回复: flink ui 算子数据展示一直loading...

2024-01-25 文章 阿华田
这个维度都排查了  都正常


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年01月23日 21:57,Feng Jin 写道:
可以尝试着下面几种方式确认下原因:


1.

打开浏览器开发者模式,看是否因为请求某个接口卡住
2.

查看下 JobManager 的 GC 情况,是否频繁 FullGC
3.

查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.


Best,
Feng


On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:



如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
阿华田
a15733178...@163.com


签名由 网易邮箱大师  定制




Re: [ANNOUNCE] Apache Flink 1.18.1 released

2024-01-25 文章 Jing Ge
Hi folks,

The bug has been fixed and PR at docker-library/official-images has been
merged. The official images are available now.

Best regards,
Jing

On Mon, Jan 22, 2024 at 11:39 AM Jing Ge  wrote:

> Hi folks,
>
> I am still working on the official images because of the issue
> https://issues.apache.org/jira/browse/FLINK-34165. Images under
> apache/flink are available.
>
> Best regards,
> Jing
>
> On Sun, Jan 21, 2024 at 11:06 PM Jing Ge  wrote:
>
>> Thanks Leonard for the feedback! Also thanks @Jark Wu  
>> @Chesnay
>> Schepler  and each and everyone who worked closely
>> with me for this release. We made it together!
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jan 21, 2024 at 9:25 AM Leonard Xu  wrote:
>>
>>> Thanks Jing for driving the release, nice work!
>>>
>>> Thanks all who involved this release!
>>>
>>> Best,
>>> Leonard
>>>
>>> > 2024年1月20日 上午12:01,Jing Ge  写道:
>>> >
>>> > The Apache Flink community is very happy to announce the release of
>>> Apache
>>> > Flink 1.18.1, which is the first bugfix release for the Apache Flink
>>> 1.18
>>> > series.
>>> >
>>> > Apache Flink® is an open-source stream processing framework for
>>> > distributed, high-performing, always-available, and accurate data
>>> streaming
>>> > applications.
>>> >
>>> > The release is available for download at:
>>> > https://flink.apache.org/downloads.html
>>> >
>>> > Please check out the release blog post for an overview of the
>>> improvements
>>> > for this bugfix release:
>>> >
>>> https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/
>>> >
>>> > Please note: Users that have state compression should not migrate to
>>> 1.18.1
>>> > (nor 1.18.0) due to a critical bug that could lead to data loss. Please
>>> > refer to FLINK-34063 for more information.
>>> >
>>> > The full release notes are available in Jira:
>>> >
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353640
>>> >
>>> > We would like to thank all contributors of the Apache Flink community
>>> who
>>> > made this release possible! Special thanks to @Qingsheng Ren @Leonard
>>> Xu
>>> > @Xintong Song @Matthias Pohl @Martijn Visser for the support during
>>> this
>>> > release.
>>> >
>>> > A Jira task series based on the Flink release wiki has been created for
>>> > 1.18.1 release. Tasks that need to be done by PMC have been explicitly
>>> > created separately. It will be convenient for the release manager to
>>> reach
>>> > out to PMC for those tasks. Any future patch release could consider
>>> cloning
>>> > it and follow the standard release process.
>>> > https://issues.apache.org/jira/browse/FLINK-33824
>>> >
>>> > Feel free to reach out to the release managers (or respond to this
>>> thread)
>>> > with feedback on the release process. Our goal is to constantly
>>> improve the
>>> > release process. Feedback on what could be improved or things that
>>> didn't
>>> > go so well are appreciated.
>>> >
>>> > Regards,
>>> > Jing
>>>
>>>


Re: 实时数仓场景落地问题

2024-01-23 文章 xiaohui zhang
实时数仓落地建议先动手做一两个场景真实应用起来,见过好几个项目一开始目标定得过大,实时数仓、流批一体、数据管控啥的都规划进去,结果项目陷入无尽的扯皮,架构设计也如空中楼阁。
实践过程中不要太过于向已有数仓分层模型靠拢,从源系统直接拼接宽表到dws层就足以应付大部分需求了。下游应用再用MPP来满足业务层的实时聚合、BI需求。
等立了几个烟囱,自己项目的实时数仓怎么做也基本有了思路


Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-23 文章 yuanfeng hu


> 2024年1月18日 14:59,fufu  写道:
> 
> 看hdfs上shard文件比chk-xxx要大很多。
> 
> 
> 
> 在 2024-01-18 14:49:14,"fufu"  写道:
> 
> 是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
> UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
> 
> 在 2024-01-18 10:56:51,"Zakelly Lan"  写道:
> 
>> 你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>> TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>> 
>> On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>> 
>>> 
>>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>>> 请社区指导下,或者有没有别的解决方案?感谢社区!
Manifest  
文件是会一直增大的,flink没有提供参数给你设置,如果需要设置的话需要实现ConfigurableRocksDBOptionsFactory,在我们的实践中合理设置manifest大小是对checkpoint大小有作用的

关于 flink Async io checkpoint restore

2024-01-23 文章 zhhui yan
HI All
flink 1.18.0 jdk 17 使用异步IO 失败后无法恢复,一直报序列化问题;
我调整使用 string 类型和bytes 都不能够恢复
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(
DefaultOperatorStateBackendBuilder.java:88)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: Corrupt stream, found tag: 93
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
.deserialize(StreamElementSerializer.java:201)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
.deserialize(StreamElementSerializer.java:43)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation
.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:231)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(
OperatorStateRestoreOperation.java:201)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(
DefaultOperatorStateBackendBuilder.java:85)
... 17 more
-- 
best with you!
zhhuiyan


Re: flink ui 算子数据展示一直loading...

2024-01-23 文章 Feng Jin
可以尝试着下面几种方式确认下原因:


   1.

   打开浏览器开发者模式,看是否因为请求某个接口卡住
   2.

   查看下 JobManager 的 GC 情况,是否频繁 FullGC
   3.

   查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.


Best,
Feng


On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:

>
>
> 如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
> 阿华田
> a15733178...@163.com
>
> 
> 签名由 网易邮箱大师  定制
>
>


flink ui 算子数据展示一直loading...

2024-01-23 文章 阿华田


如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



Re: [ANNOUNCE] Apache Flink 1.18.1 released

2024-01-22 文章 Jing Ge
Hi folks,

I am still working on the official images because of the issue
https://issues.apache.org/jira/browse/FLINK-34165. Images under
apache/flink are
available.

Best regards,
Jing

On Sun, Jan 21, 2024 at 11:06 PM Jing Ge  wrote:

> Thanks Leonard for the feedback! Also thanks @Jark Wu  
> @Chesnay
> Schepler  and each and everyone who worked closely
> with me for this release. We made it together!
>
> Best regards,
> Jing
>
> On Sun, Jan 21, 2024 at 9:25 AM Leonard Xu  wrote:
>
>> Thanks Jing for driving the release, nice work!
>>
>> Thanks all who involved this release!
>>
>> Best,
>> Leonard
>>
>> > 2024年1月20日 上午12:01,Jing Ge  写道:
>> >
>> > The Apache Flink community is very happy to announce the release of
>> Apache
>> > Flink 1.18.1, which is the first bugfix release for the Apache Flink
>> 1.18
>> > series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> improvements
>> > for this bugfix release:
>> >
>> https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/
>> >
>> > Please note: Users that have state compression should not migrate to
>> 1.18.1
>> > (nor 1.18.0) due to a critical bug that could lead to data loss. Please
>> > refer to FLINK-34063 for more information.
>> >
>> > The full release notes are available in Jira:
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353640
>> >
>> > We would like to thank all contributors of the Apache Flink community
>> who
>> > made this release possible! Special thanks to @Qingsheng Ren @Leonard Xu
>> > @Xintong Song @Matthias Pohl @Martijn Visser for the support during this
>> > release.
>> >
>> > A Jira task series based on the Flink release wiki has been created for
>> > 1.18.1 release. Tasks that need to be done by PMC have been explicitly
>> > created separately. It will be convenient for the release manager to
>> reach
>> > out to PMC for those tasks. Any future patch release could consider
>> cloning
>> > it and follow the standard release process.
>> > https://issues.apache.org/jira/browse/FLINK-33824
>> >
>> > Feel free to reach out to the release managers (or respond to this
>> thread)
>> > with feedback on the release process. Our goal is to constantly improve
>> the
>> > release process. Feedback on what could be improved or things that
>> didn't
>> > go so well are appreciated.
>> >
>> > Regards,
>> > Jing
>>
>>


RE: Re:RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-22 文章 Jiabao Sun
Hi,

ResumeToken[1] can be considered globally unique[2].

Best,
Jiabao

[1] https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
[2] 
https://img.alicdn.com/imgextra/i4/O1CN010e81SP1vkgoyL0nhd_!!66211-0-tps-2468-1360.jpg

On 2024/01/22 09:36:42 "casel.chen" wrote:
> 
> 
> 
> V1版本依赖于DebeziumSourceFunction,后者依赖于DebeziumEngine产生changelog
> V2版本虽然依赖了 flink-connector-debezium 但没有用到debezium内部类
> 
> 
> 另外有一个问题:mongodb change stream断点续传用的resumeToken是像mysql binlog offset一样全局唯一么?
> 如果数据源是像kafka一样每个分片有binlog offset的话,
> 是不是要在对应xxxOffset类中要定义一个Map类型的offsetField 
> (类似mongodb对应ChangeStreamOffset中的resumeTokenField)? 
> 当前mongodb中定义的是Json String类型
> 
> 在 2024-01-22 11:03:55,"Jiabao Sun"  写道:
> >Hi,
> >
> >Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
> >Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。
> >
> >Best,
> >Jiabao
> >
> >[1] https://github.com/mongodb/mongo-kafka
> >
> >
> >On 2024/01/22 02:57:38 "casel.chen" wrote:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> Flink CDC MongoDB connector 还是基于debezium实现的
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
> >> >Hi,
> >> >
> >> >可以参考 Flink CDC MongoDB connector 的实现。
> >> >
> >> >Best,
> >> >Jiabao
> >> >
> >> >
> >> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> >> >> 3.x自行开发,查了一下现有大部分flink cdc source 
> >> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
> >> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> >> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
> >> 
> 

Re:RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-22 文章 casel.chen



V1版本依赖于DebeziumSourceFunction,后者依赖于DebeziumEngine产生changelog
V2版本虽然依赖了 flink-connector-debezium 但没有用到debezium内部类


另外有一个问题:mongodb change stream断点续传用的resumeToken是像mysql binlog offset一样全局唯一么?
如果数据源是像kafka一样每个分片有binlog offset的话,
是不是要在对应xxxOffset类中要定义一个Map类型的offsetField 
(类似mongodb对应ChangeStreamOffset中的resumeTokenField)? 
当前mongodb中定义的是Json String类型

在 2024-01-22 11:03:55,"Jiabao Sun"  写道:
>Hi,
>
>Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
>Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。
>
>Best,
>Jiabao
>
>[1] https://github.com/mongodb/mongo-kafka
>
>
>On 2024/01/22 02:57:38 "casel.chen" wrote:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Flink CDC MongoDB connector 还是基于debezium实现的
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
>> >Hi,
>> >
>> >可以参考 Flink CDC MongoDB connector 的实现。
>> >
>> >Best,
>> >Jiabao
>> >
>> >
>> >On 2024/01/22 02:06:37 "casel.chen" wrote:
>> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
>> >> 3.x自行开发,查了一下现有大部分flink cdc source 
>> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
>> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
>> 


Re:RE: Re:RE: binlog文件丢失问题

2024-01-21 文章 wyk



您好:
  我确认我们两台mysql备库都开启了gtid选项,并且该问题我们进行了复现,复现步骤如下:
flink版本 1.14.5
flink-connector-mysql-cdc版本  2.2.0
mysql版本 5.6.0


1.准备两台备库,并且binlog文件名相差很远没有交集
2.采集第一台备库,等待数据正常写入后,停止该cdc采集任务,正常保存savepoint
3.修改采集mysql的配置信息为备库2,并且将flink任务正常从savepoint启动,就会出现上述反馈的问题
















在 2024-01-19 20:36:10,"Jiabao Sun"  写道:
>Hi,
>
>日志中有包含 GTID 的内容吗?
>用 SHOW VARIABLES LIKE 'gtid_mode’; 确认下是否开启了GTID呢?
>
>Best,
>Jiabao
>
>
>On 2024/01/19 09:36:38 wyk wrote:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 抱歉,具体报错和代码如下:
>> 
>> 
>> 报错部分:
>> Caused by: java.lang.IllegalStateException: The connector is trying to read 
>> binlog starting at 
>> Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1705645599953,db=,server_id=0,file=mysql_bin.007132,pos=729790304,row=0},
>>  but this is no longer available on the server. Reconfigure the connector to 
>> use a snapshot when needed.
>> at 
>> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
>> at 
>> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:112)
>> at 
>> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
>> at 
>> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
>> at 
>> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:170)
>> at 
>> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:75)
>> at 
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at 
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> ... 6 more
>> 
>> 
>> 
>> 
>> 代码部分: 
>> if (!isBinlogAvailable(mySqlOffsetContext)) {
>> throw new IllegalStateException(
>> "The connector is trying to read binlog starting at "
>> + mySqlOffsetContext.getSourceInfo()
>> + ", but this is no longer "
>> + "available on the server. Reconfigure the connector to 
>> use a snapshot when needed.");
>> }
>> 
>> 在 2024-01-19 17:33:03,"Jiabao Sun"  写道:
>> >Hi,
>> >
>> >你的图挂了,可以贴一下图床链接或者直接贴一下代码。
>> >
>> >Best,
>> >Jiabao
>> >
>> >
>> >On 2024/01/19 09:16:55 wyk wrote:
>> >> 
>> >> 
>> >> 各位大佬好:
>> >> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
>> >> 
>> >> 
>> >> 问题描述:
>> >> 场景: 公司mysql有两个备库: 备库1和备库2。
>> >> 1. 现在备库1需要下线,需要将任务迁移至备库2
>> >> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
>> >> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
>> >> 
>> >> 
>> >> 注意: 备库一个备库二的gtid是保持一致的
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 图一:
>> >> 
>> >> 
>> >> 图二:
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> 


RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 Jiabao Sun
Hi,

Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。

Best,
Jiabao

[1] https://github.com/mongodb/mongo-kafka


On 2024/01/22 02:57:38 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Flink CDC MongoDB connector 还是基于debezium实现的
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
> >Hi,
> >
> >可以参考 Flink CDC MongoDB connector 的实现。
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> >> 3.x自行开发,查了一下现有大部分flink cdc source 
> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
> 

Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 casel.chen









Flink CDC MongoDB connector 还是基于debezium实现的








在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
>Hi,
>
>可以参考 Flink CDC MongoDB connector 的实现。
>
>Best,
>Jiabao
>
>
>On 2024/01/22 02:06:37 "casel.chen" wrote:
>> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
>> 3.x自行开发,查了一下现有大部分flink cdc source 
>> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
>> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!


RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 Jiabao Sun
Hi,

可以参考 Flink CDC MongoDB connector 的实现。

Best,
Jiabao


On 2024/01/22 02:06:37 "casel.chen" wrote:
> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> 3.x自行开发,查了一下现有大部分flink cdc source 
> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!

如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 casel.chen
现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
3.x自行开发,查了一下现有大部分flink cdc source 
connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
 snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!

Re: [ANNOUNCE] Apache Flink 1.18.1 released

2024-01-21 文章 Jing Ge
Thanks Leonard for the feedback! Also thanks @Jark Wu
 @Chesnay
Schepler  and each and everyone who worked closely with
me for this release. We made it together!

Best regards,
Jing

On Sun, Jan 21, 2024 at 9:25 AM Leonard Xu  wrote:

> Thanks Jing for driving the release, nice work!
>
> Thanks all who involved this release!
>
> Best,
> Leonard
>
> > 2024年1月20日 上午12:01,Jing Ge  写道:
> >
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink 1.18.1, which is the first bugfix release for the Apache Flink 1.18
> > series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements
> > for this bugfix release:
> >
> https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/
> >
> > Please note: Users that have state compression should not migrate to
> 1.18.1
> > (nor 1.18.0) due to a critical bug that could lead to data loss. Please
> > refer to FLINK-34063 for more information.
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353640
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible! Special thanks to @Qingsheng Ren @Leonard Xu
> > @Xintong Song @Matthias Pohl @Martijn Visser for the support during this
> > release.
> >
> > A Jira task series based on the Flink release wiki has been created for
> > 1.18.1 release. Tasks that need to be done by PMC have been explicitly
> > created separately. It will be convenient for the release manager to
> reach
> > out to PMC for those tasks. Any future patch release could consider
> cloning
> > it and follow the standard release process.
> > https://issues.apache.org/jira/browse/FLINK-33824
> >
> > Feel free to reach out to the release managers (or respond to this
> thread)
> > with feedback on the release process. Our goal is to constantly improve
> the
> > release process. Feedback on what could be improved or things that didn't
> > go so well are appreciated.
> >
> > Regards,
> > Jing
>
>


Re: [ANNOUNCE] Apache Flink 1.18.1 released

2024-01-21 文章 Leonard Xu
Thanks Jing for driving the release, nice work!

Thanks all who involved this release!

Best,
Leonard

> 2024年1月20日 上午12:01,Jing Ge  写道:
> 
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.18.1, which is the first bugfix release for the Apache Flink 1.18
> series.
> 
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/
> 
> Please note: Users that have state compression should not migrate to 1.18.1
> (nor 1.18.0) due to a critical bug that could lead to data loss. Please
> refer to FLINK-34063 for more information.
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353640
> 
> We would like to thank all contributors of the Apache Flink community who
> made this release possible! Special thanks to @Qingsheng Ren @Leonard Xu
> @Xintong Song @Matthias Pohl @Martijn Visser for the support during this
> release.
> 
> A Jira task series based on the Flink release wiki has been created for
> 1.18.1 release. Tasks that need to be done by PMC have been explicitly
> created separately. It will be convenient for the release manager to reach
> out to PMC for those tasks. Any future patch release could consider cloning
> it and follow the standard release process.
> https://issues.apache.org/jira/browse/FLINK-33824
> 
> Feel free to reach out to the release managers (or respond to this thread)
> with feedback on the release process. Our goal is to constantly improve the
> release process. Feedback on what could be improved or things that didn't
> go so well are appreciated.
> 
> Regards,
> Jing



[ANNOUNCE] Apache Flink 1.18.1 released

2024-01-19 文章 Jing Ge
The Apache Flink community is very happy to announce the release of Apache
Flink 1.18.1, which is the first bugfix release for the Apache Flink 1.18
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/

Please note: Users that have state compression should not migrate to 1.18.1
(nor 1.18.0) due to a critical bug that could lead to data loss. Please
refer to FLINK-34063 for more information.

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353640

We would like to thank all contributors of the Apache Flink community who
made this release possible! Special thanks to @Qingsheng Ren @Leonard Xu
 @Xintong Song @Matthias Pohl @Martijn Visser for the support during this
release.

A Jira task series based on the Flink release wiki has been created for
1.18.1 release. Tasks that need to be done by PMC have been explicitly
created separately. It will be convenient for the release manager to reach
out to PMC for those tasks. Any future patch release could consider cloning
it and follow the standard release process.
https://issues.apache.org/jira/browse/FLINK-33824

Feel free to reach out to the release managers (or respond to this thread)
with feedback on the release process. Our goal is to constantly improve the
release process. Feedback on what could be improved or things that didn't
go so well are appreciated.

Regards,
Jing


RE: Re:RE: binlog文件丢失问题

2024-01-19 文章 Jiabao Sun
Hi,

日志中有包含 GTID 的内容吗?
用 SHOW VARIABLES LIKE 'gtid_mode’; 确认下是否开启了GTID呢?

Best,
Jiabao


On 2024/01/19 09:36:38 wyk wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 抱歉,具体报错和代码如下:
> 
> 
> 报错部分:
> Caused by: java.lang.IllegalStateException: The connector is trying to read 
> binlog starting at 
> Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1705645599953,db=,server_id=0,file=mysql_bin.007132,pos=729790304,row=0},
>  but this is no longer available on the server. Reconfigure the connector to 
> use a snapshot when needed.
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:112)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:170)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:75)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> 
> 
> 
> 
> 代码部分: 
> if (!isBinlogAvailable(mySqlOffsetContext)) {
> throw new IllegalStateException(
> "The connector is trying to read binlog starting at "
> + mySqlOffsetContext.getSourceInfo()
> + ", but this is no longer "
> + "available on the server. Reconfigure the connector to 
> use a snapshot when needed.");
> }
> 
> 在 2024-01-19 17:33:03,"Jiabao Sun"  写道:
> >Hi,
> >
> >你的图挂了,可以贴一下图床链接或者直接贴一下代码。
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/19 09:16:55 wyk wrote:
> >> 
> >> 
> >> 各位大佬好:
> >> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
> >> 
> >> 
> >> 问题描述:
> >> 场景: 公司mysql有两个备库: 备库1和备库2。
> >> 1. 现在备库1需要下线,需要将任务迁移至备库2
> >> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
> >> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
> >> 
> >> 
> >> 
> >> 
> >> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
> >> 
> >> 
> >> 注意: 备库一个备库二的gtid是保持一致的
> >> 
> >> 
> >> 
> >> 
> >> 图一:
> >> 
> >> 
> >> 图二:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> 

Re:RE: binlog文件丢失问题

2024-01-19 文章 wyk









抱歉,具体报错和代码如下:


报错部分:
Caused by: java.lang.IllegalStateException: The connector is trying to read 
binlog starting at 
Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1705645599953,db=,server_id=0,file=mysql_bin.007132,pos=729790304,row=0},
 but this is no longer available on the server. Reconfigure the connector to 
use a snapshot when needed.
at 
com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
at 
com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:112)
at 
com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
at 
com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:170)
at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:75)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more




代码部分: 
if (!isBinlogAvailable(mySqlOffsetContext)) {
throw new IllegalStateException(
"The connector is trying to read binlog starting at "
+ mySqlOffsetContext.getSourceInfo()
+ ", but this is no longer "
+ "available on the server. Reconfigure the connector to 
use a snapshot when needed.");
}

在 2024-01-19 17:33:03,"Jiabao Sun"  写道:
>Hi,
>
>你的图挂了,可以贴一下图床链接或者直接贴一下代码。
>
>Best,
>Jiabao
>
>
>On 2024/01/19 09:16:55 wyk wrote:
>> 
>> 
>> 各位大佬好:
>> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
>> 
>> 
>> 问题描述:
>> 场景: 公司mysql有两个备库: 备库1和备库2。
>> 1. 现在备库1需要下线,需要将任务迁移至备库2
>> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
>> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
>> 
>> 
>> 
>> 
>> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
>> 
>> 
>> 注意: 备库一个备库二的gtid是保持一致的
>> 
>> 
>> 
>> 
>> 图一:
>> 
>> 
>> 图二:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 


RE: binlog文件丢失问题

2024-01-19 文章 Jiabao Sun
Hi,

你的图挂了,可以贴一下图床链接或者直接贴一下代码。

Best,
Jiabao


On 2024/01/19 09:16:55 wyk wrote:
> 
> 
> 各位大佬好:
> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
> 
> 
> 问题描述:
> 场景: 公司mysql有两个备库: 备库1和备库2。
> 1. 现在备库1需要下线,需要将任务迁移至备库2
> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
> 
> 
> 
> 
> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
> 
> 
> 注意: 备库一个备库二的gtid是保持一致的
> 
> 
> 
> 
> 图一:
> 
> 
> 图二:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 

binlog文件丢失问题

2024-01-19 文章 wyk


各位大佬好:
现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:


问题描述:
场景: 公司mysql有两个备库: 备库1和备库2。
1. 现在备库1需要下线,需要将任务迁移至备库2
2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如附件内图一
3.我根据报错找到对应代码(如附件内图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常




疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢


注意: 备库一个备库二的gtid是保持一致的

















binlog文件丢失问题

2024-01-19 文章 wyk


各位大佬好:
现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:


问题描述:
场景: 公司mysql有两个备库: 备库1和备库2。
1. 现在备库1需要下线,需要将任务迁移至备库2
2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常




疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢


注意: 备库一个备库二的gtid是保持一致的




图一:


图二:





















RE: Re:RE: RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

oracle cdc connector 已经接入增量快照读框架,动态加表也是可以统一去实现的。
可以去社区创建issue,也欢迎直接贡献。

Best,
Jiabao


On 2024/01/19 04:46:21 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> 想知道oracle cdc connector不支持动态加表的原因是什么?可否自己扩展实现呢?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2024-01-19 11:53:49,"Jiabao Sun"  写道:
> >Hi,
> >
> >Oracle CDC connector[1] 目前是不支持动态加表的。
> >
> >Best,
> >Jiabao
> >
> >[1] 
> >https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
> >
> >
> >On 2024/01/19 03:37:41 Jiabao Sun wrote:
> >> Hi,
> >> 
> >> 请提供一下 flink cdc 的版本,使用的什么连接器。
> >> 如果方便的话,也请提供一下日志。
> >> 另外,table 的正则表达式可以匹配到新增的表吗?
> >> 
> >> Best,
> >> Jiabao
> >> 
> >> [1] 
> >> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> >> 
> >> On 2024/01/19 03:27:22 王凯 wrote:
> >> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> >> > 参数,当从savepoint重启时,新添加的表的数据不能同步
> >> > 
> >> > 
> >> > 王凯
> >> > 2813732...@qq.com
> >> > 
> >> > 
> >> > 
> >> > 
> 

Re:RE: RE: flink cdc动态加表不生效

2024-01-18 文章 casel.chen






想知道oracle cdc connector不支持动态加表的原因是什么?可否自己扩展实现呢?











在 2024-01-19 11:53:49,"Jiabao Sun"  写道:
>Hi,
>
>Oracle CDC connector[1] 目前是不支持动态加表的。
>
>Best,
>Jiabao
>
>[1] 
>https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
>
>
>On 2024/01/19 03:37:41 Jiabao Sun wrote:
>> Hi,
>> 
>> 请提供一下 flink cdc 的版本,使用的什么连接器。
>> 如果方便的话,也请提供一下日志。
>> 另外,table 的正则表达式可以匹配到新增的表吗?
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
>> 
>> On 2024/01/19 03:27:22 王凯 wrote:
>> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
>> > 参数,当从savepoint重启时,新添加的表的数据不能同步
>> > 
>> > 
>> > 王凯
>> > 2813732...@qq.com
>> > 
>> > 
>> > 
>> > 


RE: 退订

2024-01-18 文章 Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from u...@flink.apache.org, and you can refer [1][2]
for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 地址来取消订阅来自
u...@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/19 03:39:52 李乐 wrote:
> 退订

RE: RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

Oracle CDC connector[1] 目前是不支持动态加表的。

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html


On 2024/01/19 03:37:41 Jiabao Sun wrote:
> Hi,
> 
> 请提供一下 flink cdc 的版本,使用的什么连接器。
> 如果方便的话,也请提供一下日志。
> 另外,table 的正则表达式可以匹配到新增的表吗?
> 
> Best,
> Jiabao
> 
> [1] 
> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> 
> On 2024/01/19 03:27:22 王凯 wrote:
> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> > 参数,当从savepoint重启时,新添加的表的数据不能同步
> > 
> > 
> > 王凯
> > 2813732...@qq.com
> > 
> > 
> > 
> > 

退订

2024-01-18 文章 李乐
退订

RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

请提供一下 flink cdc 的版本,使用的什么连接器。
如果方便的话,也请提供一下日志。
另外,table 的正则表达式可以匹配到新增的表吗?

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15

On 2024/01/19 03:27:22 王凯 wrote:
> 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> 参数,当从savepoint重启时,新添加的表的数据不能同步
> 
> 
> 王凯
> 2813732...@qq.com
> 
> 
> 
> 

Re: Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 Zakelly Lan
图挂了看不到,不然你把文字信息简单复制下来看看?
另外你的ProcessWindowFunction里是否会访问state,如果访问了,是否实现了clear方法?

On Thu, Jan 18, 2024 at 3:01 PM fufu  wrote:

> 看hdfs上shard文件比chk-xxx要大很多。
>
>
>
> 在 2024-01-18 14:49:14,"fufu"  写道:
>
> 是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink
> UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
>
> 在 2024-01-18 10:56:51,"Zakelly Lan"  写道:
>
> >你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
> >TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
> >
> >On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
> >
> >>
> >>
> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
> >> https://blog.csdn.net/RL_LEEE/article/details/123864487
> ,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
> >> 请社区指导下,或者有没有别的解决方案?感谢社区!
>


Re:Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
看hdfs上shard文件比chk-xxx要大很多。



在 2024-01-18 14:49:14,"fufu"  写道:

是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~

在 2024-01-18 10:56:51,"Zakelly Lan"  写道:

>你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>
>On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>
>>
>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>> 请社区指导下,或者有没有别的解决方案?感谢社区!


Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~

在 2024-01-18 10:56:51,"Zakelly Lan"  写道:

>你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>
>On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>
>>
>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>> 请社区指导下,或者有没有别的解决方案?感谢社区!


Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 Zakelly Lan
你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大

On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:

>
> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
> 请社区指导下,或者有没有别的解决方案?感谢社区!


Re: flink cdc 读取数据后面可以跟窗口函数吗

2024-01-17 文章 Hang Ruan
你好,

CDC Source 目前不支持窗口函数。

不过可以考虑通过非窗口聚合的方式实现类似的效果。具体方法为:

   1.

   使用DATE_FORMAT函数,将时间字段转换成分钟粒度的字符串,作为窗口值。
   2.

   根据窗口值进行GROUP BY聚合。

Best,
Hang

Xuyang  于2024年1月17日周三 19:34写道:

> Hi,
> Flink SQL中可以用Group Window[1]的方式来读完cdc数据后加窗口。
> 可以具体描述一下“一直不生效”的现象和SQL么?
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#selecting-group-window-start-and-end-timestamps-1
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2024-01-17 19:24:03,"2813732510" <2813732...@qq.com.INVALID> 写道:
> >flinkcdc读取binlog数据后面可以开窗吗,测试滑动窗口,聚合,一直不生效,是有什么特别的用法嘛
>


Re:flink cdc 读取数据后面可以跟窗口函数吗

2024-01-17 文章 Xuyang
Hi, 
Flink SQL中可以用Group Window[1]的方式来读完cdc数据后加窗口。
可以具体描述一下“一直不生效”的现象和SQL么?



[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#selecting-group-window-start-and-end-timestamps-1




--

Best!
Xuyang





在 2024-01-17 19:24:03,"2813732510" <2813732...@qq.com.INVALID> 写道:
>flinkcdc读取binlog数据后面可以开窗吗,测试滑动窗口,聚合,一直不生效,是有什么特别的用法嘛


RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
 请社区指导下,或者有没有别的解决方案?感谢社区!

Re: 退订

2024-01-15 文章 Hang Ruan
请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自
 user-zh-unsubscr...@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。

Best,
Jiabao

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

yd c  于2024年1月15日周一 08:47写道:

> 退订


退订

2024-01-14 文章 yd c
退订

RE: 实时数仓场景落地问题

2024-01-14 文章 Jiabao Sun
Hi,

可以尝试使用 Flink CDC + Apache Paimon 去构建实时数仓。
目前 Paimon 已经支持使用 Flink CDC 将数据整库入湖,可以使用较小的成本进行实时入湖。
另外利用 Paimon partial update的特性,可以以较小的计算成本去构建 ADS 层宽表。
Paimon 也可以同时支持批式计算和流式计算,对于时效性和计算成本可以使用灵活的计算方式做平衡。

Best,
Jiabao


On 2024/01/14 12:57:29 海风 wrote:
> hello,公司里业务会拿一张t+1的离线数仓表名,经常是ads应用层的,问你可不可以做成实时表,大家有碰到这类需求嘛?我的理解现在虽然有实时数仓,或者流批一体这样探索,但是远没有到层级很深的ads层t+1离线表可能以较小的成本去实现实时化。
> 引申的问题是当前实时数仓已有较大规模的场景落地么?有哪些场景落地呢?落地的效果成本与效果大概是怎么样的呢?
> 
> 
> 

RE: 退订

2024-01-14 文章 Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org 
 if you want to
unsubscribe the mail from u...@flink.apache.org , 
and you can refer [1][2]
for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 
 地址来取消订阅来自
u...@flink.apache.org  邮件组的邮件,你可以参考[1][2] 
管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/14 03:17:44 王春顺 wrote:
> 
> 退订

实时数仓场景落地问题

2024-01-14 文章 海风
hello,公司里业务会拿一张t+1的离线数仓表名,经常是ads应用层的,问你可不可以做成实时表,大家有碰到这类需求嘛?我的理解现在虽然有实时数仓,或者流批一体这样探索,但是远没有到层级很深的ads层t+1离线表可能以较小的成本去实现实时化。
引申的问题是当前实时数仓已有较大规模的场景落地么?有哪些场景落地呢?落地的效果成本与效果大概是怎么样的呢?




退订

2024-01-13 文章 王春顺

退订

回复:Re:Flink1.16版本java.lang.OutOfMemoryError: Metaspace

2024-01-12 文章 Summer_Gu
@Override
public void close() throws Exception {
 RuntimeContext runtimeContext = getRuntimeContext();
 runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent("dad", () -> {
if (null != connection) {
try {
connection.close();
 } catch (SQLException e) {
throw new RuntimeException(e);
 }
 }
 Enumeration drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
 Driver driver = drivers.nextElement();
try {
 DriverManager.deregisterDriver(driver);
 } catch (SQLException e) {
throw new RuntimeException(e);
 }
 }
 AbandonedConnectionCleanupThread.uncheckedShutdown();
 });
}

--
发件人:Xuyang
日 期:2024年01月12日 16:36:01
收件人:
主 题:Re:Flink1.16版本java.lang.OutOfMemoryError: Metaspace

Hi, 你的图挂了,贴一下代码吧。



--

Best!
Xuyang




在 2024-01-12 16:23:13,"Summer_Gu"  写道:

版本号:1.16
部署方式: Standalone Cluster集群部署
问题:通过webUi提交任务后,立马使任务报错,重复提交,会导致Metaspace内存溢出,(红框部分)




我看见官网有这部分说明:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
但是我在代码中填入:
并不起作用,所以想问一下是否使用方式有问题?或者还有其他解决方案吗?




Re:Flink1.16版本java.lang.OutOfMemoryError: Metaspace

2024-01-12 文章 Xuyang
Hi, 你的图挂了,贴一下代码吧。



--

Best!
Xuyang




在 2024-01-12 16:23:13,"Summer_Gu"  写道:

版本号:1.16
部署方式: Standalone Cluster集群部署
问题:通过webUi提交任务后,立马使任务报错,重复提交,会导致Metaspace内存溢出,(红框部分)




我看见官网有这部分说明:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
但是我在代码中填入:
并不起作用,所以想问一下是否使用方式有问题?或者还有其他解决方案吗?



Flink1.16版本java.lang.OutOfMemoryError: Metaspace

2024-01-12 文章 Summer_Gu
版本号:1.16
部署方式: Standalone Cluster集群部署
问题:通过webUi提交任务后,立马使任务报错,重复提交,会导致Metaspace内存溢出,(红框部分)



我看见官网有这部分说明:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
但是我在代码中填入:
并不起作用,所以想问一下是否使用方式有问题?或者还有其他解决方案吗?



flinksql以时间函数过滤数据场景求助

2024-01-11 文章 张河川


flink版本1.18
场景如下:
A表字段:
id,update_time(date格式)
一条数据:
1,2023-01-12



现在我需要保留update_time+1年,大于当前日。


简单地写一个sql:
select 
id,update_time
from A
where TIMESTAMPADD(YEAR,1,update_time) > CURRENT_DATE;


结果:
在2024年1月11日这一天,where条件达成,这条数据不会被过滤掉;
在2024年1月12日,sql并不会触发计算来过滤掉此条数据。


在真实的场景中,update_time跨度很多年,且部分数据需要+1年,部分数据不用加,判断条件精确到日期,还有部分数据需要+3年,判断条件只用精确到年。
我该怎么做才能实时地根据CURRENT_DATE来触发过滤的计算呢?


辛苦各位大佬。
| |
张河川
|
|
milesian...@163.com
|

回复: flink-checkpoint 问题

2024-01-11 文章 吴先生
看现象是这样,谢了,我抽空看下这块源码


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月11日 16:33 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state CANCELED to JobManager for task
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
error while consuming partitions
java.nio.channels.ClosedChannelException: null
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes

Re: flink-checkpoint 问题

2024-01-11 文章 Zakelly Lan
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

> TM日志:
> 2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
> and sending final execution state CANCELED to JobManager for task
> ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
> e960208bbd95b1b219bafe4887b48392.
> 2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
> o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
> error while consuming partitions
> java.nio.channels.ClosedChannelException: null
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
>  at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  at java.lang.Thread.run(Thread.java:748)
>
>
> JM日志,没有25548的触发记录:
> 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 

回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state CANCELED to JobManager for task 
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0 
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR 
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error 
while consuming partitions
java.nio.channels.ClosedChannelException: null
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes in 
50128 ms).
2023-12-31 18:40:10.681 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for job 
d12f3c6e836f56fb23d96e31737ff0b3.
2023-12-31 18:50:10.681 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 25547 of 
job d12f3c6e836f56fb23d96e31737ff0b3 

Re:回复: flink-checkpoint 问题

2024-01-10 文章 Xuyang
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re:flink-checkpoint 问题

2024-01-10 文章 ouywl
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。


The following is the content of the forwarded email
From:"吴先生" <15951914...@163.com>
To:user-zh 
Date:2024-01-10 17:54:42
Subject:flink-checkpoint 问题

Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re: flink-checkpoint 问题

2024-01-10 文章 Zakelly Lan
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

> Flink版本: 1.12
> checkpoint配置:hdfs
>
> 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
>
>


flink-checkpoint 问题

2024-01-10 文章 吴先生
Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的



Re:Flink1.12读取不同topic, 认证信息不一样

2024-01-09 文章 Xuyang
Hi,
按照现在通用的设计应该是不行的。要么用两个comsumer读取后union;要么魔改下comsumer的代码,在真正数据拉取时用不同的aksk去读。




--

Best!
Xuyang





在 2024-01-09 14:49:35,"somebody someone" <1107807...@qq.com.INVALID> 写道:
>问题:目前使用Flink版本1.12
>需要接入01和02两个topic,属于同一集群,但是数据方给的两个topic的 
>jass的用户名username和密码password不一样,其他认证信息都一样,不想用两个Consumer去分别读取,
>怎么用同一个source 方式对接这种配置文件不一样的。
>
>这个上面也有人提出过,也没有想要的。
>https://stackoverflow.com/questions/38989443/flink-how-to-read-from-multiple-kafka-cluster-using-same-streamexecutionenviron
>
>
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>Properties properties = new Properties();
>properties.setProperty("bootstrap.servers", 
>"101.xxx.156.xxx:9097");
>properties.setProperty("group.id", "test_zj");
>properties.setProperty("security.protocol", 
>"SASL_SSL");
>properties.setProperty("sasl.mechanism", "PLAIN");
>properties.setProperty("sasl.jaas.config", 
>"org.apache.kafka.common.security.plain.PlainLoginModule required 
>username=\"xxx-9dc9-xxx\" password=\"-4CdNkCo$5b=xxx";");
>properties.setProperty("ssl.truststore.location", 
>"jks/client.truststore.jks");
>properties.setProperty("ssl.truststore.password", 
>"dmxxx");
>
>
>FlinkKafkaConsumerstringFlinkKafkaConsumer = new FlinkKafkaConsumer<(
>
>Arrays.asList("Topic01","topic02"),
>new 
>SimpleStringSchema(),
>properties
>);
>
>
>DataStreamSourceenv.addSource(stringFlinkKafkaConsumer);
>source.print();
>env.execute();
>
>
>
>
>
>
>
>
>
>
>somebodysomeone
>1107807...@qq.com
>
>
>
>


Re:flink1.18 regular join 支持两侧流各自配置state ttl

2024-01-05 文章 Xuyang
Hi,
文档中“The current TTL value for both left and right side is "0 ms", which 
means the state retention is not enabled.”,指的其实是并没有开启state 
ttl的意思,也就是并不会清理state、永久保留state,对应的是public involving 
api中的StateTTLConfig#UpdateType.Disabled[1],文档上的表述确实可以更加清晰一些,方便的话可以提一个jira 
improve一下文档。
另外,有点好奇为什么希望stateful的节点不保留state,可以举一个实际遇到的场景么?


[1] 
https://github.com/apache/flink/blob/4a852fee28f2d87529dc05f5ba2e79202a0e00b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java#L65C14-L65C14



--

Best!
Xuyang





在 2024-01-05 17:17:42,"Thomas Yang"  写道:
>本地测试发现 默认生成0ms,实际测试是两侧保留了永久状态,但是官方文档意思是0ms表示两侧都不保存状态. 是不是文档有错误?
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#idle-state-retention-time
>
>另外咨询下:  如果0表示永久保留state  那么想不保存state应该使用什么值?
>*谢谢!*
>
>
>*杨勇*


Re: flink1.18 regular join 支持两侧流各自配置state ttl

2024-01-05 文章 Thomas Yang
1
*杨勇*


Thomas Yang  于2024年1月5日周五 17:17写道:

> 本地测试发现 默认生成0ms,实际测试是两侧保留了永久状态,但是官方文档意思是0ms表示两侧都不保存状态. 是不是文档有错误?
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#idle-state-retention-time
>
> 另外咨询下:  如果0表示永久保留state  那么想不保存state应该使用什么值?
> *谢谢!*
>
>
> *杨勇*
>


flink1.18 regular join 支持两侧流各自配置state ttl

2024-01-05 文章 Thomas Yang
本地测试发现 默认生成0ms,实际测试是两侧保留了永久状态,但是官方文档意思是0ms表示两侧都不保存状态. 是不是文档有错误?
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#idle-state-retention-time

另外咨询下:  如果0表示永久保留state  那么想不保存state应该使用什么值?
*谢谢!*


*杨勇*


Re:CUMULATE 窗口状态过大导致CK超时

2024-01-04 文章 ouywl
HI Jiaotong:
我的建议如下:
1. 本地存储使用高吞吐的SSD 
2. taskmanager.memory.managed.size 增加并且确保rocksdb memtable内存增加,减少rocksdb 刷磁盘的量
3. 如果有物化sink算子,关闭物化sink算子,减小state。


The following is the content of the forwarded email
From:"jiaot...@mail.jj.cn" 
To:user-zh 
Date:2024-01-05 09:41:01
Subject:CUMULATE 窗口状态过大导致CK超时

Hi All,
 我使用了CUMULATE( STEP => INTERVAL '1' MINUTES, SIZE => INTERVAL '1' DAYS) 
累积窗口,导致太多数据保存在状态中,即使开启了增量式RocksDB,但是当程序运行一段时间后,CK依然超时从而导致任务失败。因此想咨询对于这种大窗口大状态应该如何优化和使用。非常感谢
注:Flink版本 1.14.0



Re:CUMULATE 窗口状态过大导致CK超时

2024-01-04 文章 Xuyang
Hi,

一般来说,业务上如果坚持要使用大state,可以尝试下尽可能的给多并发(让每个并发都持有一部分key的state,摊平大state)和内存(尽可能减少访问落盘的数据,减少IO)来提高性能。
对于你这个case来说,CUMULATE Window 
TVF在实现层面已经尽可能将小窗口的数据进行merge了[1]。可以dump下来看下具体是哪里的问题,是不是有进一步优化的空间。


[1] 
https://github.com/apache/flink/blob/b25dfaee80727d6662a5fd445fe51cc139a8b9eb/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java#L340C15-L340C15

--

Best!
Xuyang





在 2024-01-05 09:41:01,"jiaot...@mail.jj.cn"  写道:
>Hi All,
> 我使用了CUMULATE( STEP => INTERVAL '1' MINUTES, SIZE => INTERVAL '1' 
> DAYS) 
> 累积窗口,导致太多数据保存在状态中,即使开启了增量式RocksDB,但是当程序运行一段时间后,CK依然超时从而导致任务失败。因此想咨询对于这种大窗口大状态应该如何优化和使用。非常感谢
>注:Flink版本 1.14.0


Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4)  
=hash=> window(p=4)
window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新
Best regards,
haishui





在 2024-01-03 14:25:48,"ha.fen...@aisino.com"  写道:
>设置并行度1确实可以了。env.setParallelism(1);
>这里按照key分组,我输入的key都是同一个值,应该在同一个分区。
>watermark.keyBy(item -> item.itemid)
>我理解的就是处理环节其实就是单并行度的,没有问题。问题出在socket这种方式必须使用单并行度?
>发件人: haishui
>发送时间: 2024-01-03 13:35
>收件人: user-zh
>主题: Re:回复: Re: 滑动窗口按照处理时间触发的问题
>Hi,
> 
> 
>应该是并行度的原因,你可以先将并行度设置为1试试。
> 
> 
> 
> 
>Best regards,
>haishui
> 
> 
> 
> 
> 
>在 2024-01-03 12:24:20,"ha.fen...@aisino.com"  写道:
>>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>>public static void main(String[] args) throws Exception {
>>StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>SingleOutputStreamOperator 
>> userItemSingleOutputStreamOperator = env
>>.socketTextStream("172.24.6.109", )
>>.flatMap(new Splitter());
>>
>>WatermarkStrategy watermarkStrategy = WatermarkStrategy
>>.forMonotonousTimestamps()
>>.withTimestampAssigner(new 
>> SerializableTimestampAssigner() {
>>@Override
>>public long extractTimestamp(UserItem element, long 
>> recordTimestamp) {
>>System.out.println(element.timestamp);
>>return element.timestamp;
>>}
>>});
>>SingleOutputStreamOperator watermark = 
>> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>>watermark.keyBy(item -> item.itemid)
>>   // 
>> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>>.window(TumblingEventTimeWindows.of(Time.minutes(5)))
>>.aggregate(new AggregateFunctionMethod(), new MyProcess())
>>.print();
>>
>>env.execute();
>>}
>>public static class Splitter implements FlatMapFunction 
>> {
>>@Override
>>public void flatMap(String sentence, Collector out) throws 
>> Exception {
>>String[] words = sentence.split(",");
>>out.collect(new 
>> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>>}
>>}
>>
>>public static class AggregateFunctionMethod implements 
>> AggregateFunction {
>>@Override
>>public Integer createAccumulator() {
>>return 0;
>>}
>>
>>@Override
>>public Integer add(UserItem item, Integer i) {
>>return i+1;
>>}
>>
>>@Override
>>public Integer getResult(Integer i) {
>>return i;
>>}
>>
>>@Override
>>public Integer merge(Integer i, Integer acc1) {
>>return i+acc1;
>>}
>>}
>>
>>public static class MyProcess extends 
>> ProcessWindowFunction {
>>
>>@Override
>>public void process(Integer s, Context context, Iterable 
>> elements, Collector out) throws Exception {
>>long startTs = context.window().getStart();
>>long endTs = context.window().getEnd();
>>String windowStart = DateFormatUtils.format(startTs, "-MM-dd 
>> HH:mm:ss.SSS");
>>String windowEnd = DateFormatUtils.format(endTs, "-MM-dd 
>> HH:mm:ss.SSS");
>>StringBuilder sb = new StringBuilder();
>>sb.append("窗口["+windowStart+","+windowEnd);
>>out.collect(sb.toString());
>>}
>>}
>> 
>>发件人: Xuyang
>>发送时间: 2024-01-03 09:36
>>收件人: user-zh
>>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>>Hi,
>>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
>> 
>> 
>> 
>> 
>>--
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>>在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>>>Hi,
>>>
>>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>>
>>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>>00:20 - 01:20 这个时间窗口上。
>>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>>的数据,而现实是没有这样的一条数据。
>>>
>>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>>
>>>Best regards,
>>>Jinsui
>>>
>>>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>>>

 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
 我使用socket方式进行录入数据
 2024-01-02 01:19:01  1704129541000
 2024-01-02 01:21:01  1704129661000
 2024-01-02 01:26:01  1704129961000
 2024-01-02 01:29:01  1704130141000
 2024-01-02 01:34:01  1704130441000
 前面是对应的时间,后面是我录入系统的时间
 MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
 2024-01-02 

Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
Hi,


应该是并行度的原因,你可以先将并行度设置为1试试。




Best regards,
haishui





在 2024-01-03 12:24:20,"ha.fen...@aisino.com"  写道:
>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>SingleOutputStreamOperator 
> userItemSingleOutputStreamOperator = env
>.socketTextStream("172.24.6.109", )
>.flatMap(new Splitter());
>
>WatermarkStrategy watermarkStrategy = WatermarkStrategy
>.forMonotonousTimestamps()
>.withTimestampAssigner(new 
> SerializableTimestampAssigner() {
>@Override
>public long extractTimestamp(UserItem element, long 
> recordTimestamp) {
>System.out.println(element.timestamp);
>return element.timestamp;
>}
>});
>SingleOutputStreamOperator watermark = 
> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>watermark.keyBy(item -> item.itemid)
>   // 
> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>.window(TumblingEventTimeWindows.of(Time.minutes(5)))
>.aggregate(new AggregateFunctionMethod(), new MyProcess())
>.print();
>
>env.execute();
>}
>public static class Splitter implements FlatMapFunction {
>@Override
>public void flatMap(String sentence, Collector out) throws 
> Exception {
>String[] words = sentence.split(",");
>out.collect(new 
> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>}
>}
>
>public static class AggregateFunctionMethod implements 
> AggregateFunction {
>@Override
>public Integer createAccumulator() {
>return 0;
>}
>
>@Override
>public Integer add(UserItem item, Integer i) {
>return i+1;
>}
>
>@Override
>public Integer getResult(Integer i) {
>return i;
>}
>
>@Override
>public Integer merge(Integer i, Integer acc1) {
>return i+acc1;
>}
>}
>
>public static class MyProcess extends 
> ProcessWindowFunction {
>
>@Override
>public void process(Integer s, Context context, Iterable 
> elements, Collector out) throws Exception {
>long startTs = context.window().getStart();
>long endTs = context.window().getEnd();
>String windowStart = DateFormatUtils.format(startTs, "-MM-dd 
> HH:mm:ss.SSS");
>String windowEnd = DateFormatUtils.format(endTs, "-MM-dd 
> HH:mm:ss.SSS");
>StringBuilder sb = new StringBuilder();
>sb.append("窗口["+windowStart+","+windowEnd);
>out.collect(sb.toString());
>}
>}
> 
>发件人: Xuyang
>发送时间: 2024-01-03 09:36
>收件人: user-zh
>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>Hi,
>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
> 
> 
> 
> 
>--
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
>在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>>Hi,
>>
>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>
>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>00:20 - 01:20 这个时间窗口上。
>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>的数据,而现实是没有这样的一条数据。
>>
>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>
>>Best regards,
>>Jinsui
>>
>>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>>
>>>
>>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>>> 我使用socket方式进行录入数据
>>> 2024-01-02 01:19:01  1704129541000
>>> 2024-01-02 01:21:01  1704129661000
>>> 2024-01-02 01:26:01  1704129961000
>>> 2024-01-02 01:29:01  1704130141000
>>> 2024-01-02 01:34:01  1704130441000
>>> 前面是对应的时间,后面是我录入系统的时间
>>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>>


Re:Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 Xuyang
Hi,
基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。




--

Best!
Xuyang





在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>Hi,
>
>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>
>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>00:20 - 01:20 这个时间窗口上。
>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>的数据,而现实是没有这样的一条数据。
>
>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>
>Best regards,
>Jinsui
>
>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>
>>
>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>> 我使用socket方式进行录入数据
>> 2024-01-02 01:19:01  1704129541000
>> 2024-01-02 01:21:01  1704129661000
>> 2024-01-02 01:26:01  1704129961000
>> 2024-01-02 01:29:01  1704130141000
>> 2024-01-02 01:34:01  1704130441000
>> 前面是对应的时间,后面是我录入系统的时间
>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>


Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 Jinsui Chen
Hi,

请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。

假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
00:20 - 01:20 这个时间窗口上。
2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
的数据,而现实是没有这样的一条数据。

我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。

Best regards,
Jinsui

ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:

>
> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
> 我使用socket方式进行录入数据
> 2024-01-02 01:19:01  1704129541000
> 2024-01-02 01:21:01  1704129661000
> 2024-01-02 01:26:01  1704129961000
> 2024-01-02 01:29:01  1704130141000
> 2024-01-02 01:34:01  1704130441000
> 前面是对应的时间,后面是我录入系统的时间
> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>


RE: 如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 Jiabao Sun
Hi,

可以参考下这篇文档[1],进行简单的测试。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i


On 2024/01/02 08:02:10 "casel.chen" wrote:
> 我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
> flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?

如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 casel.chen
我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?

Re: Flink CDC中如何在Snapshot阶段读取数据时进行限流?

2024-01-01 文章 Jiabao Sun
Hi,

GuavaFlinkConnectorRateLimiter 目前只在 flink-connector-gcp-pubsub[1] 有使用。
Flink CDC 还未支持限流[2],目前可以尝试降低 snapshot 并发数来缓解数据库压力。 

Best,
Jiabao

[1] 
https://github.com/apache/flink-connector-gcp-pubsub/blob/f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L22
[2] https://github.com/ververica/flink-cdc-connectors/issues/510


> 2024年1月2日 11:39,casel.chen  写道:
> 
> 业务表存量数据很大,如果不加限流直接使用flink cdc读取snapshot阶段数据的话会造成业务库压力,触发数据库告警,影响在线业务。
> 请问Flink CDC中如何在Snapshot阶段读取数据时进行限流?
> 
> 
> 我看到社区之前有人提议过,但issue一直是open状态
> https://issues.apache.org/jira/browse/FLINK-18740
> 
> 
> 另外,我在flink最新master分支源码中有找到 
> GuavaFlinkConnectorRateLimiter,但没有找到调用它的例子,请问如何在flink作业中使用限流呢?



Flink CDC中如何在Snapshot阶段读取数据时进行限流?

2024-01-01 文章 casel.chen
业务表存量数据很大,如果不加限流直接使用flink cdc读取snapshot阶段数据的话会造成业务库压力,触发数据库告警,影响在线业务。
请问Flink CDC中如何在Snapshot阶段读取数据时进行限流?


我看到社区之前有人提议过,但issue一直是open状态
https://issues.apache.org/jira/browse/FLINK-18740


另外,我在flink最新master分支源码中有找到 
GuavaFlinkConnectorRateLimiter,但没有找到调用它的例子,请问如何在flink作业中使用限流呢?

Re: FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 文章 ying lin
从同一个source里select,在flink sql中用statement set 执行两条insert语句到不同的sink表即可

Jiabao Sun  于2023年12月29日周五 16:55写道:

> Hi,
>
> 使用 SQL 的话不太好实现写入多个路径,
> 使用 DataStream 的话可以考虑自己实现一个 RichSinkFunction。
>
> Best,
> Jiabao
>
> On 2023/12/29 08:37:34 jinzhuguang wrote:
> > Flink版本:1.16.0
> >
> > 看官网上的案例:
> > CREATE TABLE MyUserTable (
> >   column_name1 INT,
> >   column_name2 STRING,
> >   ...
> >   part_name1 INT,
> >   part_name2 STRING
> > ) PARTITIONED BY (part_name1, part_name2) WITH (
> >   'connector' = 'filesystem',   -- 必选:指定连接器类型
> >   'path' = 'file:///path/to/whatever',  -- 必选:指定路径
> >   'format' = '...', -- 必选:文件系统连接器指定 format
> > -- 有关更多详情,请参考 Table Formats
> >   'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null
> 或空字符串
> >
> >   -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink
> 的文件数,但是可能会导致数据倾斜,默认值是 false
> >   'sink.shuffle-by-partition.enable' = '...',
> >   ...
> > )
> > 目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。


RE: FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 文章 Jiabao Sun
Hi,

使用 SQL 的话不太好实现写入多个路径,
使用 DataStream 的话可以考虑自己实现一个 RichSinkFunction。

Best,
Jiabao

On 2023/12/29 08:37:34 jinzhuguang wrote:
> Flink版本:1.16.0
> 
> 看官网上的案例:
> CREATE TABLE MyUserTable (
>   column_name1 INT,
>   column_name2 STRING,
>   ...
>   part_name1 INT,
>   part_name2 STRING
> ) PARTITIONED BY (part_name1, part_name2) WITH (
>   'connector' = 'filesystem',   -- 必选:指定连接器类型
>   'path' = 'file:///path/to/whatever',  -- 必选:指定路径
>   'format' = '...', -- 必选:文件系统连接器指定 format
> -- 有关更多详情,请参考 Table Formats
>   'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null 或空字符串
> 
>   -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 
> 的文件数,但是可能会导致数据倾斜,默认值是 false
>   'sink.shuffle-by-partition.enable' = '...',
>   ...
> )
> 目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。

FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 文章 jinzhuguang
Flink版本:1.16.0

看官网上的案例:
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',   -- 必选:指定连接器类型
  'path' = 'file:///path/to/whatever',  -- 必选:指定路径
  'format' = '...', -- 必选:文件系统连接器指定 format
-- 有关更多详情,请参考 Table Formats
  'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null 或空字符串

  -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 
的文件数,但是可能会导致数据倾斜,默认值是 false
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。

RE: Flink SQL Windowing TVFs

2023-12-28 文章 Jiabao Sun
Hi,

在 1.14.0 版本中,CUMULATE 函数是需要用在GROUP BY聚合场景下的[1]。
部署到生产的 SQL 是否包含了 GROUP BY 表达式?
本地测试的Flink版本是不是1.14.0?

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate



On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote:
> Hi,
>  我在使用1.14.0版本Flink,本地测试了CUMULATE(TABLE kafka, DESCRIPTOR(rowtime), 
> INTERVAL '60' SECOND, INTERVAL '1' DAYS)方法可以正常运行,但是当我将其部署到线上环境报了如下错误:
>  org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: Currently Flink doesn't support individual window 
> table-valued function CUMULATE(time_col=[rowtime], max_size=[8640 ms], 
> step=[1 min]).
>  Please use window table-valued function with the following computations:
>  1. aggregate using window_start and window_end as group keys.
>  2. topN using window_start and window_end as partition key.
>  3. join with join condition contains window starts equality of input 
> tables and window ends equality of input tables.
>  请问这是因为线上包版本导致的吗,如果是版本问题,具体是哪一个包呢
>  非常感谢
> 

Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

是的,目前来说会 block 住。
flush + apply schema change 一般来说不会持续太长时间,
且 schema 变更一般来说是低频事件,即使 block 也不会有太大性能影响。

Best,
Jiabao


> 2023年12月28日 12:57,casel.chen  写道:
> 
> 
> 
> 
> 感谢解惑!
> 还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>> Hi,
>> 
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>> 
>> Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>> writer,参考 DorisEventSerializer
>> 
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>> 被 block 的原因是 responseFuture没有 
>> complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>> 在没有完成时会 block 住。 
>> 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>> waitFlushSuccess的responseFuture 标记为 complete。
>> 参考 
>> SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>> 
>> 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>> 
>>> 2023年12月27日 22:14,casel.chen  写道:
>>> 
>>> 看了infoq介绍flink cdc 3.0文章 
>>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>>> d6 其中d代表数据变更,s代表schema变更
>>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>>> 而Task2处理 d3, d4, d5, s3, d6
>>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>>> 
>>> 
>>> SchemaOperator代码中
>>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>>> schemaChangeEvent) {
>>>   // The request will need to send a FlushEvent or block until flushing 
>>> finished
>>>   SchemaChangeResponse response = requestSchemaChange(tableId, 
>>> schemaChangeEvent);
>>>   if (response.isShouldSendFlushEvent()) {
>>>   LOG.info(
>>>   "Sending the FlushEvent for table {} in subtask {}.",
>>>   tableId,
>>>   getRuntimeContext().getIndexOfThisSubtask());
>>>   output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>>   output.collect(new StreamRecord<>(schemaChangeEvent));
>>>   // The request will block until flushing finished in each sink 
>>> writer
>>>   requestReleaseUpstream();
>>>   }
>>>   }
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>>> 求指教,谢谢!
>>> 



Re:Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen



感谢解惑!
还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?








在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>Hi,
>
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>
>Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>writer,参考 DorisEventSerializer
>
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>被 block 的原因是 responseFuture没有 
>complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>在没有完成时会 block 住。 
>只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>waitFlushSuccess的responseFuture 标记为 complete。
>参考 
>SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>
>保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>
>Best,
>Jiabao
>
>[1] 
>https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>
>> 2023年12月27日 22:14,casel.chen  写道:
>> 
>> 看了infoq介绍flink cdc 3.0文章 
>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>> d6 其中d代表数据变更,s代表schema变更
>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>> 而Task2处理 d3, d4, d5, s3, d6
>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>> 
>> 
>> SchemaOperator代码中
>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>> schemaChangeEvent) {
>>// The request will need to send a FlushEvent or block until flushing 
>> finished
>>SchemaChangeResponse response = requestSchemaChange(tableId, 
>> schemaChangeEvent);
>>if (response.isShouldSendFlushEvent()) {
>>LOG.info(
>>"Sending the FlushEvent for table {} in subtask {}.",
>>tableId,
>>getRuntimeContext().getIndexOfThisSubtask());
>>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>output.collect(new StreamRecord<>(schemaChangeEvent));
>>// The request will block until flushing finished in each sink 
>> writer
>>requestReleaseUpstream();
>>}
>>}
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>> 求指教,谢谢!
>> 


Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?

Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
writer,参考 DorisEventSerializer

> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
被 block 的原因是 responseFuture没有 complete,在SchemaOperator.sendRequestToCoordinator 
使用 responseFuture.get() 在没有完成时会 block 住。 
只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
waitFlushSuccess的responseFuture 标记为 complete。
参考 
SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.

保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit

> 2023年12月27日 22:14,casel.chen  写道:
> 
> 看了infoq介绍flink cdc 3.0文章 
> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
> d6 其中d代表数据变更,s代表schema变更
> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
> 而Task2处理 d3, d4, d5, s3, d6
> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
> 
> 
> SchemaOperator代码中
> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
> schemaChangeEvent) {
>// The request will need to send a FlushEvent or block until flushing 
> finished
>SchemaChangeResponse response = requestSchemaChange(tableId, 
> schemaChangeEvent);
>if (response.isShouldSendFlushEvent()) {
>LOG.info(
>"Sending the FlushEvent for table {} in subtask {}.",
>tableId,
>getRuntimeContext().getIndexOfThisSubtask());
>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>output.collect(new StreamRecord<>(schemaChangeEvent));
>// The request will block until flushing finished in each sink 
> writer
>requestReleaseUpstream();
>}
>}
> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?
> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
> 求指教,谢谢!
> 



flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen
看了infoq介绍flink cdc 3.0文章 
https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6 
其中d代表数据变更,s代表schema变更
这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
而Task2处理 d3, d4, d5, s3, d6
这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?


SchemaOperator代码中
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
// The request will need to send a FlushEvent or block until flushing 
finished
SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
output.collect(new StreamRecord<>(schemaChangeEvent));
// The request will block until flushing finished in each sink 
writer
requestReleaseUpstream();
}
}
为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
还要发送一次SchemaChangeEvent呢?
当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
upstream的呢?
求指教,谢谢!



<    1   2   3   4   5   6   7   8   9   10   >