Hi,caizhi,非常感谢你的回复! 在KafkaDynamicTableFactory.java的createDynamicTableSink(Context context)方法开始,我通过context.getObjectIdentifier().getObjectName()获取到sinkTableName。因为ObjectIdentifier类就唯一标识了一个表,它包括catalogName、databaseName和objectName,分别代表catalog名、数据库名和表名。之后,我通过构造传入到了FlinkKafkaProducer,然后就可以使用了。 我已经解决了该问题,根本原因是influxDB解析sql失败!原因是,我书写的flink sql语句 insert into 或者insert overwrite 中单引号带有换行符,所以写入influxdb会报错。另外,创建表的with参数也要维持版本统一!
| Best, Jimmy | Signature is customized by Netease Mail Master 在2021年07月23日 10:32,Caizhi Weng 写道: Hi! 是通过什么方式拿到 sink table name 呢?从报错信息来看,拿到的可能并不是“a” 这样的 table name,而是一个 insert 语句的 digest,因此 metric 表名会变得比较复杂,导致解析错误。 当然也可以选择对 metric 表名进行转义。 Jimmy Zhang <[email protected]> 于2021年7月23日周五 上午10:11写道: > 大家好,Flink版本1.13.1。 > 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric > 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。 > > 但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where > b.c='d'的SQL语句时,influxDB中的表可以成功被建出来; > 但如果加上UDF,比如 > insert into a select > CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, > 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where > Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激! > org.influxdb.InfluxDBException$UnableToParseException: partial write: > unable to parse > 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= > insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields > unable to parse ''tablename'\, > 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, > 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields) > > | > Best, > Jimmy > | > > Signature is customized by Netease Mail Master
