Hi,

The Hive sink does not support upsert writes; it only supports INSERT (append-only) writes, and no table configuration will change that. In cases like this, you can usually use Hudi or Iceberg as the sink to accept the upsert data.
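As a minimal sketch of the Iceberg route, assuming the iceberg-flink-runtime jar for Flink 1.13 is on the classpath, and treating the catalog name, metastore URI, warehouse path, and database below as placeholders for your environment:

-- Hypothetical setup: register an Iceberg catalog backed by the Hive metastore
-- (URI and warehouse path are placeholders).
CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://192.168.9.116:9083',
  'warehouse' = 'hdfs:///user/hive/warehouse'
);

-- Upsert writes need a declared primary key and an Iceberg format-version 2
-- table; note the property is 'write.upsert.enabled', not 'write.upsert.enable'.
CREATE TABLE iceberg_catalog.my_db.output_2432_5076_model (
  id STRING,
  user_id STRING,
  status STRING,
  my_dt TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

A sink declared this way advertises upsert support, so the INSERT INTO ... SELECT DISTINCT query in your mail should be accepted instead of failing on the GroupAggregate update changes. The Hudi route is analogous (a 'connector' = 'hudi' table with a primary key).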
Best,
LuNing Wang

799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote on Wed, Apr 6, 2022 at 14:37:

> Question: in Flink 1.13.6, how do I set a Hive sink table to upsert mode using pure SQL?
>
> flink: 1.13.6
> hive: 1.1.1
> hadoop: 2.6.0-cdh5.16.2
>
> Pure SQL job: Kafka is the source, the intermediate transformation contains a DISTINCT or GROUP BY, and the result is written to a Hive sink table, which fails with the error below:
>
> doesn't support consuming update changes which is produced by node
> GroupAggregate(groupBy=[id, user_id, status, EXPR$3]
>
> Answers I found online say the sink table needs to be set to upsert mode. I tried creating the sink table as shown below; the CREATE TABLE succeeds, but submitting the INSERT INTO still fails with the same error.
>
> Source table:
>
> CREATE TABLE data_2432_5074_model (
>   id STRING,
>   user_id STRING,
>   status STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'person',
>   'properties.bootstrap.servers' = '192.168.9.116:9092',
>   'properties.group.id' = 'chinaoly-group',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json',
>   'json.fail-on-missing-field' = 'false',
>   'json.ignore-parse-errors' = 'true'
> )
>
> Sink table:
>
> CREATE TABLE output_2432_5076_model_1649226175146 (
>   id STRING,
>   user_id STRING,
>   status STRING,
>   my_dt TIMESTAMP
> ) TBLPROPERTIES (
>   'streaming-source.enable' = 'true',
>   'streaming-source.partition.include' = 'all',
>   'streaming-source.partition-order' = 'create-time',
>   'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
>   'sink.partition-commit.policy.kind' = 'metastore,success-file',
>   'write.upsert.enable' = 'true',
>   'streaming-source.monitor-interval' = '1 min'
> )
>
> Query:
>
> INSERT INTO output_2432_5076_model_1649226175146
> SELECT DISTINCT id AS id, user_id AS user_id, status AS status, proctime()
> FROM (SELECT * FROM data_2432_5074_model)
> WHERE status = '1'
>
> Could the official team give me an answer? Thanks in advance.
>
> 799590...@qq.com