Re:Re:Re: Could you please give me a hand about json object in flink sql

2022-04-03 Thread lixiongfeng
Maybe you can get some inspiration from JsonDeserializationSchema and 
JSONKeyValueDeserializationSchema.
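
In the DataStream API, JSONKeyValueDeserializationSchema deserializes each Kafka record's key and value into a single node with "key" and "value" fields. A rough Python sketch of that output shape (illustrative only, not the Flink API):

```python
import json

def deserialize_key_value(key_bytes: bytes, value_bytes: bytes) -> dict:
    # Mimics the output shape of JSONKeyValueDeserializationSchema:
    # one node holding the deserialized "key" and "value" sub-objects
    record = {}
    if key_bytes:
        record["key"] = json.loads(key_bytes)
    record["value"] = json.loads(value_bytes)
    return record

rec = deserialize_key_value(b'{"id": "1"}', b'{"name": "Jone", "age": 20}')
print(rec["value"]["name"])  # Jone
```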
At 2022-04-02 14:47:08, "wang" <24248...@163.com> wrote:


Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Got it. It seems this way is not flexible enough, but still thanks so much for 
your great support! Good wishes!

Regards && Thanks
Hunk

At 2022-04-02 16:34:29, "Qingsheng Ren"  wrote:

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

If the schema of records is not fixed, I’m afraid you have to do it in a UDTF. 

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Thanks for your quick response! 

And I tried the format "raw"; it seems to support only a single physical column, 
and in our project requirements there are more than one hundred columns in the 
sink table. So do I need to combine those columns into one string in a single UDF?

Thanks && Regards,
Hunk

At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

You can construct the final json string in your UDTF, and write it to Kafka 
sink table with only one field, which is the entire json string constructed in 
UDTF, and use raw format [1] in the sink table:

CREATE TABLE TableSink (
  `final_json_string` STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'raw',
  ...
)

So the entire flow would be like:

1. Kafka source table reads data
2. UDTF parses the 'content' field and constructs the final json string you need 
(id plus content without backslashes), maybe using Jackson [2] or other json tools
3. Insert the constructed json string as the only field in the sink table

The key problem is that the schema of field "content" is not fixed, so you have 
to complete most of the logic in the UDTF. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson
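
As a plain-Python sketch (outside Flink) of what the UDTF plus raw-format sink needs to produce, assuming the record shape from this thread:

```python
import json

def build_final_json(source_record: str) -> str:
    # Source record shape: {"schema": ..., "payload": {"id": ..., "content": "<json as a string>"}}
    payload = json.loads(source_record)["payload"]
    # "content" arrives as a STRING; parse it into a real json value
    content = json.loads(payload["content"])
    # Re-assemble the final record with "content" embedded as an object,
    # ready to be written as the single raw-format sink field
    return json.dumps({"id": payload["id"], "content": content})

src = '{"schema": "schema_infos", "payload": {"id": "1", "content": "{\\"name\\":\\"Jone\\",\\"age\\":20}"}}'
print(build_final_json(src))  # {"id": "1", "content": {"name": "Jone", "age": 20}}
```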

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,

Thanks so much for your support! 

But sorry to say I'm still confused about it. No matter what the UDF looks 
like, the first thing I need to confirm is the type of 'content' in TableSink. 
What should its type be? Should I use type ROW, like this?

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<name STRING, age INT>
 )
 WITH (
 ...
);

This type is only suitable for the source input {"schema": "schema_infos", 
"payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the json key names and format of 'content' in the source are variable. If 
the source input is 

{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}

I should define `content` in TableSink with type ROW<color STRING, 
BackgroundColor STRING, Height BIGINT>, like this:

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
 )
 WITH (
 ...
);

And the input json might also contain a json array, like: 
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}

So is there some common type I can use that can handle all input json formats?

Thanks so much!!

Thanks && Regards,
Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:


Re: Could you please give me a hand about json object in flink sql

2022-04-01 Thread Qingsheng Ren
Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final 
json string manually. The key problem is that the field “content” is actually 
a STRING, although it looks like a json object. Currently the json format 
provided by Flink cannot handle this kind of field defined as STRING. Also, 
since the schema of this “content” field is not fixed across records, 
Flink SQL can’t use one DDL to consume / produce records with a changing schema. 
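
To illustrate the changing-schema point with the samples from this thread: a dynamic json parse accepts every shape, which is exactly what a single static ROW type cannot express. A plain-Python sketch, outside Flink (the payloads are unescaped and lightly repaired from the thread's examples):

```python
import json

# The "content" payloads from this thread, unescaped and lightly repaired
samples = [
    '{"name":"Jone","age":20}',
    '{"color":"Red","BackgroundColor":"Yellow","Height":30}',
    '{"detail":[{"value_type":1,"value_name":"AAA"},{"value_type":2,"value_name":"BBB"}]}',
]

# One dynamic parse handles objects, arrays, and mixed value types alike;
# each of these records would need a different ROW<...> declaration in SQL
for s in samples:
    value = json.loads(s)
    print(sorted(value.keys()))
```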

Cheers,

Qingsheng

> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:



Could you please give me a hand about json object in flink sql

2022-03-31 Thread wang
Hi dear engineer,


Thanks so much for your precious time reading my email!
Recently I'm working on Flink SQL (version 1.13) in my project and 
encountered one problem about json format data; hope you can take a look, 
thanks! Below is the description of my issue.


I use Kafka as source and sink. I define the Kafka source table like this:


 CREATE TABLE TableSource (
  schema STRING,
  payload ROW(
  `id` STRING,
  `content` STRING
 )
 )
 WITH (
 'connector' = 'kafka',
 'topic' = 'topic_source',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'all_gp',
 'scan.startup.mode' = 'group-offsets',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
 );


Define the kafka sink table like this:


 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` STRING NOT NULL
 )
 WITH (
 'connector' = 'kafka',
 'topic' = 'topic_sink',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
);




Then insert into TableSink with data from TableSource:
INSERT INTO TableSink SELECT id, content FROM TableSource;


Then I use "kafka-console-producer.sh" to produce data below into topic 
"topic_source" (TableSource):
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"name\":\"Jone\",\"age\":20}"}}




Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
output is:
{"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}


But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
I want the value of "content" to be a json object, not a json string.
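
The backslashes appear because "content" is declared as STRING, so the sink re-serializes it as a quoted string; parsing it first is what yields the nested object. A small Python illustration of the difference (outside Flink):

```python
import json

content_str = '{"name":"Jone","age":20}'  # what the STRING column holds

# Left as a STRING, the inner quotes get escaped on output
as_string = json.dumps({"id": "1", "content": content_str})
# Parsed first, it is embedded as a real json object
as_object = json.dumps({"id": "1", "content": json.loads(content_str)})

print(as_string)  # {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}
print(as_object)  # {"id": "1", "content": {"name": "Jone", "age": 20}}
```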


And what's more, the format of "content" in TableSource is not fixed; it can be 
any json-formatted (or json array) string, such as:
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}




So my question is: how can I transform a json-formatted string (like 
"{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
(like {"name":"Jone","age":20})?




Thanks && Regards,
Hunk


