Hi!

UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc

casel.chen <[email protected]> 于2021年12月1日周三 上午7:56写道:

> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>
>
> kafka源表:
> 班级     学号      姓名      年龄
> 1         20001    张三       15
> 2         20011    李四       16
> 1         20002    王五       16
> 2         20012    吴六       15
>
>
> create table source_table (
>    class_no: INT,
>    student_no: INT,
>    name: STRING,
>    age: INT
> ) with (
>    'connector' = 'kafka',
>    ...
> );
>
>
> mongodb目标表:
> 班级     学生信息
> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
> 20002, "name":"王五", "age": 16}]
> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
> 20012, "name":"吴六", "age": 15}]
>
>
> create table sink_table (
>   class_no INT,
>   students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
> ) with (
>   'connector' = 'mongodb',
>   ...
> );
>
>
> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
> insert into sink_table select class_no, collect(ROW(student_no, name, age)
> from source_table group by class_no;
>
>
> 但它返回的是Multiset类型,即Map<?,
> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>
>
> 1.
> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
> 2. 如果要收集不去重的Array[ROW],又该怎么写?
> 3. 访问一个数据类型为Map<kt, vt>的数据中key和value,分别要用什么flink sql语法?
>
>
> 谢谢解答!
>
>
>
>
>
>
>
>
>
>
>
>

回复