filed字段数量是固定的,但body_data数组包含的元素个数不固定,所以

Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result




这种写死body_data[X]的sql语句应该不work








在 2023-11-23 15:10:00,"jinzhuguang" <jinzhuguan...@163.com> 写道:
>Flink 
>SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
>比如:
>
>SourceT: (
>       uuid String,
>       body_data ARRAY<ROW<field1 String, field2 String>>
>)
>
>SinkT (
>       result ARRAY<ROW< uuid  string, body_data  String, body_data.fild1  
> String, body_data.fild2  String>>
>)
>
>Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 
>as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
>null,body_data[2]. field, body_data[2]. field2)] as result
>
>希望对你有帮助
>
>> 2023年11月22日 20:54,casel.chen <casel_c...@126.com> 写道:
>> 
>> 输入:
>> 
>> {
>> 
>>  "uuid":"XXXX",
>> 
>>  "body_data": 
>> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
>> 
>> }
>> 
>> 
>> 
>> 
>> 输出:
>> 
>> [
>> 
>>  {
>> 
>> "uuid": "XXXX",
>> 
>> "body_data: null,
>> 
>> "body_data.fild1": "123”,
>> 
>> "body_data.fild2": "234"
>> 
>>  },
>> 
>>  {
>> 
>> "uuid": "XXXX",
>> 
>> "body_data": null,
>> 
>> "body_data.fild1": "abc",
>> 
>> "body_data.fild2": "cdf"
>> 
>>  }
>> 
>> ]
>> 
>> 
>> 
>> 
>> 当格式错误时
>> 
>> 
>> 
>> 
>> 输入:
>> 
>> {
>> 
>> "uuid": "XXXX”,
>> 
>> "body_data": "abc"
>> 
>> }
>> 
>> 输出:
>> 
>> {
>> 
>> "uuid": "XXXX",
>> 
>> "body_data": "abc",
>> 
>> "body_data.fild1": null,
>> 
>> "body_data.fild2": null
>> 
>> }

回复