Hi Jark.
对于 upsert-kafka connector 有两个疑问:

1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?


Qishang <[email protected]> 于2021年3月5日周五 上午11:14写道:

>
> 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
> 学到了,感谢。
>
> Jark Wu <[email protected]> 于2021年3月4日周四 下午11:11写道:
>
>> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
>> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
>> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
>> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
>> forward。
>>
>> Best,
>> Jark
>>
>> [1]:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>>
>> On Thu, 4 Mar 2021 at 15:30, Qishang <[email protected]> wrote:
>>
>> > Hi 社区。
>> > Flink 1.12.1
>> >
>> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition
>> ,设置大的并发,对于只有
>> > forword 的ETL没有作用。
>> >
>> > insert into table_a select id,udf(a),b,c from table_b;
>> >
>> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
>> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
>> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka
>> 中生效吗?可以用在我上面说的场景上面吗?
>> >
>> > ```
>> > == Physical Execution Plan ==
>> > Stage 1 : Data Source
>> > content : Source: TableSourceScan(table=[[default_catalog,
>> > default_database, temp_table]], fields=[id...])
>> >
>> > Stage 3 : Operator
>> > content : ChangelogNormalize(key=[id])
>> > ship_strategy : HASH
>> >
>> > Stage 4 : Operator
>> > content : Calc(select=[...])
>> > ship_strategy : FORWARD
>> >
>> > Stage 5 : Data Sink
>> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
>> > fields=[id...])
>> > ship_strategy : FORWARD
>> > ```
>> >
>>
>

回复