你好,我没有自己开发连接器,我用的是kafka connector,influxdb只是作为一个metrics信息存储端,你是需要一个influxdb的连接器?我只是用到了metrics统计体系而已,只是自定义了Counter对象,和连接器没有什么关系
发自 网易邮箱大师 ---- 回复的原邮件 ---- | 发件人 | 信华哺<[email protected]> | | 日期 | 2021年11月26日 17:22 | | 收件人 | [email protected]<[email protected]> | | 抄送至 | | | 主题 | 回复:Flink kafka自定义metrics在influxdb上解析失败 | 你好: 我想问一下,你用的flink sql连接器是自己开发的么? 我在网上只能找到一个datastream的influxdb连接器 在2021年7月23日 10:11,Jimmy Zhang<[email protected]> 写道: 大家好,Flink版本1.13.1。 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。 但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where b.c='d'的SQL语句时,influxDB中的表可以成功被建出来; 但如果加上UDF,比如 insert into a select CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激! org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields unable to parse ''tablename'\, 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields) | Best, Jimmy | Signature is customized by Netease Mail Master
