Re: Re:flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-14 文章 Jane Chan
Hi iasiuide,

感谢提问. 先来回答最后一个问题

关联维表的限制条件有的会作为关联条件,有的不作为关联条件吗? 这种有什么规律吗?
>

Lookup join 的 on condition 会在优化过程中经过一系列改写, 这里只简要对影响 lookup 和 where 的几处进行说明.

1. logical 阶段, FlinkFilterJoinRule 会将 on 条件 split 为针对单边的 (左表/右表) 和针对双边的.
**针对单边的 filter 会被尽量 pushdown 到 join 节点之前** (这意味着有可能会额外生成一个 Filter 节点);
Filter 节点后续如何变化取决于这个 filter 能否 pushdown 到 source, 如果不能, 那么在 physical
阶段它就会变成维表上面 Calc 节点 (denoted by calcOnTemporalTable) 里面的 condition.

2. 在 CommonPhysicalLookupJoin 里解析 allLookupKeys 的时候, 会试图从
calcOnTemporalTable 里把常量条件抽取出来形成最终的 lookup key (也就是 explain plan 里面
lookup=[...] 的内容), 在 explain 时, 只要存在 calcOnTemporalTable, where=[...]
就会被打印出来.

回到具体的 case

为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==>
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>

因为 b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
是针对维表单边的条件且无法被下推. 另外, 这里使用了非确定性函数[1], 请关注结果的正确性.


> 关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND
> c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==>
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
>

此时常量可以被提取出来


> 关联第三张维表dim_ptfz_ymfz_merchant_info的限制条件 ON c.agent_id = d.pk_id AND
> (d.data_source = 'ex_agent' OR d.data_source = 'agent')
> 中关于data_source的条件,在执行计划中不是lookup的条件 ==> lookup=[pk_id=agent_id],
>

据我所知 lookup 目前应该还不支持 SARGable

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/

Best,
Jane

On Fri, Mar 8, 2024 at 11:19 AM iasiuide  wrote:

> 好的,已经贴了sql片段
>
> 在 2024-03-08 11:02:34,"Xuyang"  写道:
> >Hi, 你的图挂了,可以用图床或者直接贴SQL
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >在 2024-03-08 10:54:19,"iasiuide"  写道:
> >
> >
> >
> >
> >
> >下面的sql片段中
> >ods_ymfz_prod_sys_divide_order  为kafka source表
> >dim_ymfz_prod_sys_trans_log   为mysql为表
> >dim_ptfz_ymfz_merchant_info   为mysql为表
> >
> >
> >
> >flink web ui界面的执行计划片段如下:
> >
> > [1]:TableSourceScan(table=[[default_catalog, default_database,
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time),
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))),
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id,
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> >+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time,
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 *
> divide_fee_amt), divide_fee_amt) AS div_fee_amt,
> Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time
> AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND (divide_fee_amt
>  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS
> TIMESTAMP(9)), '-MM-dd')))])
> >   +-
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date =
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))],
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts,
> bg_rel_trans_id, pay_type, member_id, mer_name])
> >  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
> > +-
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source
> = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name, pk_id, agent_id, bagent_id])
> >+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name, agent_id, bagent_id])
> >   +-
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id],
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])],
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id,
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
> >  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt,
> ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS
> fagent_id0])
> > +-
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0],
> where=[(data_source = 'agent')], select=[sys_date, create_time,
> div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0,
> fagent_id0, pk_id, agent_name, bagent_name])
> >  
> >
> >
> >为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==>
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> >关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND
> 

Re: Flink SQL的状态清理

2023-10-17 文章 Jane Chan
Hi, 你好

如果使用的是 standalone session cluster, 想要在 JM/TM 日志中看到参数打印出来, 需要在集群启动前在
flink-conf.yaml 配置 table.exec.state.ttl: '${TTL}', 再启动集群;
集群启动后再修改的话, 日志不会打印出来, 可以通过 SET; 命令查看当前 SQL CLI 中配置的参数.
另外, 需要先执行 SET 'table.exec.state.ttl' = '${TTL}' , 然后提交作业, 可以确认下操作顺序是否有误.

祝好!
Jane

On Mon, Oct 9, 2023 at 6:01 PM 小昌同学  wrote:

> 你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
> ---- 回复的原邮件 
> | 发件人 | Jane Chan |
> | 发送日期 | 2023年9月25日 11:24 |
> | 收件人 |  |
> | 主题 | Re: Flink SQL的状态清理 |
> Hi,
>
> 可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86
>
> Best,
> Jane
>
> On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:
>
> 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
>  回复的原邮件 
> | 发件人 | 小昌同学 |
> | 发送日期 | 2023年09月21日 17:06 |
> | 收件人 | user-zh |
> | 主题 | Flink SQL的状态清理 |
>
>
> 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>


Re: Flink SQL的状态清理

2023-09-24 文章 Jane Chan
Hi,

可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

> 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
>  回复的原邮件 
> | 发件人 | 小昌同学 |
> | 发送日期 | 2023年09月21日 17:06 |
> | 收件人 | user-zh |
> | 主题 | Flink SQL的状态清理 |
>
>
> 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: 退订

2023-09-03 文章 Jane Chan
退订请发送邮件至 user-zh-unsubscr...@flink.apache.org

Best,
Jane

On Sun, Sep 3, 2023 at 6:15 PM lei-tian  wrote:

> 退订
>
>
> | |
> lei-tian
> |
> |
> totorobabyf...@163.com
> |


Re: flink写入mysql数据异常

2023-03-23 文章 Jane Chan
附件还是没有收到哦.

Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [,
column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries

On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:

> 您好,我刚刚重新上传了附件;是的,Flink
> SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
> 小昌同学
> ccc0606fight...@163.com
>
> <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
> ---- 回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 15:42
> 收件人  
> 主题 Re: flink写入mysql数据异常
> Hi,
>
> 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
> 并且与数据库中物理表主键保持一致. 可以参考 [1].
>
> [1]
>
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
>
> On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:
>
> 大佬,你好,代码上传在附件中了;
> 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?
>
> 小昌同学
> ccc0606fight...@163.com
>
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D
> >
>  回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 14:23
> 收件人  
> 主题 Re: flink写入mysql数据异常
> 可以把完整 SQL 发出来看看
>
> 祝好!
> Jane
>
> On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
>
> 使用flink
> sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
> ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>
>
>


Re: flink写入mysql数据异常

2023-03-23 文章 Jane Chan
Hi,

没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
并且与数据库中物理表主键保持一致. 可以参考 [1].

[1]
https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86

On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:

> 大佬,你好,代码上传在附件中了;
> 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?
>
> 小昌同学
> ccc0606fight...@163.com
>
> <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
> ---- 回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 14:23
> 收件人  
> 主题 Re: flink写入mysql数据异常
> 可以把完整 SQL 发出来看看
>
> 祝好!
> Jane
>
> On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
>
> 使用flink
> sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
> ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>
>


Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jane Chan
Hi,

如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
map ['msg_code','0', 'msg_reason', cast('abc' as string)]

如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/

祝好!
Jane

On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:

> 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
> >Hi,
> >
> >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> >
> >Sincerely,
> >Shuo
> >
> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >
> >> 复制执行我提供的两个sql就一定会复现!
> >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> >> 这个问题是这个版本calcite引起的。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-22 09:28:17,"Jeff"  写道:
> >> >bug地址:
> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> >
> >> >
> >> >bug详细内容:
> >> >the values of map are truncated by the CASE WHEN
> function.
> >> >// sql
> >> >create table test (a map) with ('connector'='print');
> >> >insert into test  select * from (values(case when true then
> >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> end));
> >> >
> >> >the result:
> >> >
> >> >+I[{test=123}]
> >> >
> >> >We hope the value of result is '123456789', but I get '123', the length
> >> is limited by 'abc'.
> >>
>


Re: regular join每条流单独设置ttl

2023-03-22 文章 Jane Chan
Hi,

我在社区发起了在 Operator 粒度设置 State TTL 的讨论 [1], 支持为每条流单独设置 TTL, 欢迎参与讨论 :)

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct

Best,
Jane

On Wed, Feb 15, 2023 at 1:26 PM Jane Chan  wrote:

> 你好,
>
> 目前 Flink SQL 还不支持为每条流单独设置 state TTL, 不过社区计划支持这个功能, 最近就会有 FLIP 提出, 也欢迎参与讨论.
>
> Best regards,
> Jane
>
> On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:
>
>> 大家好,
>> 我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular
>> join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
>>
>>
>> | |
>> Jason_H
>> |
>> |
>> hyb_he...@163.com
>> |
>
>


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jane Chan
Hi,

这是 Calcite 的一个 bug[1], 已经在 1.27.0 上修复. 不过由于 Flink 1.15.1, 1.15.2 和 1.16.1
都依赖 Calcite 1.26.0, 所以目前只能尝试如下方式绕过, 可以等 release-1.17 发布后升级到新版本上, 应该不会再有问题了.

select * from (values(case when true then map['test','123456789'] else
map ['msg_code','0', 'msg_reason', cast('abc' as string)] end));


[1] https://issues.apache.org/jira/browse/CALCITE-4603

Best,
Jane

On Wed, Mar 22, 2023 at 11:49 AM tison  wrote:

> 你可以关注下发布动态,测试一下 RC
> https://lists.apache.org/thread/d9o0tgnv0fl9goqsdo8wmq9121b9wolv
>
> Best,
> tison.
>
>
> tison  于2023年3月22日周三 11:47写道:
>
> > Flink master 上 calcite 的版本是 1.29,看起来会在 Flink 1.17 release 出来
> >
> > Best,
> > tison.
> >
> >
> > Shuo Cheng  于2023年3月22日周三 11:42写道:
> >
> >> Hi,
> >>
> >> 如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> >>
> >> Sincerely,
> >> Shuo
> >>
> >> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >>
> >> > 复制执行我提供的两个sql就一定会复现!
> >> > 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> >> > 这个问题是这个版本calcite引起的。
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > 在 2023-03-22 09:28:17,"Jeff"  写道:
> >> > >bug地址:
> >> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> > >
> >> > >
> >> > >bug详细内容:
> >> > >the values of map are truncated by the CASE WHEN
> >> function.
> >> > >// sql
> >> > >create table test (a map) with
> ('connector'='print');
> >> > >insert into test  select * from (values(case when true then
> >> > map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> > end));
> >> > >
> >> > >the result:
> >> > >
> >> > >+I[{test=123}]
> >> > >
> >> > >We hope the value of result is '123456789', but I get '123', the
> length
> >> > is limited by 'abc'.
> >> >
> >>
> >
>


Re: Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-08 文章 Jane Chan
从 plan 上看起来在 sink 节点这里因为推导不出 upsert key 加上了 SinkUpsertMaterializer[1],
这里会按照 sink 表定义的主键进行 keyby shuffle[2], 只能保证最终一致性.
另外你的操作描述中 schema 为三列, 但 DDL 是四列, 且格式乱了.

一些可能的建议如下

1. 如果上游数据有主键并且也是 rowid 的话, 建议在 Flink source 表上声明 PK, 避免额外生成 materializer
节点; 同时注意在声明 Flink source 表时不要带上 metadata 列 (比如 op), 这会导致非确定性更新[3].
2. 检查写入 MySQL 数据库中的物理表 PK 字段是否和 Flink SQL sink 表的 PK 字段保持一致.

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
[2]
https://github.com/apache/flink/blob/3ea83baad0c8413f8e1f4a027866335d13789538/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L378
[3]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/#31-%e6%b5%81%e4%b8%8a%e7%9a%84%e4%b8%8d%e7%a1%ae%e5%ae%9a%e6%80%a7

Best,
Jane

On Mon, Mar 6, 2023 at 11:24 AM 陈佳豪  wrote:

> 刚做了一下测试
> 目前假定有3行数据需要同步(全量):
> | 编号 |
> 电话
> |
> 座机
> |
> | 1 |
> 1311313
> |
> 123
> |
> | 2 |
> 1311313
> |
> 456
> |
> | 3 |
> 1311313
> |
> 789
> |
>
>
>
>
> 这个时候我修改第四行数据的两个字段(增量):
> | 1
>
>
> |
> 电话
> |
> 座机
> |
> | 1 |
> 1311313
> |
> 123
> |
> | 2 |
> 1311313
> |
> 456
> |
> | 3 |
> 13113133110
> |
> 888
> |
> 修改完后我删除字段2这个时候去mysql看结果2是正确被删除,且无新增的(操作正确).
> 然后我继续删除数据3这个时候就不对了,在flink里面有修改两次的缓存数据,所以删除的同时将原来的旧数据插入进了mysql中(操作错误).
>
> 上述是我基于flink1.16.1版本进行测试的结果,目前不知道是不是要配置flink还是下游算子具体配置什么也不是清楚。这个问题困扰有3周了,各种测试调整都没有起作用。
>
>
>
>
>
>
>
>
> 在 2023-03-06 10:54:23,"陈佳豪"  写道:
> >hi 早上好
>
> >我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下
> >
> >== Abstract Syntax Tree ==
> >LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID,
> 名称, 手机, 座机])
> >+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"],
> 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
> 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
> >   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])
> >
> >
> >== Optimized Physical Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称,
> 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS
> rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机,
> CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS
> 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]],
> fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad,
> 63fd660536521f81a2cfabae])
> >
> >
> >== Optimized Execution Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称,
> 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID,
> 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS
> VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255))
> AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]],
> fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad,
> 63fd660536521f81a2cfabae])
> >
> >
> >
> >在 2023-03-05 15:37:53,"Jane Chan"  写道:
> >>Hi,
> >>
> >>抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在
> 1.16.1
> >>上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan
> >>打印出来看看.
> >>
> >>[1]
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>
> >>祝好!
> >>Jane
> >>
> >>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪  wrote:
> >>
> >>> hi 你好
> >>> 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 在 2023-03-02 11:52:41,"Jane Chan"  写道:
> >>> >Hi,
> >>> >
> >>> >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本,
> 这个
> >>> >query 在 1.16.2 上验证没有问题
> >>> >
> >>> >[1]
> >>> >
> >>>
> https://nightlies

Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-04 文章 Jane Chan
Hi,

抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1
上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan
打印出来看看.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/

祝好!
Jane

On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪  wrote:

> hi 你好
> 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-02 11:52:41,"Jane Chan"  写道:
> >Hi,
> >
> >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
> >query 在 1.16.2 上验证没有问题
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >
> >Best,
> >Jane
> >
> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪  wrote:
> >
> >> flink ,kafka连接 jdbc连接版本都是1.15.2的
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-01 18:14:35,"陈佳豪"  写道:
> >> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
> >> >String kafka = "CREATE TABLE `电话` (`rowid`
> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> >> 'connector' = 'kafka', 'topic' =
> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> >> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >> >
> >> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称`
> >> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT
> >> ENFORCED  )  WITH ('connector' = 'jdbc','driver' =
> >> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://
> >> 43.136.128.102:6506/meihua_test','username' = 'root',
> 'password' =
> >> '123456','table-name' = '电话2'  )";
> >> >
> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
> >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
> >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
> >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as
> `座机`
> >> from `电话` ) as t_1";
> >> >
> >> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
> >>
>


Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 Jane Chan
Hi,

可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
query 在 1.16.2 上验证没有问题

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/

Best,
Jane

On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪  wrote:

> flink ,kafka连接 jdbc连接版本都是1.15.2的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-01 18:14:35,"陈佳豪"  写道:
> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
> >String kafka = "CREATE TABLE `电话` (`rowid`
> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> 'connector' = 'kafka', 'topic' =
> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >
> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称`
> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT
> ENFORCED  )  WITH ('connector' = 'jdbc','driver' =
> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://
> 43.136.128.102:6506/meihua_test','username' = 'root','password' =
> '123456','table-name' = '电话2'  )";
> >
> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机`
> from `电话` ) as t_1";
> >
> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
>


Re:

2023-02-25 文章 Jane Chan
退订请发送邮件至 user-zh-unsubscr...@flink.apache.org

Best,
Jane

On Fri, Feb 24, 2023 at 7:43 PM LITA LITA  wrote:

> 退订
>
> <704669...@qq.com.invalid> 于2023年2月24日周五 07:58写道:
>
> > 退订
> >
> >
>


Re: 使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-25 文章 Jane Chan
Hi,

原问题中 String 变量 kafka 和 mysql 赋值反了, 以及能提供下所使用的 flink 版本吗, 我使用 1.16.1 没有复现此问题

payload

{
  "before": {
"rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d",
"63f73b332e77497da91286f0": "Jerry",
"63f73b3f2e77497da91286fb": "mobile number",
"63f73b3f2e77497da91286fc": "telephone number"
  },
  "after": null,

"source": {...},
  "op": "d",
  "ts_ms": 1677342340042,
  "transaction": null
}

flink sql

Flink SQL> insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `手机` from (select `rowid` as `rowID`,
`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,
`63f73b3f2e77497da91286fc` as `座机` from `电话_1`) as t_1;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8490c9530d3a97e73aeedfe9745f2fe3

mysql output

mysql> select * from 电话;
+
--++---+--+
| rowID| 名称   | 手机  | 座机
  |
+
--++---+--+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Tom| mobile number | telephone
number |
+
--++---+--+
1 row in set (0.00 sec)

mysql> select * from 电话;
+
--++---+--+
| rowID| 名称   | 手机  | 座机
  |
+
--++---+--+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Jerry  | mobile number | telephone
number |
+
--++---+--+
1 row in set (0.00 sec)

mysql> select * from 电话;
Empty set (0.00 sec)

Best,
Jane

On Fri, Feb 24, 2023 at 2:21 PM 陈佳豪  wrote:

> -建表语法如下
> String kafka = "CREATE TABLE `电话` " +
> "(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机`
> VARCHAR(255),  " +
> "  PRIMARY KEY (`rowID`) NOT ENFORCED  ) " +
> " WITH " +
> "('connector' = 'jdbc',   " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',   " +
> " 'url' = 'jdbc:mysql://XX:6506/meihua_test',  " +
> "  'username' = 'root',  " +
> "  'password' = '123456',  " +
> "  'table-name' = '电话'  )";
>
> String mysql = "CREATE TABLE `电话_1` " +
> "(`rowid` VARCHAR(100)," +
> "`63f73b332e77497da91286f0` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fb` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fc` VARCHAR(100)," +
> "`op` STRING ," +
> " PRIMARY KEY (rowid) NOT ENFORCED )" +
> " WITH " +
> "( 'connector' = 'kafka', " +
> "'topic' =
> 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
> " 'properties.bootstrap.servers' = 'XX:9092'," +
> " 'scan.startup.mode' = 'earliest-offset', " +
> "'format' = 'debezium-json' )";
> -执行语句如下
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -操作数据如下
>
>
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -执行语句如下
> {
> "op":"d",
> "before":{
> "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
> },
> "after":null
> }
> 现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。
> 各位大佬帮看下 谢谢。


Re: regular join每条流单独设置ttl

2023-02-14 文章 Jane Chan
你好,

目前 Flink SQL 还不支持为每条流单独设置 state TTL, 不过社区计划支持这个功能, 最近就会有 FLIP 提出, 也欢迎参与讨论.

Best regards,
Jane

On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:

> 大家好,
> 我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular
> join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: [ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 文章 Jane Chan
Congrats! Thanks Jingsong for driving this release, and thanks to all
contributors!

Best,
Jane

On Mon, Aug 29, 2022 at 11:35 AM Jingsong Li  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Table Store 0.2.0.
>
> Apache Flink Table Store is a unified storage to build dynamic tables
> for both streaming and batch processing in Flink, supporting
> high-speed data ingestion and timely data query.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Table Store can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20table-store
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351570
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Jingsong Lee
>