最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
supported:
ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
udf应该怎么操作呢?求前辈指导

udfd代码如下:

public class Json2List extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
      .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
      .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;

   public Json2List(){}

   public List<String> eval(String param) {
      List<String> result = new ArrayList<>();
      try {
         List<Map<Object, Object>> list =
OBJECT_MAPPER.readValue(param, List.class);
         for(Map<Object, Object> map : list){
            result.add(OBJECT_MAPPER.writeValueAsString(map));
         }
         return result;
      } catch (JsonProcessingException e){
         LOG.error("failed to convert json to array, param is: {}", param, e);
      }
      return result;
   }


   @Override
   public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
      return Types.LIST(Types.STRING);
   }

}

回复