大家好,Flink版本1.13.1。
我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric
 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where 
b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
但如果加上UDF,比如
insert into a select 
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where 
Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

回复