最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not supported: ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map udf应该怎么操作呢?求前辈指导
udfd代码如下: public class Json2List extends ScalarFunction { private static final Logger LOG = LoggerFactory.getLogger(Json2List.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ; public Json2List(){} public List<String> eval(String param) { List<String> result = new ArrayList<>(); try { List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param, List.class); for(Map<Object, Object> map : list){ result.add(OBJECT_MAPPER.writeValueAsString(map)); } return result; } catch (JsonProcessingException e){ LOG.error("failed to convert json to array, param is: {}", param, e); } return result; } @Override public TypeInformation<List<String>> getResultType(Class<?>[] signature) { return Types.LIST(Types.STRING); } }