Hi All:
使用flink-sql写入hbase sink时报错:
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.


我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表
kafka source表与hbase 维表left join后的结果insert到hbase sink表中:
sql如下:
create table user_click_source(
`id` bigint,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint,
`catalog_id` int,
`device_id` int,
`user_id` int,
`proc_time` timestamp(3)
PRIMARY KEY (id) NOT ENFORCED
)with(
'connector.type' = 'kafka',
……
)
;
create table dim_user(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>,
ts bigint
)with(
'connector.type'='hbase',
……
)
;


create table dim_device(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;


create table dim_catalog(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;
create table hbase_full_user_click_case1_sink(
`rowkey` bigint,
cf ROW<
`click_id` bigint,
`click_name` varchar,
`click_partition` int,
`click_event_time` bigint,
`click_write_time` bigint,
`click_snapshot_time` bigint,
`click_max_snapshot_time` bigint,
`catalog_id` int,
`catalog_name` varchar,
`catalog_partition` int,
`catalog_event_time` bigint,
`catalog_write_time` bigint,
`catalog_snapshot_time` bigint,
`catalog_max_snapshot_time` bigint,
`device_id` int,
`device_name` varchar,
`device_partition` int,
`device_event_time` bigint,
`device_write_time` bigint,
`device_snapshot_time` bigint,
`device_max_snapshot_time` bigint,
`user_id` int,
`user_name` varchar,
`user_partition` int,
`user_event_time` bigint,
`user_write_time` bigint,
`user_snapshot_time` bigint,
`user_max_snapshot_time` bigint
>,
PRIMARY KEY (rowkey) NOT ENFORCED
)with(
'connector.type'='hbase',
……
)
;
insert into hbase_full_user_click_case1_sink
select
`click_id`,
ROW(
`click_id`,
`click_name`,
`click_partition`,
`click_event_time`,
`click_write_time`,
`click_snapshot_time`,
`click_max_snapshot_time`,
`catalog_id`,
`catalog_name`,
`catalog_partition`,
`catalog_event_time`,
`catalog_write_time`,
`catalog_snapshot_time`,
`catalog_max_snapshot_time`,
`device_id`,
`device_name`,
`device_partition`,
`device_event_time`,
`device_write_time`,
`device_snapshot_time`,
`device_max_snapshot_time`,
`user_id`,
`user_name`,
`user_partition`,
`user_event_time`,
`user_write_time`,
`user_snapshot_time`,
`user_max_snapshot_time`
)
from (select
click.id as `click_id`,
click.name as `click_name`,
click.kafka_partition as `click_partition`,
click.event_time as `click_event_time`,
click.write_time as `click_write_time`,
click.snapshot_time as `click_snapshot_time`,
click.max_snapshot_time as `click_max_snapshot_time`,
cat.cf.id as `catalog_id`,
cat.cf.name as `catalog_name`,
cat.cf.kafka_partition as `catalog_partition`,
cat.cf.event_time as `catalog_event_time`,
cat.cf.write_time as `catalog_write_time`,
cat.cf.snapshot_time as `catalog_snapshot_time`,
cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
dev.cf.id as `device_id`,
dev.cf.name as `device_name`,
dev.cf.kafka_partition as `device_partition`,
dev.cf.event_time as `device_event_time`,
dev.cf.write_time as `device_write_time`,
dev.cf.snapshot_time as `device_snapshot_time`,
dev.cf.max_snapshot_time as `device_max_snapshot_time`,
u.cf.id as `user_id`,
u.cf.name as `user_name`,
u.cf.kafka_partition as `user_partition`,
u.cf.event_time as `user_event_time`,
u.cf.write_time as `user_write_time`,
u.cf.snapshot_time as `user_snapshot_time`,
u.cf.max_snapshot_time as `user_max_snapshot_time`


from (select
id,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
cast(catalog_id as varchar) as catalog_key,
cast(device_id as varchar) as device_key,
cast(user_id as varchar) as user_key,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`,
`event_time`,
FROM user_click_source
GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND),
`id`,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`) click


left join dim_catalog cat on click.catalog_key = cat.rowkey
left join dim_device dev on click.device_key = dev.rowkey
left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts
) t

回复