Hi,

You can construct the final json string in your UDTF, and write it to Kafka 
sink table with only one field, which is the entire json string constructed in 
UDTF, and use raw format [1] in the sink table:

CREATE TABLE TableSink (
    `final_json_string` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'raw',
    ...
)

So the entire flow would be:

1. The Kafka source table reads the data
2. A UDTF parses the 'content' field and constructs the final json string you 
need (id, plus content without backslashes), e.g. using Jackson [2] or another 
json library
3. Insert the constructed json string as the only field of the sink table

The key problem is that the schema of the "content" field is not fixed, so you 
have to implement most of the logic in the UDTF. 
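The core of step 2 is to embed the already-unescaped 'content' string into the 
output as a raw json object instead of re-quoting it. Here is a minimal sketch 
in plain Java (class and method names are hypothetical; a real implementation 
would extend Flink's TableFunction and would typically use Jackson [2] to 
validate or normalize 'content' first):

```java
// Hypothetical helper showing the core of step 2: since the json format of the
// source table already unescapes the 'content' field into a plain STRING, the
// final json can be built by inserting that string verbatim (as a nested json
// object), rather than re-escaping it as a quoted string value.
public class JsonEmbedder {

    // Builds {"id":"<id>","content":<content>} with <content> inserted verbatim.
    public static String buildFinalJson(String id, String content) {
        return "{\"id\":\"" + id + "\",\"content\":" + content + "}";
    }

    public static void main(String[] args) {
        String out = buildFinalJson("10000", "{\"name\":\"Jone\",\"age\":20}");
        System.out.println(out);
        // → {"id":"10000","content":{"name":"Jone","age":20}}
    }
}
```

Inside the UDTF's eval method you would simply collect(buildFinalJson(id, 
content)); because 'content' can be any json object or array, no schema has to 
be declared, and the raw-format sink then writes the string bytes to Kafka 
unchanged.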

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks so much for your support! 
> 
> But sorry to say I'm still confused about it. No matter what the UDF looks 
> like, the first thing I need to confirm is the type of 'content' in TableSink. 
> What should its type be, should I use type ROW, like this?
> 
>      CREATE TABLE TableSink (
>           `id` STRING NOT NULL,
>           `content` ROW<name STRING, age BIGINT>
>      )
>      WITH (
>          ...
>     );
> 
> This type is only suitable for source input {"schema": "schema_infos", 
> "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> But the json key names and format of 'content' in the source are variable. If 
> the source input is 
> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> I should define `content` in TableSink as `ROW<color STRING, 
> BackgroundColor STRING, Height BIGINT>`, like this:
> 
>      CREATE TABLE TableSink (
>           `id` STRING NOT NULL,
>           `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>      )
>      WITH (
>          ...
>     );
> 
> And the input json might also contain a json array, like: 
> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> 
> 
> So is there some common type I can use which can handle all input json 
> formats?
> 
> Thanks so much!!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> >Hi, 
> >
> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >final json string manually. The key problem is that the field “content” is 
> >actually a STRING, although it looks like a json object. Currently the json 
> >format provided by Flink could not handle this kind of field defined as 
> >STRING. Also considering the schema of this “content” field is not fixed 
> >across records, Flink SQL can’t use one DDL to consume / produce records 
> >with changing schema. 
> >
> >Cheers,
> >
> >Qingsheng
> >
> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> 
> >> Hi dear engineer,
> >> 
> >> Thanks so much for your precious time reading my email!
> >> Recently I've been working with Flink SQL (version 1.13) in my project 
> >> and encountered a problem with json format data, hope you can take a 
> >> look, thanks! Below is the description of my issue.
> >> 
> >> I use kafka as source and sink, I define kafka source table like this:
> >> 
> >>      CREATE TABLE TableSource (
> >>           schema STRING,
> >>           payload ROW(
> >>               `id` STRING,
> >>               `content` STRING
> >>          )
> >>      )
> >>      WITH (
> >>          'connector' = 'kafka',
> >>          'topic' = 'topic_source',
> >>          'properties.bootstrap.servers' = 'localhost:9092',
> >>          'properties.group.id' = 'all_gp',
> >>          'scan.startup.mode' = 'group-offsets',
> >>          'format' = 'json',
> >>          'json.fail-on-missing-field' = 'false',
> >>          'json.ignore-parse-errors' = 'true'
> >>      );
> >> 
> >> Define the kafka sink table like this:
> >> 
> >>      CREATE TABLE TableSink (
> >>           `id` STRING NOT NULL,
> >>           `content` STRING NOT NULL
> >>      )
> >>      WITH (
> >>          'connector' = 'kafka',
> >>          'topic' = 'topic_sink',
> >>          'properties.bootstrap.servers' = 'localhost:9092',
> >>          'format' = 'json',
> >>          'json.fail-on-missing-field' = 'false',
> >>          'json.ignore-parse-errors' = 'true'
> >>     );
> >> 
> >> 
> >> Then insert into TableSink with data from TableSource:
> >>     INSERT INTO TableSink SELECT id, content FROM TableSource;
> >> 
> >> Then I use "kafka-console-producer.sh" to produce data below into topic 
> >> "topic_source" (TableSource):
> >>     {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> 
> >> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
> >> output is:
> >>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
> >> 
> >> But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
> >> I want the value of "content" to be a json object, not a json string.
> >> 
> >> And what's more, the format of "content" in TableSource is not fixed, it 
> >> can be any json-formatted (or json-array-formatted) string, such as:
> >> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> 
> >> 
> >> So my question is, how can I transform a json format string (like 
> >> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
> >> (like {"name":"Jone","age":20})?
> >> 
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> 
> >> 
> >> 
> 
