??????
flink1.10??????????????????????????????hbase????????????????????
Exception in thread "main" org.apache.flink.table.api.TableException:
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.
// Flink SQL: pivots the 10-second average of `value` per req_type into one
// HBase row per (window time, distance range) and writes it to
// response_time_sink as (rowkey, cf ROW(...)).
//
// Fix for "UpsertStreamTableSink requires that Table has a full primary keys
// if it is updated": the updating aggregation used to be grouped by
// (rowkey, day, time), but only `rowkey` is projected as a top-level column
// (day/time are wrapped inside ROW(...)), so the planner could not derive a
// complete key for the upsert sink. The aggregation is now grouped by the
// rowkey expression alone. `day` and `time` are carried through with MAX();
// both come from the same window start that is already embedded in rowkey,
// so they are constant within a group and the result is unchanged.
//
// Every line of the string carries the `|` margin so stripMargin yields
// consistently formatted SQL.
s"""
|INSERT INTO ${databaseName}.response_time_sink
|SELECT
|  rowkey,
|  ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time,
|      replace_avg_time, deviate_avg_time) AS cf
|FROM (
|  SELECT
|    CONCAT_WS('_', CAST(`time` AS VARCHAR), distance_range) AS rowkey,
|    MAX(`day`) AS `day`,
|    MAX(`time`) AS `time`,
|    MAX(CASE req_type WHEN '0' THEN num ELSE 0 END) AS initialize_route_avg_time,
|    MAX(CASE req_type WHEN '1' THEN num ELSE 0 END) AS update_detour_avg_time,
|    MAX(CASE req_type WHEN '2' THEN num ELSE 0 END) AS replace_avg_time,
|    MAX(CASE req_type WHEN '3' THEN num ELSE 0 END) AS deviate_avg_time
|  FROM (
|    SELECT
|      DATE_FORMAT(
|        TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)),
|        'yyyy-MM-dd') AS `day`,
|      UNIX_TIMESTAMP(DATE_FORMAT(
|        TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)),
|        'yyyy-MM-dd HH:mm:ss')) * 1000 AS `time`,
|      req_type,
|      (CASE WHEN ResponseRemainingMile <= 50 THEN '1'
|            WHEN ResponseRemainingMile > 50 AND ResponseRemainingMile <= 250 THEN '2'
|            WHEN ResponseRemainingMile > 250 AND ResponseRemainingMile <= 500 THEN '3'
|            WHEN ResponseRemainingMile > 500 THEN '4' END) AS distance_range,
|      CAST(AVG(`value`) AS INT) AS num
|    FROM
|      ${databaseName}.metric_stream
|    WHERE
|      metric = 'http_common_request'
|    GROUP BY
|      TUMBLE(proctime, INTERVAL '10' SECOND),
|      req_type,
|      (CASE WHEN ResponseRemainingMile <= 50 THEN '1'
|            WHEN ResponseRemainingMile > 50 AND ResponseRemainingMile <= 250 THEN '2'
|            WHEN ResponseRemainingMile > 250 AND ResponseRemainingMile <= 500 THEN '3'
|            WHEN ResponseRemainingMile > 500 THEN '4' END)
|  )
|  GROUP BY
|    CONCAT_WS('_', CAST(`time` AS VARCHAR), distance_range)
|) a
|""".stripMargin ????????????????????