我的flink sql作业如下

SELECT
product_name,
window_start,
window_end,
CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
CAST(COUNT(order_no)ASBIGINT) trans_cnt,
-- LOCALTIMESTAMP AS insert_time,
'微支付事业部'AS bus_name
FROM(


mysql sink表的定义如下
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;


运行起来后写入mysql表的数据带有中文乱码 ??????



查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Received task GroupAggregate(groupBy=[product_name, window_start, 
window_end], select=[product_name, window_start, window_end, 
SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> 
Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) 
DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) 
DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS 
trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
_UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
bus_name]) -> Sink: 
Sink(table=[default_catalog.default_database.all_trans_5m_new], 
fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into 
slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, 
mer_cust_id, order_no, trans_date], select=[product_name, window_start, 
window_end, id, data_type, mer_cust_id, order_no, trans_date, 
MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
window_start, window_end, trans_amt, order_no]) (1/1)#0 
(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.

回复