Hi

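Both GROUP BY and LEFT JOIN produce retract messages. Handling such messages
requires an UpsertStreamTableSink, but the Kafka connector is currently
implemented as an AppendStreamTableSink, so it cannot process them.
The community already has an issue tracking this [1], and the feature should
be available in 1.12.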
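
To make the retract behaviour concrete, here is a minimal sketch in plain Python. It is not Flink code: `group_count_changelog`, `AppendOnlySink` and `UpsertSink` are hypothetical names invented for this illustration, and the "+"/"-" tuples only imitate the idea of insert and retract messages. It assumes a streaming COUNT(*) grouped by a single key.

```python
from collections import defaultdict


def group_count_changelog(keys):
    """Imitate a streaming COUNT(*) ... GROUP BY key.

    Yields (op, key, count) tuples: "-" retracts the previously emitted
    result for the key, "+" emits the new result.
    """
    counts = defaultdict(int)
    for key in keys:
        if counts[key] > 0:
            # The old result is no longer valid, so it is retracted first.
            yield ("-", key, counts[key])
        counts[key] += 1
        yield ("+", key, counts[key])


class AppendOnlySink:
    """Hypothetical sink that only accepts insert ("+") messages."""

    def __init__(self):
        self.records = []

    def consume(self, change):
        op, key, value = change
        if op != "+":
            raise ValueError("append-only sink cannot consume update changes")
        self.records.append((key, value))


class UpsertSink:
    """Hypothetical sink that keeps the latest value per key."""

    def __init__(self):
        self.state = {}

    def consume(self, change):
        op, key, value = change
        if op == "+":
            self.state[key] = value
        elif self.state.get(key) == value:
            # A retraction removes the value it refers to.
            del self.state[key]


changes = list(group_count_changelog(["shop_a", "shop_b", "shop_a"]))

upsert_sink = UpsertSink()
for change in changes:
    upsert_sink.consume(change)

append_sink = AppendOnlySink()
append_error = None
try:
    for change in changes:
        append_sink.consume(change)
except ValueError as exc:
    append_error = str(exc)
```

In this sketch the second "shop_a" row forces the aggregate to retract its earlier count, which the append-only sink rejects while the upsert-style sink simply overwrites the value for that key.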


Best
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18826

> On Aug 12, 2020, at 15:58, xiao cai <flin...@163.com> wrote:
> 
> Hi Jark:
> Version: 1.11.0
> Problem: with flink-sql, writing data to a Kafka sink after a group by and a
> left join fails at the SQL validation stage with the error:
> AppendStreamTableSink doesn't support consuming update changes which is
> produced by node GroupAggregate
> 
> 
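> I would like upsert operations to be accepted for the Kafka sink at SQL
> validation time, or alternatively to have the upserts complete first and
> only then write the result to Kafka.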
> 
> 
> The SQL I am executing:
> create table kafka_table_1 (  
>    `shop_id` varchar,  
>    `user_id` bigint,  
>    `category_id` int, 
>    `ts` bigint,  
>    `row_time` timestamp(3), 
>    `proc_time` timestamp(3)
> ) with (  
>    'connector.type' = 'kafka',  
>    'connector.version' = 'universal',  
>    'connector.topic' = 'user_visit_1',  
>    'connector.startup-mode' = 'latest-offset',  
>    'connector.properties.bootstrap.servers' = 'ip:9092',  
>    'connector.properties.zookeeper.connect' = 'ip:2181',  
>    'update-mode' = 'append', 
>    'format.type' = 'avro-registry', 
>    'format.schema-subject' = 'user_visit', 
>    'format.schema-url' = 'http://ip:8081'
> )
> 
> 
> CREATE TABLE hbase_table ( 
>    rowKey STRING, 
>    cf ROW<age STRING, area STRING> 
> ) WITH ( 
>    'connector.type' = 'hbase', 
>    'connector.version' = '1.4.3', 
>    'connector.table-name' = 'hbase_table', 
>    'connector.zookeeper.quorum' = 'ip:2181', 
>    'connector.zookeeper.znode.parent' = '/hbase', 
>    'connector.write.buffer-flush.max-rows' = '1000' 
> )
> 
> 
> 
> 
> create table kafka_table_2 (  
>    `shop_id` varchar,  
>    `age` varchar,  
>    `area` varchar
> ) with (  
>    'connector.type' = 'kafka',  
>    'connector.version' = 'universal',  
>    'connector.topic' = 'user_visit_2',  
>    'connector.startup-mode' = 'latest-offset',  
>    'connector.properties.bootstrap.servers' = 'ip:9092',  
>    'connector.properties.zookeeper.connect' = 'ip:2181',  
>    'update-mode' = 'append', 
>    'format.type' = 'avro-registry', 
>    'format.schema-subject' = 'user_visit', 
>    'format.schema-url' = 'http://ip:8081'
> )
> 
> 
> insert into kafka_table_2 (shop_id, age, area)
> select shop_id, age, area 
> from kafka_table_1 left join hbase_table
> for system_time as of kafka_table_1.proc_time as temp on 
> kafka_table_1.shop_id = temp.rowKey
> group by shop_id, age, area
> 
> 
> Original Message
> From: xiao cai<flin...@163.com>
> To: user-zh<user-zh@flink.apache.org>
> Sent: Wednesday, August 12, 2020, 15:41
> Subject: AppendStreamTableSink doesn't support consuming update changes
> 
> 
> Hi Jark: Version: 1.11.0. Problem: with flink-sql, writing data to a Kafka
> sink after a group by and a left join fails at the SQL validation stage with
> the error: AppendStreamTableSink doesn't support consuming update
> changes which is produced by node GroupAggregate
