业务中使用flink sql group by操作后想收集所有的数据,如下示例:
kafka源表:
班级 学号 姓名 年龄
1 20001 张三 15
2 20011 李四 16
1 20002 王五 16
2 20012 吴六 15
create table source_table (
class_no: INT,
student_no: INT,
name: STRING,
age: INT
) with (
'connector' = 'kafka',
...
);
mongodb目标表:
班级 学生信息
1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002,
"name":"王五", "age": 16}]
2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012,
"name":"吴六", "age": 15}]
create table sink_table (
class_no INT,
students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
) with (
'connector' = 'mongodb',
...
);
查了下flink自带的系统函数,接近满足条件的只有collect函数。
insert into sink_table select class_no, collect(ROW(student_no, name, age) from
source_table group by class_no;
但它返回的是Multiset类型,即Map<?,
Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
1. 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
2. 如果要收集不去重的Array[ROW],又该怎么写?
3. 访问一个数据类型为Map<kt, vt>的数据中key和value,分别要用什么flink sql语法?
谢谢解答!