Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final 
JSON string manually. The key problem is that the field “content” is actually a 
STRING, even though it looks like a JSON object. The JSON format currently 
provided by Flink cannot emit a field declared as STRING as a nested JSON 
object. Also, since the schema of this “content” field is not fixed across 
records, Flink SQL cannot use a single DDL to consume / produce records with a 
changing schema.
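
For what it's worth, below is a minimal sketch of that idea with the Table API. 
Two assumptions of mine: I used a scalar UDF for brevity (a UDTF is only needed 
if one input row should also fan out into several rows), and the sink uses the 
'raw' format with a single STRING column so the hand-built JSON string reaches 
Kafka verbatim. The names RawJsonSink, BuildJson / BUILD_JSON and the "record" 
column are made up for the example:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;

    public class RawJsonSink {

        // Hypothetical UDF: splices the already-serialized "content" JSON
        // into the output record unquoted, so it ends up as a nested object
        // or array rather than an escaped string. Assumes "content" already
        // holds valid JSON.
        public static class BuildJson extends ScalarFunction {
            public String eval(String id, String content) {
                return "{\"id\":\"" + id + "\",\"content\":" + content + "}";
            }
        }

        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            tEnv.createTemporarySystemFunction("BUILD_JSON", BuildJson.class);

            // Source table as in your mail (schema / payload envelope).
            tEnv.executeSql(
                    "CREATE TABLE TableSource (\n"
                    + "  `schema` STRING,\n"
                    + "  payload ROW(`id` STRING, `content` STRING)\n"
                    + ") WITH (\n"
                    + "  'connector' = 'kafka',\n"
                    + "  'topic' = 'topic_source',\n"
                    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "  'properties.group.id' = 'all_gp',\n"
                    + "  'scan.startup.mode' = 'group-offsets',\n"
                    + "  'format' = 'json',\n"
                    + "  'json.ignore-parse-errors' = 'true'\n"
                    + ")");

            // Sink with one STRING column and the 'raw' format: whatever
            // string we produce is written to Kafka as-is, byte for byte.
            tEnv.executeSql(
                    "CREATE TABLE TableSink (\n"
                    + "  record STRING\n"
                    + ") WITH (\n"
                    + "  'connector' = 'kafka',\n"
                    + "  'topic' = 'topic_sink',\n"
                    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "  'format' = 'raw'\n"
                    + ")");

            // Build the final JSON record manually, one row at a time.
            tEnv.executeSql(
                    "INSERT INTO TableSink "
                    + "SELECT BUILD_JSON(payload.id, payload.content) "
                    + "FROM TableSource");
        }
    }

Since the "content" string is passed through untouched, a changing schema 
inside it is no problem, and your sample input should come out as 
{"id":"10000","content":{"name":"Jone","age":20}}.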

Cheers,

Qingsheng

> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> Thanks so much for your precious time reading my email!
> Recently I've been working with Flink SQL (version 1.13) in my project and 
> ran into a problem with JSON format data. I hope you can take a look, 
> thanks! Below is a description of my issue.
> 
> I use Kafka as both source and sink. I define the Kafka source table like this:
> 
>      CREATE TABLE TableSource (
>           schema STRING,
>           payload ROW(
>               `id` STRING,
>               `content` STRING
>          )
>      )
>      WITH (
>          'connector' = 'kafka',
>          'topic' = 'topic_source',
>          'properties.bootstrap.servers' = 'localhost:9092',
>          'properties.group.id' = 'all_gp',
>          'scan.startup.mode' = 'group-offsets',
>          'format' = 'json',
>          'json.fail-on-missing-field' = 'false',
>          'json.ignore-parse-errors' = 'true'
>      );
> 
> And I define the Kafka sink table like this:
> 
>      CREATE TABLE TableSink (
>           `id` STRING NOT NULL,
>           `content` STRING NOT NULL
>      )
>      WITH (
>          'connector' = 'kafka',
>          'topic' = 'topic_sink',
>          'properties.bootstrap.servers' = 'localhost:9092',
>          'format' = 'json',
>          'json.fail-on-missing-field' = 'false',
>          'json.ignore-parse-errors' = 'true'
>     );
> 
> 
> Then insert into TableSink with data from TableSource:
>     INSERT INTO TableSink SELECT payload.id, payload.content FROM TableSource;
> 
> Then I use "kafka-console-producer.sh" to produce the record below into topic 
> "topic_source" (TableSource):
>     {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> 
> Then I listen to topic_sink (TableSink) with kafka-console-consumer.sh; the 
> output is:
>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
> 
> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}.
> I want the value of "content" to be a JSON object, not a JSON string.
> 
> And what's more, the format of "content" in TableSource is not fixed; it can 
> be any JSON-formatted (or JSON-array-formatted) string, such as:
> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
> 
> 
> So my question is: how can I transform a JSON format string (like 
> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object 
> (like {"name":"Jone","age":20})?
> 
> 
> Thanks && Regards,
> Hunk
> 
