Hi casel

使用 proctime 属性时间的字段排序,取第一条是不会产生回撤消息的。

参考:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/deduplication/

```

CREATE TABLE Orders (  order_id  STRING,  user        STRING,  product
    STRING,  num         BIGINT,  proctime AS PROCTIME()) WITH
(...);-- remove duplicate rows on order_id and keep the first
occurrence row,-- because there shouldn't be two orders with the same
order_id.SELECT order_id, user, product, numFROM (  SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS
row_num  FROM Orders)WHERE row_num = 1

```


Best,
Feng Jin


On Sun, Jan 19, 2025 at 7:55 PM casel.chen <casel_c...@126.com> wrote:

> 社区有人回答一下么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2025-01-14 13:42:58,"casel.chen" <casel_c...@126.com> 写道:
> >doris routine load不支持消费墓碑消息,会把它当作一个异常数据进行记录,累计达到阈值后会导致任务失败
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2025-01-14 13:41:00,"casel.chen" <casel_c...@126.com> 写道:
> >>业务tidb表通过ticdc实时同步变更数据到kafka topic,我开发flink实时作业消费kafka处理后投递到另一个kafka
> topic,再由doris创建routine load任务消费sink kafka
> topic写入到doris聚合模型表,做一些count/sum/min/max统计。
> >>针对上游mysql表的更新要拆分出两条append only (+I)
> 数据(通过扩展的cdc数据源来实现),update_before数据要将除聚合key以外的指标值都乘以 -1
> 表示扣减掉,而update_after数据正常输入即可,将这两部分数据union
> all起来后根据业务主键例如order_no(tidb表用的是自增id做为主键)进行keyBy再根据更新时间进行去重。
> >>使用的是select * from ( select *, row_number() over (partition by order_no
> order by event_time) as rownum from tbl ) where rownum = 1 语法。
> >>但是我发现这样出来的结果会产生回撤流,例如
> >>+I update_before -1
> >>-D update_before -1
> >>+I  update_after +1
> >>
> >>请问为什么append only数据源去重也会产生回撤数据?有没有办法不产生回撤数据?或者有什么办法可以过滤掉回撤数据再发到下游kafka?
>

回复