Re: 关于upsert-kafka connector的问题

2021-04-24 文章 Shengkai Fang
这里有对upsert-kafka完整的一个分析的讲解:深度解析 Flink upsert-kafka[1]。如果还有问题,可以继续咨询。

[1]https://flink-learning.org.cn/developers/flink-training-course3/

Shengkai Fang  于2021年4月25日周日 上午10:16写道:

> 本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。
>
> 消息从flink进入到kafka之中,根据kafka的协议保证了at-least-once。
>
> Best,
> Shengkai
>
> op <520075...@qq.com> 于2021年4月23日周五 下午2:18写道:
>
>>
>> 谢谢,upsert-kafka作为sink可以保证相同key的数据放在同一个partition内,假如对相同key的更新数据,由于网络等原因后更新的值A的比先更新的值B提前发送到kafka,
>> 这个时候用upsert-kafka去消费数据更新这个key,收到A进行更新后,在收到B的时候会覆盖掉A对吗
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:
>>   "user-zh"
>>         <
>> fskm...@gmail.com;
>> 发送时间:2021年4月23日(星期五) 中午12:20
>> 收件人:"user-zh">
>> 主题:Re: 关于upsert-kafka connector的问题
>>
>>
>>
>> 如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。
>>
>> Best,
>> Shengkai
>
>


Re: 关于upsert-kafka connector的问题

2021-04-24 文章 Shengkai Fang
本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。

消息从flink进入到kafka之中,根据kafka的协议保证了at-least-once。

Best,
Shengkai

op <520075...@qq.com> 于2021年4月23日周五 下午2:18写道:

>
> 谢谢,upsert-kafka作为sink可以保证相同key的数据放在同一个partition内,假如对相同key的更新数据,由于网络等原因后更新的值A的比先更新的值B提前发送到kafka,
> 这个时候用upsert-kafka去消费数据更新这个key,收到A进行更新后,在收到B的时候会覆盖掉A对吗
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> fskm...@gmail.com;
> 发送时间:2021年4月23日(星期五) 中午12:20
> 收件人:"user-zh"
> 主题:Re: 关于upsert-kafka connector的问题
>
>
>
> 如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。
>
> Best,
> Shengkai


?????? ????upsert-kafka connector??????

2021-04-23 文章 op
??upsert-kafkasinkkeypartition??keyA??B??kafka,
??upsert-kafka??key??A??B??A




----
??: 
   "user-zh"



Re: 关于upsert-kafka connector的问题

2021-04-22 文章 Shengkai Fang
如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。

Best,
Shengkai


?????? ????upsert-kafka connector??????

2021-04-22 文章 op
??upsert-kafka??key




----
??: 
   "user-zh"



Re: 关于upsert-kafka connector的问题

2021-04-22 文章 Shengkai Fang
Hi,

请问是有什么具体的问题吗?

Best,
Shengkai

op <520075...@qq.com> 于2021年4月22日周四 下午6:05写道:

> 用 upsert-kafka connector 作为source,会有key的插入和更新出现乱序导致结果不准的问题吗?
> 谢谢


????upsert-kafka connector??????

2021-04-22 文章 op
?? upsert-kafka connector 
source??key??


Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
Hi Jark.
对于 upsert-kafka connector 有两个疑问:

1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?


Qishang  于2021年3月5日周五 上午11:14写道:

>
> 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
> 学到了,感谢。
>
> Jark Wu  于2021年3月4日周四 下午11:11写道:
>
>> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
>> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
>> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
>> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
>> forward。
>>
>> Best,
>> Jark
>>
>> [1]:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>>
>> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>>
>> > Hi 社区。
>> > Flink 1.12.1
>> >
>> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition
>> ,设置大的并发,对于只有
>> > forword 的ETL没有作用。
>> >
>> > insert into table_a select id,udf(a),b,c from table_b;
>> >
>> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
>> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
>> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka
>> 中生效吗?可以用在我上面说的场景上面吗?
>> >
>> > ```
>> > == Physical Execution Plan ==
>> > Stage 1 : Data Source
>> > content : Source: TableSourceScan(table=[[default_catalog,
>> > default_database, temp_table]], fields=[id...])
>> >
>> > Stage 3 : Operator
>> > content : ChangelogNormalize(key=[id])
>> > ship_strategy : HASH
>> >
>> > Stage 4 : Operator
>> > content : Calc(select=[...])
>> > ship_strategy : FORWARD
>> >
>> > Stage 5 : Data Sink
>> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
>> > fields=[id...])
>> > ship_strategy : FORWARD
>> > ```
>> >
>>
>


Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
学到了,感谢。

Jark Wu  于2021年3月4日周四 下午11:11写道:

> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
> forward。
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>
> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>
> > Hi 社区。
> > Flink 1.12.1
> >
> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition
> ,设置大的并发,对于只有
> > forword 的ETL没有作用。
> >
> > insert into table_a select id,udf(a),b,c from table_b;
> >
> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
> >
> > ```
> > == Physical Execution Plan ==
> > Stage 1 : Data Source
> > content : Source: TableSourceScan(table=[[default_catalog,
> > default_database, temp_table]], fields=[id...])
> >
> > Stage 3 : Operator
> > content : ChangelogNormalize(key=[id])
> > ship_strategy : HASH
> >
> > Stage 4 : Operator
> > content : Calc(select=[...])
> > ship_strategy : FORWARD
> >
> > Stage 5 : Data Sink
> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
> > fields=[id...])
> > ship_strategy : FORWARD
> > ```
> >
>


Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Jark Wu
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
forward。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate

On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:

> Hi 社区。
> Flink 1.12.1
>
> 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
> forword 的ETL没有作用。
>
> insert into table_a select id,udf(a),b,c from table_b;
>
> 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
> 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
> 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
>
> ```
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, temp_table]], fields=[id...])
>
> Stage 3 : Operator
> content : ChangelogNormalize(key=[id])
> ship_strategy : HASH
>
> Stage 4 : Operator
> content : Calc(select=[...])
> ship_strategy : FORWARD
>
> Stage 5 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[id...])
> ship_strategy : FORWARD
> ```
>


Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-03 文章 Qishang
Hi 社区。
Flink 1.12.1

现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。

insert into table_a select id,udf(a),b,c from table_b;

发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?

```
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])

Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH

Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD

Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```