??????
flink1.10??????????????????????????????hbase????????????????????
Exception in thread "main" org.apache.flink.table.api.TableException:
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at
com.autoai.cns.core.CNSDashboardDML$.responseTime(CNSDashboardDML.scala:178)
at com.autoai.cns.core.CNSDashboardDML$.main(CNSDashboardDML.scala:60)
at com.autoai.cns.core.CNSDashboardDML.main(CNSDashboardDML.scala)
s"""
|INSERT INTO ${databaseName}.response_time_sink
|SELECT
| rowkey,
| ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time,
replace_avg_time, deviate_avg_time) AS cf
|FROM
|(
| select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range)
rowkey,`day`,`time`,
| MAX(CASE req_type WHEN '0' THEN num else 0 END)
initialize_route_avg_time,
| MAX(CASE req_type WHEN '1' THEN num else 0 END)
update_detour_avg_time,
| MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time,
| MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time
| from
| (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime,
INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`,
| UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8,
TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd HH:mm:ss')) * 1000
AS `time`,
| req_type,
| (CASE WHEN ResponseRemainingMile<=50 THEN '1'
| WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<=
250 THEN '2'
| WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<=
500 THEN '3'
| WHEN ResponseRemainingMile> 500 THEN '4' END) as
distance_range,
| CAST(AVG(`value`) AS INT) num
| FROM
| ${databaseName}.metric_stream
| WHERE
| metric = 'http_common_request'
| GROUP BY
| TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN
ResponseRemainingMile<=50 THEN '1'
| WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250
THEN '2'
| WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500
THEN '3'
| WHEN ResponseRemainingMile> 500 THEN '4' END))
| group by CONCAT_WS('_',CAST(`time` AS
VARCHAR),distance_range),`day`,`time`
|) a
|""".stripMargin ????????????????????