Hi,

If the schema of the records is not fixed, I'm afraid you have to do it in a UDTF.
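To sketch what such a UDTF boils down to, here is a minimal illustration in plain Python with the standard json module (inside Flink the same logic would live in a Java UDTF, e.g. using Jackson; the sample record is the one from the thread below):

```python
import json

def build_final_json(record: str) -> str:
    """Parse one source record and embed the escaped 'content'
    string as a real JSON object in the output."""
    payload = json.loads(record)["payload"]
    # 'content' arrives as a STRING; parse it regardless of its schema.
    content = json.loads(payload["content"])
    return json.dumps({"id": payload["id"], "content": content})

record = ('{"schema": "schema_infos", "payload": {"id": "10000", '
          '"content": "{\\"name\\":\\"Jone\\",\\"age\\":20}"}}')
print(build_final_json(record))
```

The resulting string would then be written as the single column of the raw-format sink table.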
Best,
Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response!
> 
> And I tried the format "raw"; it seems it only supports a single physical column, and in our project requirement there are more than one hundred columns in the sink table. So I need to combine those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk
> 
> At 2022-04-02 15:17:14, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> >Hi,
> >
> >You can construct the final json string in your UDTF and write it to the Kafka sink table with only one field, which is the entire json string constructed in the UDTF, using the raw format [1] in the sink table:
> >
> >CREATE TABLE TableSink (
> >    `final_json_string` STRING
> >) WITH (
> >    'connector' = 'kafka',
> >    'format' = 'raw',
> >    ...
> >)
> >
> >So the entire flow would be like:
> >
> >1. The Kafka source table reads data
> >2. The UDTF parses the 'content' field and constructs the final json (id, content without backslashes) string you need, maybe using Jackson [2] or other json tools
> >3. Insert the constructed json string as the only field in the sink table
> >
> >The key problem is that the schema of the field "content" is not fixed, so you have to complete most of the logic in the UDTF.
> >
> >[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
> >[2] https://github.com/FasterXML/jackson
> >
> >Best regards,
> >
> >Qingsheng
> >
> >
> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> >> 
> >> Hi,
> >> 
> >> Thanks so much for your support!
> >> 
> >> But sorry to say I'm still confused about it. No matter what the UDF looks like, the first thing I need to confirm is the type of 'content' in TableSink. What should its type be? Should I use type ROW, like this?
> >> 
> >> CREATE TABLE TableSink (
> >>     `id` STRING NOT NULL,
> >>     `content` ROW<name STRING, age BIGINT>
> >> )
> >> WITH (
> >>     ...
> >> );
> >> 
> >> This type is only suitable for the source input {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> But the json key names and format of 'content' in the source are variable. If the source input is
> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> 
> >> I should define `content` in TableSink with type ROW<color STRING, BackgroundColor STRING, Height BIGINT>, like this:
> >> 
> >> CREATE TABLE TableSink (
> >>     `id` STRING NOT NULL,
> >>     `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
> >> )
> >> WITH (
> >>     ...
> >> );
> >> 
> >> And the input json also might contain a json array, like:
> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> >> 
> >> So is there some common type I can use which can handle all input json formats?
> >> 
> >> Thanks so much!!
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> >> >Hi,
> >> >
> >> >I'm afraid you have to use a UDTF to parse the content and construct the final json string manually. The key problem is that the field "content" is actually a STRING, although it looks like a json object. Currently the json format provided by Flink cannot handle this kind of field defined as STRING. Also, considering the schema of this "content" field is not fixed across records, Flink SQL can't use one DDL to consume / produce records with changing schemas.
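The "not fixed across records" point is easy to demonstrate: generic json parsing handles whatever shape arrives, nested arrays included, which is what a UDTF can do and a fixed ROW type cannot. A small Python sketch, using a well-formed variant of the array example from this thread (the message as quoted has a stray quote after Height's value):

```python
import json

# A well-formed variant of the 'detail' array example from this thread.
content_str = (
    '{"color":"Red","BackgroundColor":"Yellow","Height":30,'
    '"detail":[{"value_type":1,"value_name":"AAA"},'
    '{"value_type":2,"value_name":"BBB"}]}'
)

# Generic parsing needs no per-schema DDL: the result mirrors whatever
# structure the string contains, scalars and nested arrays alike.
parsed = json.loads(content_str)
print(type(parsed["Height"]).__name__)   # int
print(len(parsed["detail"]))             # 2
```

Jackson's `ObjectMapper.readTree` plays the same role on the Java side.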
> >> >
> >> >Cheers,
> >> >
> >> >Qingsheng
> >> >
> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> >> 
> >> >> Hi dear engineer,
> >> >> 
> >> >> Thanks so much for your precious time reading my email!
> >> >> Recently I'm working on Flink SQL (version 1.13) in my project and encountered one problem with json format data; hope you can take a look, thanks! Below is the description of my issue.
> >> >> 
> >> >> I use kafka as source and sink. I define the kafka source table like this:
> >> >> 
> >> >> CREATE TABLE TableSource (
> >> >>     schema STRING,
> >> >>     payload ROW(
> >> >>         `id` STRING,
> >> >>         `content` STRING
> >> >>     )
> >> >> )
> >> >> WITH (
> >> >>     'connector' = 'kafka',
> >> >>     'topic' = 'topic_source',
> >> >>     'properties.bootstrap.servers' = 'localhost:9092',
> >> >>     'properties.group.id' = 'all_gp',
> >> >>     'scan.startup.mode' = 'group-offsets',
> >> >>     'format' = 'json',
> >> >>     'json.fail-on-missing-field' = 'false',
> >> >>     'json.ignore-parse-errors' = 'true'
> >> >> );
> >> >> 
> >> >> Define the kafka sink table like this:
> >> >> 
> >> >> CREATE TABLE TableSink (
> >> >>     `id` STRING NOT NULL,
> >> >>     `content` STRING NOT NULL
> >> >> )
> >> >> WITH (
> >> >>     'connector' = 'kafka',
> >> >>     'topic' = 'topic_sink',
> >> >>     'properties.bootstrap.servers' = 'localhost:9092',
> >> >>     'format' = 'json',
> >> >>     'json.fail-on-missing-field' = 'false',
> >> >>     'json.ignore-parse-errors' = 'true'
> >> >> );
> >> >> 
> >> >> Then insert into TableSink with data from TableSource:
> >> >> INSERT INTO TableSink SELECT id, content FROM TableSource;
> >> >> 
> >> >> Then I use "kafka-console-producer.sh" to produce the data below into topic "topic_source" (TableSource):
> >> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> >> 
> >> >> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh; the output is:
> >> >> {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
> >> >> 
> >> >> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}
> >> >> I want the value of "content" to be a json object, not a json string.
> >> >> 
> >> >> And what's more, the format of "content" in TableSource is not fixed; it can be any json formatted (or json array formatted) string, such as:
> >> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> >> 
> >> >> So my question is, how can I transform a json format string (like "{\"name\":\"Jone\",\"age\":20}") from TableSource to a json object (like {"name":"Jone","age":20})?
> >> >> 
> >> >> Thanks && Regards,
> >> >> Hunk
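A footnote on the escaping observed above: any json-format sink must encode a STRING column as a JSON string, which is where the backslashes come from; the raw format writes its single column verbatim, so building the object first keeps it unescaped. A Python analogy of the two behaviors (an illustration, not Flink's actual code path):

```python
import json

content = '{"name":"Jone","age":20}'  # value of the STRING column

# A 'json'-format sink must encode the STRING column as a JSON string,
# escaping its inner quotes: this reproduces the observed output.
escaped = json.dumps({"id": "10000", "content": content},
                     separators=(",", ":"))
print(escaped)  # {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}

# Embedding the parsed object instead (what the UDTF plus raw format
# achieves) yields the desired nested json.
nested = json.dumps({"id": "10000", "content": json.loads(content)},
                    separators=(",", ":"))
print(nested)   # {"id":"10000","content":{"name":"Jone","age":20}}
```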