感谢您的回复,这样自定义是可以实现的,我们目前使用的是1.15的flink版本。想看一下社区是不是有在框架层面实现这个配置的支持,理解这应该也是一个相对common的配置
junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2023年7月7日周五 17:57写道:

> 可以自己用DataStream API通过RowKind进行过滤。
> 如下示例代码:import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
> import org.apache.flink.util.Collector;
>
> /**
>  * 增量数据过滤函数
>  */
> public class AppendOnlyFilterFunction extends RichFlatMapFunction<Row,
> Row> {
>
>     private boolean includedUpdateAfter = false;
>
>     public AppendOnlyFilterFunction() {
>     }
>
>     public AppendOnlyFilterFunction(boolean includedUpdateAfter) {
>         this.includedUpdateAfter = includedUpdateAfter;
>     }
>
>     @Override
>     public void flatMap(Row row, Collector<Row> collector) throws
> Exception {
>         if (RowKind.INSERT == row.getKind()) {
>             collector.collect(row);
>         } else if (includedUpdateAfter && RowKind.UPDATE_AFTER ==
> row.getKind()) {
>             row.setKind(RowKind.INSERT);
>             collector.collect(row);
>         }
>     }
>
> }
>
> 发件人: shi franke
> 发送时间: 2023-07-07 17:33
> 收件人: user-zh
> 主题: Flink connector 是否支持忽略delete message
> 咨询下各位大佬,请问下connector现在有支持忽略delete消息的选项配置
>
> 吗?比如上游的数据是一个upsert/retract流,在connector这里是否有选项去忽略delete
> message,当作append流只去戳里insert消息。我看现在代码没有类似的功能,不确定是否有相关的jira或者实现
>

回复