flink写入mysql数据异常

2023-03-22 文章 小昌同学
使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert 
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re:Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jeff
这个方法有效,多谢

















在 2023-03-22 17:11:19,"Jane Chan"  写道:
>Hi,
>
>如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
>map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
>如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
>祝好!
>Jane
>
>On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
>> 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>> >Hi,
>> >
>> >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>> >
>> >Sincerely,
>> >Shuo
>> >
>> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> >
>> >> 复制执行我提供的两个sql就一定会复现!
>> >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> >> 这个问题是这个版本calcite引起的。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> >> >bug地址:
>> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >> >
>> >> >
>> >> >bug详细内容:
>> >> >the values of map are truncated by the CASE WHEN
>> function.
>> >> >// sql
>> >> >create table test (a map) with ('connector'='print');
>> >> >insert into test  select * from (values(case when true then
>> >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> >> end));
>> >> >
>> >> >the result:
>> >> >
>> >> >+I[{test=123}]
>> >> >
>> >> >We hope the value of result is '123456789', but I get '123', the length
>> >> is limited by 'abc'.
>> >>
>>


Re: 退订

2023-03-22 文章 Shammon FY
退订请发送邮件到 user-zh-unsubscr...@flink.apache.org


On Wed, Mar 22, 2023 at 8:13 PM jianbo zhang  wrote:

> 退订
>


Re: 退订

2023-03-22 文章 Ran Tao
退订是发送邮件到 user-zh-unsubscr...@flink.apache.org 这个地址就可以了。

Best Regards,
Ran Tao


李朋 <1134415...@qq.com.invalid> 于2023年3月22日周三 20:10写道:

> 退订!


退订

2023-03-22 文章 jianbo zhang
退订


(无主题)

2023-03-22 文章 李朋


Re:Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jeff
试过了,不兼容,1.27.0都不兼容

















在 2023-03-22 18:04:17,"tison"  写道:
>如果 calcite 层的接口不变,直接替换 jar 包或许也可行?不确定从 1.27 -> 1.29 有没有不兼容的情况。
>
>Best,
>tison.
>
>
>Jane Chan  于2023年3月22日周三 18:11写道:
>
>> Hi,
>>
>> 如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
>> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>>
>> 如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>>
>> 祝好!
>> Jane
>>
>> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>>
>> > 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>> > >Hi,
>> > >
>> > >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>> > >
>> > >Sincerely,
>> > >Shuo
>> > >
>> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> > >
>> > >> 复制执行我提供的两个sql就一定会复现!
>> > >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> > >> 这个问题是这个版本calcite引起的。
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> > >> >bug地址:
>> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> > >> >
>> > >> >
>> > >> >bug详细内容:
>> > >> >the values of map are truncated by the CASE WHEN
>> > function.
>> > >> >// sql
>> > >> >create table test (a map) with
>> ('connector'='print');
>> > >> >insert into test  select * from (values(case when true then
>> > >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> > >> end));
>> > >> >
>> > >> >the result:
>> > >> >
>> > >> >+I[{test=123}]
>> > >> >
>> > >> >We hope the value of result is '123456789', but I get '123', the
>> length
>> > >> is limited by 'abc'.
>> > >>
>> >
>>


Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 tison
如果 calcite 层的接口不变,直接替换 jar 包或许也可行?不确定从 1.27 -> 1.29 有没有不兼容的情况。

Best,
tison.


Jane Chan  于2023年3月22日周三 18:11写道:

> Hi,
>
> 如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
> 如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
> 祝好!
> Jane
>
> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
> > 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
> > >Hi,
> > >
> > >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> > >
> > >Sincerely,
> > >Shuo
> > >
> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> > >
> > >> 复制执行我提供的两个sql就一定会复现!
> > >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> > >> 这个问题是这个版本calcite引起的。
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2023-03-22 09:28:17,"Jeff"  写道:
> > >> >bug地址:
> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> > >> >
> > >> >
> > >> >bug详细内容:
> > >> >the values of map are truncated by the CASE WHEN
> > function.
> > >> >// sql
> > >> >create table test (a map) with
> ('connector'='print');
> > >> >insert into test  select * from (values(case when true then
> > >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> > >> end));
> > >> >
> > >> >the result:
> > >> >
> > >> >+I[{test=123}]
> > >> >
> > >> >We hope the value of result is '123456789', but I get '123', the
> length
> > >> is limited by 'abc'.
> > >>
> >
>


Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jane Chan
Hi,

如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
map ['msg_code','0', 'msg_reason', cast('abc' as string)]

如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/

祝好!
Jane

On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:

> 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
> >Hi,
> >
> >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> >
> >Sincerely,
> >Shuo
> >
> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >
> >> 复制执行我提供的两个sql就一定会复现!
> >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> >> 这个问题是这个版本calcite引起的。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-22 09:28:17,"Jeff"  写道:
> >> >bug地址:
> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> >
> >> >
> >> >bug详细内容:
> >> >the values of map are truncated by the CASE WHEN
> function.
> >> >// sql
> >> >create table test (a map) with ('connector'='print');
> >> >insert into test  select * from (values(case when true then
> >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> end));
> >> >
> >> >the result:
> >> >
> >> >+I[{test=123}]
> >> >
> >> >We hope the value of result is '123456789', but I get '123', the length
> >> is limited by 'abc'.
> >>
>


Re:Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jeff
通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?



















在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>Hi,
>
>如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>
>Sincerely,
>Shuo
>
>On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>
>> 复制执行我提供的两个sql就一定会复现!
>> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> 这个问题是这个版本calcite引起的。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> >bug地址:
>> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >
>> >
>> >bug详细内容:
>> >the values of map are truncated by the CASE WHEN function.
>> >// sql
>> >create table test (a map) with ('connector'='print');
>> >insert into test  select * from (values(case when true then
>> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> end));
>> >
>> >the result:
>> >
>> >+I[{test=123}]
>> >
>> >We hope the value of result is '123456789', but I get '123', the length
>> is limited by 'abc'.
>>


Re: regular join每条流单独设置ttl

2023-03-22 文章 Jane Chan
Hi,

我在社区发起了在 Operator 粒度设置 State TTL 的讨论 [1], 支持为每条流单独设置 TTL, 欢迎参与讨论 :)

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct

Best,
Jane

On Wed, Feb 15, 2023 at 1:26 PM Jane Chan  wrote:

> 你好,
>
> 目前 Flink SQL 还不支持为每条流单独设置 state TTL, 不过社区计划支持这个功能, 最近就会有 FLIP 提出, 也欢迎参与讨论.
>
> Best regards,
> Jane
>
> On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:
>
>> 大家好,
>> 我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular
>> join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
>>
>>
>> | |
>> Jason_H
>> |
>> |
>> hyb_he...@163.com
>> |
>
>


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 文章 Jane Chan
Hi,

这是 Calcite 的一个 bug[1], 已经在 1.27.0 上修复. 不过由于 Flink 1.15.1, 1.15.2 和 1.16.1
都依赖 Calcite 1.26.0, 所以目前只能尝试如下方式绕过, 可以等 release-1.17 发布后升级到新版本上, 应该不会再有问题了.

select * from (values(case when true then map['test','123456789'] else
map ['msg_code','0', 'msg_reason', cast('abc' as string)] end));


[1] https://issues.apache.org/jira/browse/CALCITE-4603

Best,
Jane

On Wed, Mar 22, 2023 at 11:49 AM tison  wrote:

> 你可以关注下发布动态,测试一下 RC
> https://lists.apache.org/thread/d9o0tgnv0fl9goqsdo8wmq9121b9wolv
>
> Best,
> tison.
>
>
> tison  于2023年3月22日周三 11:47写道:
>
> > Flink master 上 calcite 的版本是 1.29,看起来会在 Flink 1.17 release 出来
> >
> > Best,
> > tison.
> >
> >
> > Shuo Cheng  于2023年3月22日周三 11:42写道:
> >
> >> Hi,
> >>
> >> 如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> >>
> >> Sincerely,
> >> Shuo
> >>
> >> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >>
> >> > 复制执行我提供的两个sql就一定会复现!
> >> > 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> >> > 这个问题是这个版本calcite引起的。
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > 在 2023-03-22 09:28:17,"Jeff"  写道:
> >> > >bug地址:
> >> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> > >
> >> > >
> >> > >bug详细内容:
> >> > >the values of map are truncated by the CASE WHEN
> >> function.
> >> > >// sql
> >> > >create table test (a map) with
> ('connector'='print');
> >> > >insert into test  select * from (values(case when true then
> >> > map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> > end));
> >> > >
> >> > >the result:
> >> > >
> >> > >+I[{test=123}]
> >> > >
> >> > >We hope the value of result is '123456789', but I get '123', the
> length
> >> > is limited by 'abc'.
> >> >
> >>
> >
>