Hi Zain,

I assume you are using the DataStream API, as described in the subject of your email, so I think you can define any function/transformation to parse the JSON value, even if the schema changes.
It looks like the value of the field "array_coordinates" is an escaped, JSON-formatted STRING instead of a JSON object, so I would parse the input JSON string first using Jackson (or any JSON parser you like), extract the field "array_coordinates" as a string, remove all backslashes to un-escape it, and then use Jackson again to parse it. If you are using the Table / SQL API, I'm afraid you have to use a UDTF to parse the input, because the schema varies in the field "array_coordinates".

Hope this helps!

Cheers,
Qingsheng

> On May 21, 2022, at 14:58, Zain Haider Nemati <zain.hai...@retailo.co> wrote:
>
> Hi Folks,
> I have data coming in this format:
>
> {
>   "data": {
>     "oid__id": "61de4f26f01131783f162453",
>     "array_coordinates": "[ { \"speed\" : \"xxx\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } }, { \"speed\" : \"xxx\", \"isFromMockProvider\" : \"false\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } } ]",
>     "batchId": "xxx",
>     "agentId": "xxx",
>     "routeKey": "40042-12-01-2022",
>     "__v": 0
>   },
>   "metadata": {
>     "timestamp": "2022-05-02T18:49:52.619827Z",
>     "record-type": "data",
>     "operation": "load",
>     "partition-key-type": "primary-key",
>     "schema-name": "xxx",
>     "table-name": "xxx"
>   }
> }
>
> The length of the array_coordinates array varies (it is not fixed at the source). Is there any way to define a JSON deserializer for this? If so, I would really appreciate some help with it.
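P.S. The two-pass parse described above can be sketched roughly as follows. This uses Python's standard json module just for brevity; in a DataStream job the same two readTree() calls with Jackson would play the same role. The payload here is a simplified stand-in for the record in the quoted email, with hypothetical values. One note: a JSON parser already un-escapes the string when it extracts the field value, so at that point the explicit backslash removal is usually unnecessary.

```python
import json

# Simplified record in the shape of the quoted email; values are placeholders.
raw = '{"data": {"array_coordinates": "[ { \\"speed\\" : \\"1.0\\" }, { \\"speed\\" : \\"2.0\\" } ]"}}'

outer = json.loads(raw)                     # first pass: parse the outer document
inner = outer["data"]["array_coordinates"]  # extracted as a plain string, already un-escaped
coords = json.loads(inner)                  # second pass: parse the embedded array
# coords is now a list whose length can vary from record to record
```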