Then you need to check the character encoding of each column in your database table. Have any of them been changed? It works on my side.
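A minimal sketch of how the column encodings could be checked on the MySQL side. The schema name `database` and the table name `jdbc_sink` are the placeholders from the example DDL quoted further down this thread, not your real names, so substitute your own. The final statement is only a possible fix if a column turns out not to be utf8mb4, and it rewrites the table, so try it on a copy first.

```sql
-- List the character set and collation of every column in the sink table.
-- 'database' and 'jdbc_sink' are placeholder names (assumption).
SELECT COLUMN_NAME, CHARACTER_SET_NAME, COLLATION_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'database'
  AND TABLE_NAME = 'jdbc_sink';

-- Show the character sets used by the server and the current connection.
SHOW VARIABLES LIKE 'character_set%';

-- Optional: convert all text columns of the table to utf8mb4.
ALTER TABLE jdbc_sink CONVERT TO CHARACTER SET utf8mb4;
```

Non-text columns such as INT or DECIMAL show NULL for CHARACTER_SET_NAME; only the VARCHAR/TEXT columns matter here.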
On 2021-05-18 18:19:31, "casel.chen" <casel_c...@126.com> wrote:
>My JDBC URL already includes useUnicode=true&characterEncoding=UTF-8, but the data still comes out garbled.
>
>On 2021-05-18 17:21:12, "王炳焱" <15307491...@163.com> wrote:
>>When you connect to the MySQL table from Flink SQL, set url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8, like this:
>>CREATE TABLE jdbc_sink (
>>  id INT COMMENT '订单id',
>>  goods_name VARCHAR(128) COMMENT '商品名称',
>>  price DECIMAL(32,2) COMMENT '商品价格',
>>  user_name VARCHAR(64) COMMENT '用户名称'
>>) WITH (
>>  'connector' = 'jdbc',
>>  'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
>>  'username' = 'mysqluser',
>>  'password' = 'mysqluser',
>>  'table-name' = 'jdbc_sink'
>>)
>>On 2021-05-18 11:55:46, "casel.chen" <casel_c...@126.com> wrote:
>>>My Flink SQL job is as follows:
>>>
>>>
>>>SELECT
>>>product_name,
>>>window_start,
>>>window_end,
>>>CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
>>>CAST(COUNT(order_no) AS BIGINT) trans_cnt,
>>>-- LOCALTIMESTAMP AS insert_time,
>>>'微支付事业部' AS bus_name
>>>FROM (
>>>
>>>
>>>The MySQL sink table is defined as follows:
>>>CREATE TABLE XXX (
>>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
>>>
>>>
>>>Once the job is running, the Chinese text written to the MySQL table is garbled: ??????
>>>
>>>
>>>
>>>Looking at the job's runtime log, I found that it uses the UTF-16LE character set. Is there any way to make it use utf8mb4 instead?
>>>2021-05-17 18:02:25,010 INFO
>>>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
>>>GroupAggregate(groupBy=[product_name, window_start, window_end],
>>>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS
>>>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS
>>>product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd
>>>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT
>>>_UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt,
>>>CAST($f4) AS trans_cnt, CAST(()) AS insert_time,
>>>_UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
>>>AS bus_name]) -> Sink:
>>>Sink(table=[default_catalog.default_database.all_trans_5m_new],
>>>fields=[product_name, window_start, window_end, trans_amt, trans_cnt,
>>>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy
>>>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
>>>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] -
>>>GroupAggregate(groupBy=[product_name, window_start, window_end, id,
>>>data_type, mer_cust_id, order_no, trans_date], select=[product_name,
>>>window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date,
>>>MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name,
>>>window_start, window_end, trans_amt, order_no]) (1/1)#0
>>>(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.