Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka connector中的offset, 
partition,用户可以引用这些metadata进行过滤操作?














在 2023-07-10 23:39:00,"yh z" <zhengyunhon...@gmail.com> 写道:
>Hi,  shi franke. 你可以尝试自己实现一个 DynamicTableSink,在里面添加参数 “sink.ignore-delete”。
>你可以参考 github 上的一些实现,例如 clickhouse:
>https://github.com/liekkassmile/flink-connector-clickhouse-1.13
>
>shi franke <yshi24...@gmail.com> 于2023年7月7日周五 19:24写道:
>
>>
>> 感谢您的回复,这样自定义是可以实现的,我们目前使用的是1.15的flink版本。想看一下社区是不是有在框架层面实现这个配置的支持,理解这应该也是一个相对common的配置
>> junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2023年7月7日周五 17:57写道:
>>
>> > 可以自己用DataStream API通过RowKind进行过滤。
>> > 如下示例代码:import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> > import org.apache.flink.types.Row;
>> > import org.apache.flink.types.RowKind;
>> > import org.apache.flink.util.Collector;
>> >
>> > /**
>> >  * 增量数据过滤函数
>> >  */
>> > public class AppendOnlyFilterFunction extends RichFlatMapFunction<Row,
>> > Row> {
>> >
>> >     private boolean includedUpdateAfter = false;
>> >
>> >     public AppendOnlyFilterFunction() {
>> >     }
>> >
>> >     public AppendOnlyFilterFunction(boolean includedUpdateAfter) {
>> >         this.includedUpdateAfter = includedUpdateAfter;
>> >     }
>> >
>> >     @Override
>> >     public void flatMap(Row row, Collector<Row> collector) throws
>> > Exception {
>> >         if (RowKind.INSERT == row.getKind()) {
>> >             collector.collect(row);
>> >         } else if (includedUpdateAfter && RowKind.UPDATE_AFTER ==
>> > row.getKind()) {
>> >             row.setKind(RowKind.INSERT);
>> >             collector.collect(row);
>> >         }
>> >     }
>> >
>> > }
>> >
>> > 发件人: shi franke
>> > 发送时间: 2023-07-07 17:33
>> > 收件人: user-zh
>> > 主题: Flink connector 是否支持忽略delete message
>> > 咨询下各位大佬,请问下connector现在有支持忽略delete消息的选项配置
>> >
>> > 吗?比如上游的数据是一个upsert/retract流,在connector这里是否有选项去忽略delete
>> > message,当作append流只去戳里insert消息。我看现在代码没有类似的功能,不确定是否有相关的jira或者实现
>> >
>>

回复