??????flink-sql??????kafka ??????????????????????

2022-03-22 文章 ??????
----
??: 
   "user-zh"



Re:?????? flink sql????kafka join??????????????????????

2021-04-22 文章 Michael Ran
??
?? 2021-04-22 11:21:55??""  ??
>Tidb??Tidb??TiDBstructured-streaming??
>??
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2021??4??22??(??) ????10:50
>??:"user-zh"
>:Re: flink sqlkafka join??
>
>
>
>??SQLparse json??join
>SQL??join70s=3.8k3
>
>??JOIN??
>TiDB??
>useUnicode=truecharacterEncoding=UTF-8serverTimezone=Asia/ShanghairewriteBatchedStatements=true
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


?????? flink sql????kafka join??????????????????????

2021-04-21 文章 ????
Tidb??Tidb??TiDBstructured-streaming??
??




----
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-21 文章 Leonard Xu

> 看了下,是1.12才开始支持么,1.11是不行的嘛?
是的,1.11不支持,文档也是有版本的,如果对应版本的文档里没有该功能介绍,那就是不支持的。






Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-21 文章 gimlee
看了下,是1.12才开始支持么,1.11是不行的嘛?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-20 文章 HunterXHunter
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-20 文章 Evan
你好,可以获取
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

可以查阅官网得到你想要的信息:  
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#available-metadata
 
希望能帮助到你。



 
发件人: gimlee
发送时间: 2021-01-21 11:20
收件人: user-zh
主题: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
如题,需要获取到kafka的partition、offset进行处理
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-20 文章 gimlee
如题,需要获取到kafka的partition、offset进行处理



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql kafka connector with avro confluent schema registry support

2020-11-08 文章 Danny Chan
支持的,参考 code https://github.com/apache/flink/pull/12919/commits

陈帅  于2020年11月3日周二 上午8:44写道:

> flink sql 1.11.2 支持 confluent schema registry 下 avro格式的kafka connector吗?
> 官网没找到相关资料。有的话请告知或者提供一下示例,谢谢!
>


?????? Sql??kafka????????????????

2020-07-29 文章 op
?? 1.10??connector type


----
??: 
   "user-zh"



Sql??kafka????????????????

2020-07-28 文章 op
??sql??kafka??1.11??datastream





Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support 
consuming update changes which is produced by node GroupAggregate(groupBy=[dt, 
user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS 
picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS 
share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, 
SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS 
comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, 
MAX(event_time) AS event_time])

Re: flink1.11 sql kafka 抽取事件时间

2020-07-15 文章 Benchao Li
我感觉可以通过计算列的方式来解决呀,你只需要在计算rowtime这个列的时候保证它不是null即可,如果是null,可以设置一个默认值之类的?

18500348...@163.com <18500348...@163.com> 于2020年7月15日周三 下午3:04写道:

> 大家好!
>
> 使用flink1.11 sql接入kafka ,format为csv
> 从eventTime字段中抽取事件时间
> rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd
> HH:mm:ss'))
> eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true',
> 那么eventTime会被设置为null,此时会报一个异常:
> Caused by: java.lang.RuntimeException: RowTime field should not be null,
> please convert it to a non-null long value.
>
> 有没有什么好的方式可以解决
>
>
> 祝好!
>
>
>
> 18500348...@163.com
>


-- 

Best,
Benchao Li


flink1.11 sql kafka 抽取事件时间

2020-07-15 文章 18500348...@163.com
大家好!

使用flink1.11 sql接入kafka ,format为csv
从eventTime字段中抽取事件时间
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd HH:mm:ss'))
eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true', 
那么eventTime会被设置为null,此时会报一个异常:
Caused by: java.lang.RuntimeException: RowTime field should not be null, please 
convert it to a non-null long value.

有没有什么好的方式可以解决


祝好!



18500348...@163.com


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-14 文章 Benchao Li
就是现在Flink json已经有了对于VARBINARY类型的处理逻辑,就是string和byte[]互转,然后还需要有base64编码。

但是我们是想让对于VARBINARY的处理逻辑变成另外一种形式,就是把JsonNode直接toString,获取这个json子树的
字符串表示,然后再转成byte[]来作为这个字段。输出的时候,也会直接通过这个byte[]数据来构造一个JsonNode树,
然后放到对应的位置上。也就做到了一个json节点原封不动的保留到了输出里面,不管它是一个什么类型的json节点。

Peihui He  于2020年7月15日周三 上午9:59写道:

> Hi BenChao,
>
> 请问第2个解决思路中 额外加一个选项是指什么呢?
>
> Best wishes.
>
> Benchao Li  于2020年7月10日周五 下午1:54写道:
>
> > Hi Peihui,
> >
> > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
> >
> > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> > 从结果上来看,*还不能完全做到原封不动的输出到下游*。
> >
> > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
> >   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
> >   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
> >
> > Jark Wu  于2020年7月10日周五 下午12:22写道:
> >
> > > 社区有个 issue 正在解决这个问题,可以关注一下
> > > https://issues.apache.org/jira/browse/FLINK-18002
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> > >
> > > > Hi, Peihui
> > > >
> > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > > 的解析的底层实现
> > > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > > >
> > > > 一种做法是定义复杂的jsonObject对应的ROW
> > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > > 然后query里用UDTF处理。
> > > >
> > > >
> > > > 祝好
> > > > Leonard Xu
> > > >
> > > >
> > > >
> > > >
> > > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > > >
> > > > > Hello,
> > > > >
> > > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > > >
> > > > > Best wishes.
> > > > >
> > > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > > >>
> > > > >>
> > > > >> Best wishes.
> > > > >>
> > > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > > >>
> > > > >>> Hi Peihui,
> > > > >>>
> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > > >>>
> > > > >>> {
> > > > >>>"a":"b",
> > > > >>>"c":{
> > > > >>>"d":"e",
> > > > >>>"g":"f"
> > > > >>>}
> > > > >>> },
> > > > >>>
> > > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > > >>>
> > > > >>> create table xxx (
> > > > >>> a varchar,
> > > > >>> c row
> > > > >>> )
> > > > >>>
> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > > >>>
> > > > >>> Best,
> > > > >>> LakeShen
> > > > >>>
> > > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > > >>>
> > > >  Hello:
> > > > 
> > > > 在用flink
> > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > > 
> > > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > > 
> > > > 
> > > >  Best wishes.
> > > > 
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-14 文章 Peihui He
Hi BenChao,

请问第2个解决思路中 额外加一个选项是指什么呢?

Best wishes.

Benchao Li  于2020年7月10日周五 下午1:54写道:

> Hi Peihui,
>
> 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
>
> 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> 从结果上来看,*还不能完全做到原封不动的输出到下游*。
>
> 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
>   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
>   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
>
> Jark Wu  于2020年7月10日周五 下午12:22写道:
>
> > 社区有个 issue 正在解决这个问题,可以关注一下
> > https://issues.apache.org/jira/browse/FLINK-18002
> >
> > Best,
> > Jark
> >
> > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> >
> > > Hi, Peihui
> > >
> > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > 的解析的底层实现
> > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > >
> > > 一种做法是定义复杂的jsonObject对应的ROW
> > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > 然后query里用UDTF处理。
> > >
> > >
> > > 祝好
> > > Leonard Xu
> > >
> > >
> > >
> > >
> > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > >
> > > > Hello,
> > > >
> > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > >
> > > > Best wishes.
> > > >
> > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > >
> > > >> Hello,
> > > >>
> > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > >>
> > > >>
> > > >> Best wishes.
> > > >>
> > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > >>
> > > >>> Hi Peihui,
> > > >>>
> > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > >>>
> > > >>> {
> > > >>>"a":"b",
> > > >>>"c":{
> > > >>>"d":"e",
> > > >>>"g":"f"
> > > >>>}
> > > >>> },
> > > >>>
> > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > >>>
> > > >>> create table xxx (
> > > >>> a varchar,
> > > >>> c row
> > > >>> )
> > > >>>
> > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > >>>
> > > >>> Best,
> > > >>> LakeShen
> > > >>>
> > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > >>>
> > >  Hello:
> > > 
> > > 在用flink
> > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > 
> > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > 
> > > 
> > >  Best wishes.
> > > 
> > > >>>
> > > >>
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-10 文章 Peihui He
感谢,已经按后面一种方式做了珞

Leonard Xu  于2020年7月10日周五 上午11:13写道:

> Hi, Peihui
>
> 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
> 就是按照json的标准格式解析(jackson)的,没法将一个
> jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
>
> 一种做法是定义复杂的jsonObject对应的ROW
> 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> 然后query里用UDTF处理。
>
>
> 祝好
> Leonard Xu
>
>
>
>
> > 在 2020年7月10日,10:16,Peihui He  写道:
> >
> > Hello,
> >
> >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> >
> > Best wishes.
> >
> > Peihui He  于2020年7月10日周五 上午10:12写道:
> >
> >> Hello,
> >>
> >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> >>
> >>
> >> Best wishes.
> >>
> >> LakeShen  于2020年7月10日周五 上午10:03写道:
> >>
> >>> Hi Peihui,
> >>>
> >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> >>>
> >>> {
> >>>"a":"b",
> >>>"c":{
> >>>"d":"e",
> >>>"g":"f"
> >>>}
> >>> },
> >>>
> >>> 那么在 kafka table source 可以使用 row 来定义:
> >>>
> >>> create table xxx (
> >>> a varchar,
> >>> c row
> >>> )
> >>>
> >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> >>>
> >>> Best,
> >>> LakeShen
> >>>
> >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> >>>
>  Hello:
> 
> 在用flink
> sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> 
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> 
> 
>  Best wishes.
> 
> >>>
> >>
>
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-10 文章 Jun Zhang
hi,大家好
对于json schema的问题,我想问一个其他的问题,
比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink
sql。

其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的,
某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。

这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢谢。

Benchao Li  于2020年7月10日周五 下午1:54写道:

> Hi Peihui,
>
> 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
>
> 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> 从结果上来看,*还不能完全做到原封不动的输出到下游*。
>
> 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
>   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
>   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
>
> Jark Wu  于2020年7月10日周五 下午12:22写道:
>
> > 社区有个 issue 正在解决这个问题,可以关注一下
> > https://issues.apache.org/jira/browse/FLINK-18002
> >
> > Best,
> > Jark
> >
> > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> >
> > > Hi, Peihui
> > >
> > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > 的解析的底层实现
> > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > >
> > > 一种做法是定义复杂的jsonObject对应的ROW
> > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > 然后query里用UDTF处理。
> > >
> > >
> > > 祝好
> > > Leonard Xu
> > >
> > >
> > >
> > >
> > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > >
> > > > Hello,
> > > >
> > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > >
> > > > Best wishes.
> > > >
> > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > >
> > > >> Hello,
> > > >>
> > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > >>
> > > >>
> > > >> Best wishes.
> > > >>
> > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > >>
> > > >>> Hi Peihui,
> > > >>>
> > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > >>>
> > > >>> {
> > > >>>"a":"b",
> > > >>>"c":{
> > > >>>"d":"e",
> > > >>>"g":"f"
> > > >>>}
> > > >>> },
> > > >>>
> > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > >>>
> > > >>> create table xxx (
> > > >>> a varchar,
> > > >>> c row
> > > >>> )
> > > >>>
> > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > >>>
> > > >>> Best,
> > > >>> LakeShen
> > > >>>
> > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > >>>
> > >  Hello:
> > > 
> > > 在用flink
> > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > 
> > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > 
> > > 
> > >  Best wishes.
> > > 
> > > >>>
> > > >>
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Benchao Li
Hi Peihui,

正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。

当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
从结果上来看,*还不能完全做到原封不动的输出到下游*。

不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
  的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
  序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。

Jark Wu  于2020年7月10日周五 下午12:22写道:

> 社区有个 issue 正在解决这个问题,可以关注一下
> https://issues.apache.org/jira/browse/FLINK-18002
>
> Best,
> Jark
>
> On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
>
> > Hi, Peihui
> >
> > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> 的解析的底层实现
> > 就是按照json的标准格式解析(jackson)的,没法将一个
> > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> >
> > 一种做法是定义复杂的jsonObject对应的ROW
> > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > 然后query里用UDTF处理。
> >
> >
> > 祝好
> > Leonard Xu
> >
> >
> >
> >
> > > 在 2020年7月10日,10:16,Peihui He  写道:
> > >
> > > Hello,
> > >
> > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > >
> > > Best wishes.
> > >
> > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > >
> > >> Hello,
> > >>
> > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > >>
> > >>
> > >> Best wishes.
> > >>
> > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > >>
> > >>> Hi Peihui,
> > >>>
> > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > >>>
> > >>> {
> > >>>"a":"b",
> > >>>"c":{
> > >>>"d":"e",
> > >>>"g":"f"
> > >>>}
> > >>> },
> > >>>
> > >>> 那么在 kafka table source 可以使用 row 来定义:
> > >>>
> > >>> create table xxx (
> > >>> a varchar,
> > >>> c row
> > >>> )
> > >>>
> > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > >>>
> > >>> Best,
> > >>> LakeShen
> > >>>
> > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > >>>
> >  Hello:
> > 
> > 在用flink
> > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > 
> >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > 
> > 
> >  Best wishes.
> > 
> > >>>
> > >>
> >
> >
>


-- 

Best,
Benchao Li


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Jark Wu
社区有个 issue 正在解决这个问题,可以关注一下
https://issues.apache.org/jira/browse/FLINK-18002

Best,
Jark

On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:

> Hi, Peihui
>
> 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
> 就是按照json的标准格式解析(jackson)的,没法将一个
> jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
>
> 一种做法是定义复杂的jsonObject对应的ROW
> 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> 然后query里用UDTF处理。
>
>
> 祝好
> Leonard Xu
>
>
>
>
> > 在 2020年7月10日,10:16,Peihui He  写道:
> >
> > Hello,
> >
> >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> >
> > Best wishes.
> >
> > Peihui He  于2020年7月10日周五 上午10:12写道:
> >
> >> Hello,
> >>
> >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> >>
> >>
> >> Best wishes.
> >>
> >> LakeShen  于2020年7月10日周五 上午10:03写道:
> >>
> >>> Hi Peihui,
> >>>
> >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> >>>
> >>> {
> >>>"a":"b",
> >>>"c":{
> >>>"d":"e",
> >>>"g":"f"
> >>>}
> >>> },
> >>>
> >>> 那么在 kafka table source 可以使用 row 来定义:
> >>>
> >>> create table xxx (
> >>> a varchar,
> >>> c row
> >>> )
> >>>
> >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> >>>
> >>> Best,
> >>> LakeShen
> >>>
> >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> >>>
>  Hello:
> 
> 在用flink
> sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> 
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> 
> 
>  Best wishes.
> 
> >>>
> >>
>
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Leonard Xu
Hi, Peihui

我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
就是按照json的标准格式解析(jackson)的,没法将一个 
jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。

一种做法是定义复杂的jsonObject对应的ROW 
将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, 
然后query里用UDTF处理。


祝好
Leonard Xu




> 在 2020年7月10日,10:16,Peihui He  写道:
> 
> Hello,
> 
>   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> 
> Best wishes.
> 
> Peihui He  于2020年7月10日周五 上午10:12写道:
> 
>> Hello,
>> 
>>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
>> 
>> 
>> Best wishes.
>> 
>> LakeShen  于2020年7月10日周五 上午10:03写道:
>> 
>>> Hi Peihui,
>>> 
>>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
>>> 
>>> {
>>>"a":"b",
>>>"c":{
>>>"d":"e",
>>>"g":"f"
>>>}
>>> },
>>> 
>>> 那么在 kafka table source 可以使用 row 来定义:
>>> 
>>> create table xxx (
>>> a varchar,
>>> c row
>>> )
>>> 
>>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
>>> 
>>> Best,
>>> LakeShen
>>> 
>>> Peihui He  于2020年7月10日周五 上午9:12写道:
>>> 
 Hello:
 
在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
 
 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
 
 
 Best wishes.
 
>>> 
>> 



Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 LakeShen
Hi Peihui,

如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:

{
"a":"b",
"c":{
"d":"e",
"g":"f"
}
},

那么在 kafka table source 可以使用 row 来定义:

create table xxx (
a varchar,
c row
)

如果 还存在嵌套,可以继续再使用 Row 来定义。

Best,
LakeShen

Peihui He  于2020年7月10日周五 上午9:12写道:

> Hello:
>
> 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
>
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
>
>
> Best wishes.
>


flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Peihui He
Hello:

在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。

 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。


Best wishes.


?????? flink sql ??????kafka??????????????????????key??

2020-07-07 文章 op





----
??:"Leonard Xu"https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.;

 ?? 2020??7??717:01??op <520075...@qq.com ??
 
 hi??
 nbsp; flink sql ??kafka??key
 kafka connectorkey??
 
 nbsp;

flink sql ??????kafka??????????????????????key??

2020-07-07 文章 op
hi??
 flink sql ??kafka??key
kafka connectorkey??



Re: Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 文章 Jark Wu
Hi LakeShen,

I'm sorry there is no such configuration for json format currently.
I think it makes sense to add such configuration like
'format.ignore-parse-errors' in csv format.
I created FLINK-15396[1] to track this.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15396

On Thu, 26 Dec 2019 at 11:44, LakeShen  wrote:

> Hi community,when I write the flink ddl sql like this:
>
> CREATE TABLE kafka_src (
>   id varchar,
>   a varchar,
>   b TIMESTAMP,
>   c TIMESTAMP
> )
>   with (
>...
> 'format.type' = 'json',
> 'format.property-version' = '1',
> 'format.derive-schema' = 'true',
> 'update-mode' = 'append'
> );
>
> If the message is not the json format ,there is a error in the log。
> My question is that how to deal with the message which it not json format?
> My thought is that I can catch the exception
> in JsonRowDeserializationSchema deserialize() method,is there any
> parameters to do this?
> Thanks your replay.
>
>


Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 文章 LakeShen
Hi community,when I write the flink ddl sql like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);

If the message is not the json format ,there is a error in the log。
My question is that how to deal with the message which it not json format?
My thought is that I can catch the exception
in JsonRowDeserializationSchema deserialize() method,is there any
parameters to do this?
Thanks your replay.