filed字段数量是固定的,但body_data数组包含的元素个数不固定,所以
Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid,
null,body_data[2]. field, body_data[2]. field2)] as result
这种写死body_data[X]的sql语句应该不work
在 2023-11-23 15:10:00,"jinzhuguang" <jinzhuguan...@163.com> 写道:
>Flink
>SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
>比如:
>
>SourceT: (
> uuid String,
> body_data ARRAY<ROW<field1 String, field2 String>>
>)
>
>SinkT (
> result ARRAY<ROW< uuid string, body_data String, body_data.fild1
> String, body_data.fild2 String>>
>)
>
>Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1
>as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid,
>null,body_data[2]. field, body_data[2]. field2)] as result
>
>希望对你有帮助
>
>> 2023年11月22日 20:54,casel.chen <casel_c...@126.com> 写道:
>>
>> 输入:
>>
>> {
>>
>> "uuid":"XXXX",
>>
>> "body_data":
>> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
>>
>> }
>>
>>
>>
>>
>> 输出:
>>
>> [
>>
>> {
>>
>> "uuid": "XXXX",
>>
>> "body_data: null,
>>
>> "body_data.fild1": "123”,
>>
>> "body_data.fild2": "234"
>>
>> },
>>
>> {
>>
>> "uuid": "XXXX",
>>
>> "body_data": null,
>>
>> "body_data.fild1": "abc",
>>
>> "body_data.fild2": "cdf"
>>
>> }
>>
>> ]
>>
>>
>>
>>
>> 当格式错误时
>>
>>
>>
>>
>> 输入:
>>
>> {
>>
>> "uuid": "XXXX”,
>>
>> "body_data": "abc"
>>
>> }
>>
>> 输出:
>>
>> {
>>
>> "uuid": "XXXX",
>>
>> "body_data": "abc",
>>
>> "body_data.fild1": null,
>>
>> "body_data.fild2": null
>>
>> }