Hi! 你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData 是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。
另:我记得我已经回复了两封相同的邮件,之前的回复是丢失了吗? 扯 <[email protected]> 于2021年11月5日周五 上午10:50写道: > > 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下: > > clickhouse建表语句如下: > CREATE TABLE test_local.tzling_tb3( > uuid String, > product String, > platform String, > batchId String, > id String, > account String, > customerId String, > reportName String, > dt String, > campaign String, > adGroup String, > generalField String, > currency String, > impressions String, > cost String, > clicks String, > conversions String, > createDateTime String, > createTime BIGINT, > key String, > pdate String > )engine = MergeTree PARTITION BY pdate order by createTime; > 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。 > > processData.addSink(new MSKUpsertClickHouseSink()); > 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: > 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。 > > 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢? >
