I am using Flink SQL to consume from Kafka and write the data into Doris, but Chinese characters come out garbled (mojibake).
The SQL is as follows:
CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32),
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'kafka',
'topic' = 'datacollect_business_stage',
'properties.bootstrap.servers' = 'XXX',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE `datacollect_business_doris` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32),
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);
insert into datacollect_business_doris select * from datacollect_business_kafka;
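For context on what "garbled Chinese" usually means here: mojibake appears when UTF-8 bytes are decoded with the wrong charset somewhere along the chain (Kafka producer, the Flink JVMs, or the Doris sink). A minimal standalone Java illustration of the mechanism (not part of the job above, just a demonstration):

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class MojibakeDemo {
    public static void main(String[] args) {
        String original = "中文";
        // Encode as UTF-8 (3 bytes per character here) ...
        byte[] utf8Bytes = original.getBytes(StandardCharsets.UTF_8);
        // ... then decode with a single-byte charset: classic mojibake.
        String garbled = new String(utf8Bytes, Charset.forName("ISO-8859-1"));
        System.out.println(garbled);
        // Re-encoding with the wrong charset and decoding as UTF-8
        // recovers the original, which shows the bytes were never lost,
        // only misinterpreted.
        String recovered = new String(
                garbled.getBytes(Charset.forName("ISO-8859-1")),
                StandardCharsets.UTF_8);
        System.out.println(recovered);
    }
}
```

If the bytes in Kafka are valid UTF-8 (the `json` format assumes UTF-8), the misinterpretation is typically happening in a JVM whose default charset is not UTF-8.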
Based on information found online, I added the following to flink-conf.yaml: env.java.opts: "-Dfile.encoding=UTF-8"
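Since the job runs on YARN, it is worth verifying that `-Dfile.encoding=UTF-8` actually reached the TaskManager container JVMs, not just the client JVM. A small diagnostic sketch (hypothetical class name, uses only the JDK) that could be run in the same JVM environment to print the effective charset:

```java
import java.nio.charset.Charset;

public class CharsetCheck {
    public static void main(String[] args) {
        // file.encoding is what -Dfile.encoding=... sets;
        // Charset.defaultCharset() is what the JVM actually resolved it to.
        System.out.println("file.encoding   = " + System.getProperty("file.encoding"));
        System.out.println("default charset = " + Charset.defaultCharset());
    }
}
```

If the default charset printed inside the YARN containers is not UTF-8, the option was not propagated to those JVMs.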
Flink version: 1.12.4
Deployment mode: on YARN
Any help would be greatly appreciated.
Thanks!
[email protected]