你好,
flink1.10,用flinkSQL写hbase,报错:UpsertStreamTableSink requires that Table has a
full primary keys if it is updated.
看到网上的资料说是,upsertSink的primary
key是通过query来推断的,而我的query无法推断出PK,所以报错。说是需要1.10的临时解决方法是加一层group by,使得query可以推断出
primary key。
但是,我添加group by以后还是报错,这个问题该怎么解决呢??到底query是如何推断PK的??
以下是我的sql语句:
创建表的语句(字段有点多,还请见谅)
CREATE TABLE `AssetInfoRiskResultSinkTable` (
rowkey string,
d ROW(`id` bigint,
`user_id` string,
`income_no` string ,
`nation` string,
`card_auth_expiry_time` string ,
`user_name` string ,
`login_phone` string ,
`card_no` string ,
`address` string ,
`highest_eduction` string ,
`is_married` string ,
`resident_address` string ,
`resident_province` string ,
`resident_city` string ,
`resident_town` string ,
`profession` string ,
`job_salary` string ,
`company_name` string ,
`company_province` string ,
`company_city` string ,
`company_town` string ,
`company_address` string ,
`family_name` string ,
`family_phone` string ,
`workmate_name` string ,
`workmate_phone` string ,
`bank_card_no` string ,
`bank_phone` string ,
`device_address` string ,
`device_ip` string ,
`contacts` string ,
`create_user` string ,
`create_time` string ,
`update_user` string ,
`update_time` string,
`scene_id` string ,
`access_type` string ,
`status` string ,
`label_id` string ,
`label_name` string ,
`ocr_real_name` string ,
`ocr_id_card` string ,
`ocr_id_card_address` string,
`longitude` string ,
`latitude` string ,
`imei` string ,
`imsi` string ,
`mac` string ,
`resident_province2` string ,
`loan_use` string ,
`channel_code` string ,
`channel_name` string ,
`credit_card_number` string ,
`product_type_code` string ,
`product_type_name` string ,
`apply_amount` string ,
`income_time` string ,
`credit_card_phone` string ,
`credit_card_amount` string ,
`period` string ,
`start_work_time` string ,
`register_channel` string ,
`register_channel_name` string ,
`bank_name` string ,
`company_call` string ,
`system_tag` string ,
`is_trans` string ,
`is_dial_confirm` string ,
`is_dial_type` string ,
`is_dial_typeM` string ,
`is_dial_typeHuman` string ,
`user_risk_score` string ,
`upload_imgs` string ,
`upload_status` string ,
`famliy_relationship` string ,
`work_relationship` string ,
`request_time` string ,
`ac_record` string ,
`profession_station` string ,
`mobile_os` string ,
`mobile_type` string ,
`mobile_brand` string ,
`networktype` string ,
`jail_break` string ,
`open_udid` string ,
`simulator` string ,
`idfa` string ,
`idfv` string ,
`device_type` string ,
`credit_card_id_card_no` string ,
`credit_card_username` string ,
`sign_issue_org` string ,
`user_level` string ,
`channel_request_time` string ,
`real_income_no` string ,
`org_channel_code` string ,
`qq` string ,
`mail` string ,
`pre_grant_credit_amount` string ,
`pre_grant_credit_term` string ,
`pre_grant_credit_term_unit` string ,
`monthly_repay_amount` string ,
`total_repay_amount` string ,
`xhd_white_list_flag` string ,
`white_list_flag` string ,
`white_list_level` string ,
`white_list_type` string ,
`bus_type` string ,
`housing_fund_status` string ,
`operator_auth_status` string ,
`credit_card_status` string ,
`pboc_credit_status` string ,
`bh_url_flag` string,
`zmxy_auth_expiry_time` string ,
`operator_auth_expiry_time` string ,
`housing_fund_status_time` string ,
`credit_card_expiry_time` string ,
`pboc_credit_status_time` string ,
`credit_card_bank_name` string ,
`due_limit_unit` string ,
`due_limit` string ,
`loan_amount` string ,
`risk_lead_flag` string ,
`period_unit` string ,
`face_score` string ,
`notify_url` string ,
`org_id` string ,
`contract_id` string ,
`birthday` string ,
`zmxy_status` string ,
`is_root` string ,
`is_virtualmachine` string ,
`appnum` string ,
`wifi_ip` string ,
`blue_mac` string ,
`wifi_mac` string ,
`vpn_ip` string ,
`cell_ip` string ,
`true_ip` string ,
`is_helical_accelerator` string ,
`bussiness` string ,
`manual_check` string ,
`has_contacts` string ,
`length_of_residence_year` string ,
`length_of_residence_month` string ,
`gps_province` string ,
`gps_city` string ,
`gps_area` string ,
`gps_detail_address` string ,
`positional_titles` string ,
`work_years` string ,
`work_months` string ,
`max_acceptable_monthly_payment` string ,
`profession_code` string ,
`occupation` string ,
`career_status` string ,
`work_position` string ,
`work_time` string ,
`end_result` string )
) with (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = 'rtest:borrower_related_asset_info_real_time',
'connector.zookeeper.quorum' =
'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fjr-yz-204-13',
'connector.zookeeper.znode.parent' = 'hbase_test',
'connector.write.buffer-flush.max-size' = '10mb',
'connector.write.buffer-flush.max-rows' = '1000',
'connector.write.buffer-flush.interval' = '2s'
)
插入语句:
INSERT INTO AssetInfoRiskResultSinkTable
select
MD5(real_income_no) as rowkey,
Row(id,
user_id,
income_no,
nation,
card_auth_expiry_time,
user_name,
phone,
id_card,
address,
highest_eduction,
is_married,
resident_address,
resident_province,
resident_city,
resident_town,
profession,
job_salary,
company_name,
company_province,
company_city,
company_town,
company_address,
family_name,
family_phone,
workmate_name,
workmate_phone,
bank_card_no,
bank_phone,
device_address,
device_ip,
contacts,
create_user,
create_time,
update_user,
update_time,
scene_id,
access_type,
status,
label_id,
label_name,
ocr_real_name,
ocr_id_card,
ocr_id_card_address,
longitude,
latitude,
imei,
imsi,
mac,
resident_province2,
loan_use,
channel_code,
channel_name,
credit_card_number,
product_type_code,
product_type_name,
apply_amount,
income_time,
credit_card_phone,
credit_card_amount,
`period`,
start_work_time,
register_channel,
register_channel_name,
bank_name,
company_call,
system_tag,
is_trans,
is_dial_confirm,
is_dial_type,
is_dial_typem,
is_dial_typeHuman,
user_risk_score,
upload_imgs,
upload_status,
famliy_relationship,
work_relationship,
request_time,
ac_record,
profession_station,
mobile_os,
mobile_type,
mobile_brand,
networktype,
jail_break,
open_udid,
simulator,
idfa,
idfv,
device_type,
credit_card_id_card_no,
credit_card_username,
sign_issue_org,
user_level,
channel_request_time,
real_income_no,
org_channel_code,
qq,
mail,
pre_grant_credit_amount,
pre_grant_credit_term,
pre_grant_credit_term_unit,
monthly_repay_amount,
total_repay_amount,
xhd_white_list_flag,
white_list_flag,
white_list_level,
white_list_type,
bus_type,
housing_fund_status,
operator_auth_status,
credit_card_status,
pboc_credit_status,
bh_url_flag,
zmxy_auth_expiry_time,
operator_auth_expiry_time,
housing_fund_status_time,
credit_card_expiry_time,
pboc_credit_status_time,
credit_card_bank_name,
due_limit_unit,
due_limit,
loan_amount,
risk_lead_flag,
period_unit,
face_score,
notify_url,
org_id,
contract_id,
birthday,
zmxy_status,
is_root,
is_virtualmachine,
appnum,
wifi_ip,
blue_mac,
wifi_mac,
vpn_ip,
cell_ip,
true_ip,
is_helical_accelerator,
bussiness,
manual_check,
has_contacts,
length_of_residence_year,
length_of_residence_month,
gps_province,
gps_city,
gps_area,
gps_detail_address,
positional_titles,
work_years,
work_months,
max_acceptable_monthly_payment,
profession_code,
occupation,
career_status,
work_position,
work_time,
end_result) as d
from (
SELECT
real_income_no,
id,
user_id,
income_no,
nation,
card_auth_expiry_time,
if (user_name is not null and user_name <
'',unifedEncryption(user_name),user_name) as user_name,
if(login_phone is not null and login_phone <
'',unifedEncryption(login_phone),login_phone) as phone,
if(card_no is not null and card_no <
'',unifedEncryption(card_no),card_no) as id_card,
address,
highest_eduction,
is_married,
resident_address,
resident_province,
resident_city,
resident_town,
profession,
job_salary,
company_name,
company_province,
company_city,
company_town,
company_address,
if(family_name is not null and family_name <
'',unifedEncryption(family_name),family_name) as family_name,
if(family_phone is not null and family_phone <
'',unifedEncryption(family_phone),family_phone) as family_phone,
if(workmate_name is not null and workmate_name <
'',unifedEncryption(workmate_name),workmate_name) as workmate_name,
if(workmate_phone is not null and workmate_phone <
'',unifedEncryption(workmate_phone),workmate_phone) as workmate_phone,
if(bank_card_no is not null and bank_card_no <
'',unifedEncryption(bank_card_no),bank_card_no) as bank_card_no,
if(bank_phone is not null and bank_phone <
'',unifedEncryption(bank_phone),bank_phone) as bank_phone,
device_address,
device_ip,
contacts,
create_user,
create_time,
update_user,
update_time,
scene_id,
access_type,
status,
label_id,
label_name,
if(ocr_real_name is not null and ocr_real_name <
'',unifedEncryption(ocr_real_name),ocr_real_name) as ocr_real_name,
if(ocr_id_card is not null and ocr_id_card <
'',unifedEncryption(ocr_id_card),ocr_id_card) as ocr_id_card,
ocr_id_card_address,
longitude,
latitude,
imei,
imsi,
mac,
resident_province2,
loan_use,
channel_code,
channel_name,
if(credit_card_number is not null and credit_card_number <
'',unifedEncryption(credit_card_number),credit_card_number) as
credit_card_number,
product_type_code,
product_type_name,
apply_amount,
income_time,
if(credit_card_phone is not null and credit_card_phone <
'',unifedEncryption(credit_card_phone),credit_card_phone) as credit_card_phone,
credit_card_amount,
`period`,
start_work_time,
register_channel,
register_channel_name,
bank_name,
if(company_call is not null and company_call <
'',unifedEncryption(company_call),company_call) as company_call,
system_tag,
is_trans,
is_dial_confirm,
is_dial_type,
is_dial_typeM as is_dial_typem,
is_dial_typeHuman,
user_risk_score,
upload_imgs,
upload_status,
famliy_relationship,
work_relationship,
request_time,
ac_record,
profession_station,
mobile_os,
mobile_type,
mobile_brand,
networktype,
jail_break,
open_udid,
simulator,
idfa,
idfv,
device_type,
credit_card_id_card_no,
credit_card_username,
sign_issue_org,
user_level,
channel_request_time,
org_channel_code,
qq,
mail,
pre_grant_credit_amount,
pre_grant_credit_term,
pre_grant_credit_term_unit,
monthly_repay_amount,
total_repay_amount,
xhd_white_list_flag,
white_list_flag,
white_list_level,
white_list_type,
bus_type,
housing_fund_status,
operator_auth_status,
credit_card_status,
pboc_credit_status,
bh_url_flag,
zmxy_auth_expiry_time,
operator_auth_expiry_time,
housing_fund_status_time,
credit_card_expiry_time,
pboc_credit_status_time,
credit_card_bank_name,
due_limit_unit,
due_limit,
loan_amount,
risk_lead_flag,
period_unit,
face_score,
notify_url,
org_id,
contract_id,
birthday,
zmxy_status,
is_root,
is_virtualmachine,
appnum,
wifi_ip,
blue_mac,
wifi_mac,
vpn_ip,
cell_ip,
true_ip,
is_helical_accelerator,
bussiness,
manual_check,
has_contacts,
length_of_residence_year,
length_of_residence_month,
gps_province,
gps_city,
gps_area,
gps_detail_address,
positional_titles,
work_years,
work_months,
max_acceptable_monthly_payment,
profession_code,
occupation,
career_status,
work_position,
work_time,
end_result
FROM
(
SELECT
*, ROW_NUMBER () OVER (
PARTITION BY id
ORDER BY
update_time DESC
) AS rowNum
FROM
AssetInfoRiskResultTable
)
WHERE
rowNum = 1)
[email protected]