Hi,

If the schema of the records is not fixed, I’m afraid you have to do it in a UDTF. 
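
For illustration, the schema-agnostic part of that UDTF logic can be sketched with plain Python and its standard json module (the sample payloads mirror the ones later in this thread; a real Flink UDTF would do the equivalent in Java, e.g. with Jackson):

```python
import json

# Two sample payloads whose 'content' strings have different shapes,
# including a nested array -- no fixed schema is assumed.
samples = [
    '{"id": "10000", "content": "{\\"name\\":\\"Jone\\",\\"age\\":20}"}',
    '{"id": "10000", "content": "{\\"color\\":\\"Red\\",'
    '\\"detail\\":[{\\"value_type\\":1,\\"value_name\\":\\"AAA\\"}]}"}',
]

for raw in samples:
    payload = json.loads(raw)
    # Parse the escaped inner string into a real object; json.loads makes
    # no assumption about keys, nesting, or arrays, which is why this
    # works where a fixed ROW<...> type in the DDL cannot.
    payload["content"] = json.loads(payload["content"])
    print(json.dumps(payload))
```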

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response! 
> 
> And I tried the "raw" format; it seems to support only a single physical 
> column, and our project requires more than one hundred columns in the 
> sink table. So do I need to combine those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> 
> At 2022-04-02 15:17:14, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> >Hi,
> >
> >You can construct the final json string in your UDTF and write it to a Kafka 
> >sink table with only one field, which is the entire json string constructed 
> >in the UDTF, using the raw format [1] in the sink table:
> >
> >CREATE TABLE TableSink (
> >    `final_json_string` STRING
> >) WITH (
> >    'connector' = 'kafka',
> >    'format' = 'raw',
> >    ...
> >)
> >
> >So the entire flow would be like:
> >
> >1. Kafka source table reads data
> >2. UDTF parses the ‘content’ field and constructs the final json string 
> >you need (id, content without backslashes), maybe using Jackson [2] or other 
> >json tools
> >3. Insert the constructed json string as the only field in sink table
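
The transformation in steps 2–3 can be sketched as a plain function (standard-library Python stands in for Jackson here, and the function name is only illustrative):

```python
import json

def build_sink_record(payload: dict) -> str:
    """Parse the escaped 'content' string and re-embed it as a real
    JSON object next to 'id', yielding the single-field sink value."""
    content_obj = json.loads(payload["content"])
    # The returned string is written as the only field of the raw-format sink.
    return json.dumps({"id": payload["id"], "content": content_obj})

record = {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}
print(build_sink_record(record))
# {"id": "10000", "content": {"name": "Jone", "age": 20}}
```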
> >
> >The key problem is that the schema of the field “content” is not fixed, so you 
> >have to implement most of the logic in the UDTF. 
> >
> >[1] 
> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
> >[2] https://github.com/FasterXML/jackson
> >
> >Best regards, 
> >
> >Qingsheng
> >
> >
> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> >> 
> >> Hi,
> >> 
> >> Thanks so much for your support! 
> >> 
> >> But sorry to say I'm still confused about it. No matter what the UDF looks 
> >> like, the first thing I need to confirm is the type of 'content' in 
> >> TableSink. What should its type be? Should I use type ROW, like 
> >> this?
> >> 
> >>      CREATE TABLE TableSink (
> >>           `id` STRING NOT NULL,
> >>           `content` ROW<name STRING, age BIGINT>
> >>      )
> >>      WITH (
> >>          ...
> >>     );
> >> 
> >> This type is only suitable for source input {"schema": "schema_infos", 
> >> "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> But the json key name and format of 'content' in source is variable, if 
> >> the source input is 
> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
> >> 
> >> I should define `content` in TableSink with type ROW<color 
> >> STRING, BackgroundColor STRING, Height BIGINT>, like this:
> >> 
> >>      CREATE TABLE TableSink (
> >>           `id` STRING NOT NULL,
> >>           `content` ROW<color STRING, BackgroundColor STRING, Height 
> >> BIGINT>
> >>      )
> >>      WITH (
> >>          ...
> >>     );
> >> 
> >> And the input json might also contain a json array, like: 
> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
> >> 
> >> 
> >> So is there some common type I can use which can handle all input json 
> >> formats?  
> >> 
> >> Thanks so much!!
> >> 
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> >> >Hi, 
> >> >
> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >> >final json string manually. The key problem is that the field “content” 
> >> >is actually a STRING, although it looks like a json object. Currently the 
> >> >json format provided by Flink cannot handle this kind of field defined 
> >> >as STRING. Also considering the schema of this “content” field is not 
> >> >fixed across records, Flink SQL can’t use one DDL to consume / produce 
> >> >records with changing schema. 
> >> >
> >> >Cheers,
> >> >
> >> >Qingsheng
> >> >
> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> >> 
> >> >> Hi dear engineer,
> >> >> 
> >> >> Thanks so much for your precious time reading my email!
> >> >> Recently I've been working on Flink SQL (version 1.13) in my project 
> >> >> and encountered a problem with json format data, hope you can take a 
> >> >> look, thanks! Below is the description of my issue.
> >> >> 
> >> >> I use kafka as source and sink. I define the kafka source table like this:
> >> >> 
> >> >>      CREATE TABLE TableSource (
> >> >>           schema STRING,
> >> >>           payload ROW(
> >> >>               `id` STRING,
> >> >>               `content` STRING
> >> >>          )
> >> >>      )
> >> >>      WITH (
> >> >>          'connector' = 'kafka',
> >> >>          'topic' = 'topic_source',
> >> >>          'properties.bootstrap.servers' = 'localhost:9092',
> >> >>          'properties.group.id' = 'all_gp',
> >> >>          'scan.startup.mode' = 'group-offsets',
> >> >>          'format' = 'json',
> >> >>          'json.fail-on-missing-field' = 'false',
> >> >>          'json.ignore-parse-errors' = 'true'
> >> >>      );
> >> >> 
> >> >> Define the kafka sink table like this:
> >> >> 
> >> >>      CREATE TABLE TableSink (
> >> >>           `id` STRING NOT NULL,
> >> >>           `content` STRING NOT NULL
> >> >>      )
> >> >>      WITH (
> >> >>          'connector' = 'kafka',
> >> >>          'topic' = 'topic_sink',
> >> >>          'properties.bootstrap.servers' = 'localhost:9092',
> >> >>          'format' = 'json',
> >> >>          'json.fail-on-missing-field' = 'false',
> >> >>          'json.ignore-parse-errors' = 'true'
> >> >>     );
> >> >> 
> >> >> 
> >> >> Then insert into TableSink with data from TableSource:
> >> >>     INSERT INTO TableSink SELECT id, content FROM TableSource;
> >> >> 
> >> >> Then I use "kafka-console-producer.sh" to produce data below into topic 
> >> >> "topic_source" (TableSource):
> >> >>     {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> >> "{\"name\":\"Jone\",\"age\":20}"}}
> >> >> 
> >> >> 
> >> >> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, 
> >> >> the output is:
> >> >>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
> >> >> 
> >> >> But what I want here is {"id":"10000","content": 
> >> >> {"name":"Jone","age":20}}
> >> >> I want the value of "content" to be a json object, not a json string.
> >> >> 
> >> >> And what's more, the format of "content" in TableSource is not fixed; 
> >> >> it can be any json-formatted (or json array) string, such as:
> >> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
> >> >> 
> >> >> 
> >> >> So my question is: how can I transform a json format string (like 
> >> >> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
> >> >> (like {"name":"Jone","age":20})?
> >> >> 
> >> >> 
> >> >> Thanks && Regards,
> >> >> Hunk
> >> >> 
> >> >> 
> >> >> 
> >> >> 
> >> >>  
> >> 
> 
> 
> 
>  
