Hi! UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。
[1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc casel.chen <[email protected]> 于2021年12月1日周三 上午7:56写道: > 业务中使用flink sql group by操作后想收集所有的数据,如下示例: > > > kafka源表: > 班级 学号 姓名 年龄 > 1 20001 张三 15 > 2 20011 李四 16 > 1 20002 王五 16 > 2 20012 吴六 15 > > > create table source_table ( > class_no: INT, > student_no: INT, > name: STRING, > age: INT > ) with ( > 'connector' = 'kafka', > ... > ); > > > mongodb目标表: > 班级 学生信息 > 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": > 20002, "name":"王五", "age": 16}] > 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": > 20012, "name":"吴六", "age": 15}] > > > create table sink_table ( > class_no INT, > students: ARRAY<ROW<student_no STRING, name STRING, age INT>> > ) with ( > 'connector' = 'mongodb', > ... > ); > > > 查了下flink自带的系统函数,接近满足条件的只有collect函数。 > insert into sink_table select class_no, collect(ROW(student_no, name, age) > from source_table group by class_no; > > > 但它返回的是Multiset类型,即Map<?, > Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。 > 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。 > > > 1. > 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子? > 2. 如果要收集不去重的Array[ROW],又该怎么写? > 3. 访问一个数据类型为Map<kt, vt>的数据中key和value,分别要用什么flink sql语法? > > > 谢谢解答! > > > > > > > > > > > >
