Unsubscribe

2021-07-08 Thread Yu Wang
Unsubscribe


Unsubscribe

2021-06-01 Thread Yu Wang



Flink SQL, Kafka source to MySQL sink: values are appended on primary key conflict

2021-02-19 Thread Yu Wang
Kafka data format:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}


*Kafka DDL:*
CREATE TABLE washroom_detail (
 building_id STRING,
 sofa_id STRING,
 floor_num INT,
 occupy_status INT,
 start_time BIGINT,
 end_time BIGINT,
 process_time TIMESTAMP,
 occupy_times as concat(
   date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'),
   '-',
   date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')),
 local_date as date_format(cast(start_time / 1000 as timestamp), '-MM-dd'),
 day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8
) WITH (
 'connector' = 'kafka',
 'topic' = '',
 'properties.bootstrap.servers' = 'xxx',
 'properties.group.id' = '',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'json'
);


*MySQL DDL:*
create table hour_ddl
(
 building_id STRING,
 sofa_id STRING,
 local_date STRING,
 `hour` INT,
 floor_num INT,
 occupy_frequency INT,
 occupy_times STRING,
 update_time TIMESTAMP,
 process_time TIMESTAMP,
 primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED
) with (
 'connector' = 'jdbc',
 'url' = '',
 'table-name' = '',
 'username' = 'x',
 'password' = 'xx'
);


*Flink SQL DML:*
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
                      occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
       a.sofa_id,
       a.local_date,
       a.day_hour,
       a.floor_num,
       CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT),
       concat(if(b.occupy_times IS NULL, '', b.occupy_times),
              if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
       NOW(),
       a.process_time
FROM
  (SELECT building_id,
          sofa_id,
          local_date,
          day_hour,
          floor_num,
          count(1) AS frequency,
          LISTAGG(occupy_times) AS times,
          MAX(process_time) AS process_time,
          PROCTIME() AS compute_time
   FROM washroom_detail
   GROUP BY building_id,
            sofa_id,
            local_date,
            day_hour,
            floor_num) a
LEFT JOIN hour_ddl
  FOR SYSTEM_TIME AS OF a.compute_time AS b
  ON a.building_id = b.building_id
  AND a.sofa_id = b.sofa_id
  AND a.local_date = b.local_date
  AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
   OR b.process_time IS NULL

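Observed behavior:
When MySQL has no matching row yet, one record is inserted:
occupy_frequency  occupy_times
  1               15:01-15:03
When the primary key conflicts, the row becomes:
occupy_frequency  occupy_times
  3               15:01-15:03,15:01-15:03,15:03-15:04
What I expected instead:
occupy_frequency  occupy_times
  2               15:01-15:03,15:03-15:04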
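
One possible reading of these numbers, sketched as a small Python simulation rather than as a statement about Flink internals: if the GROUP BY subquery emits its full updated result for the key on every incoming record (1 row counted, then 2), and the DML adds each emitted result to whatever the lookup join reads from MySQL, the second update becomes 2 + 1 = 3 and the first time range appears twice. The function name `replay` and the flag `add_to_existing` are hypothetical and exist only for this illustration. It also assumes each record has a later process_time than the stored row, so the WHERE clause always passes.

```python
def replay(events, add_to_existing):
    """Replay occupy_times values for one (building_id, sofa_id, local_date, hour) key.

    Hypothetical model, not Flink code: `row` stands for the MySQL row,
    `agg_*` for the state of the GROUP BY aggregate.
    """
    agg_count, agg_times = 0, []
    row = None  # (occupy_frequency, occupy_times) currently stored in MySQL
    for occupy_times in events:
        # Assumption: the aggregate emits its full updated result per input record.
        agg_count += 1
        agg_times.append(occupy_times)
        a_frequency, a_times = agg_count, ",".join(agg_times)

        if add_to_existing and row is not None:
            # Mirrors the DML: a.frequency + b.occupy_frequency, concat(b.times, ',', a.times)
            b_frequency, b_times = row
            row = (a_frequency + b_frequency, b_times + "," + a_times)
        else:
            # Plain upsert on the primary key: the latest aggregate replaces the row.
            row = (a_frequency, a_times)
    return row


events = ["15:01-15:03", "15:03-15:04"]
observed = replay(events, add_to_existing=True)
expected = replay(events, add_to_existing=False)
print(observed)
print(expected)
```

Under these assumptions the first call reproduces the observed row and the second reproduces the expected one, which suggests the aggregate result alone already carries the running totals for the key.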