??????flink-sql??????kafka ??????????????????????
---- ??: "user-zh"
Re:?????? flink sql????kafka join??????????????????????
?? ?? 2021-04-22 11:21:55??"" ?? >Tidb??Tidb??TiDBstructured-streaming?? >?? > > > > >---- >??: >"user-zh" > >:2021??4??22??(??) ????10:50 >??:"user-zh" >:Re: flink sqlkafka join?? > > > >??SQLparse json??join >SQL??join70s=3.8k3 > >??JOIN?? >TiDB?? >useUnicode=truecharacterEncoding=UTF-8serverTimezone=Asia/ShanghairewriteBatchedStatements=true > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
?????? flink sql????kafka join??????????????????????
Tidb??Tidb??TiDBstructured-streaming?? ?? ---- ??: "user-zh" http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
> 看了下,是1.12才开始支持么,1.11是不行的嘛? 是的,1.11不支持,文档也是有版本的,如果对应版本的文档里没有该功能介绍,那就是不支持的。
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
看了下,是1.12才开始支持么,1.11是不行的嘛? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
CREATE TABLE KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
你好,可以获取 CREATE TABLE KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); 可以查阅官网得到你想要的信息: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#available-metadata 希望能帮助到你。 发件人: gimlee 发送时间: 2021-01-21 11:20 收件人: user-zh 主题: Flink SQL kafka connector有办法获取到partition、offset信息嘛? 如题,需要获取到kafka的partition、offset进行处理 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink SQL kafka connector有办法获取到partition、offset信息嘛?
如题,需要获取到kafka的partition、offset进行处理 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql kafka connector with avro confluent schema registry support
支持的,参考 code https://github.com/apache/flink/pull/12919/commits 陈帅 于2020年11月3日周二 上午8:44写道: > flink sql 1.11.2 支持 confluent schema registry 下 avro格式的kafka connector吗? > 官网没找到相关资料。有的话请告知或者提供一下示例,谢谢! >
?????? Sql??kafka????????????????
?? 1.10??connector type ---- ??: "user-zh"
Sql??kafka????????????????
??sql??kafka??1.11??datastream Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
Re: flink1.11 sql kafka 抽取事件时间
我感觉可以通过计算列的方式来解决呀,你只需要在计算rowtime这个列的时候保证它不是null即可,如果是null,可以设置一个默认值之类的? 18500348...@163.com <18500348...@163.com> 于2020年7月15日周三 下午3:04写道: > 大家好! > > 使用flink1.11 sql接入kafka ,format为csv > 从eventTime字段中抽取事件时间 > rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd > HH:mm:ss')) > eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true', > 那么eventTime会被设置为null,此时会报一个异常: > Caused by: java.lang.RuntimeException: RowTime field should not be null, > please convert it to a non-null long value. > > 有没有什么好的方式可以解决 > > > 祝好! > > > > 18500348...@163.com > -- Best, Benchao Li
flink1.11 sql kafka 抽取事件时间
大家好! 使用flink1.11 sql接入kafka ,format为csv 从eventTime字段中抽取事件时间 rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd HH:mm:ss')) eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true', 那么eventTime会被设置为null,此时会报一个异常: Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value. 有没有什么好的方式可以解决 祝好! 18500348...@163.com
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
就是现在Flink json已经有了对于VARBINARY类型的处理逻辑,就是string和byte[]互转,然后还需要有base64编码。 但是我们是想让对于VARBINARY的处理逻辑变成另外一种形式,就是把JsonNode直接toString,获取这个json子树的 字符串表示,然后再转成byte[]来作为这个字段。输出的时候,也会直接通过这个byte[]数据来构造一个JsonNode树, 然后放到对应的位置上。也就做到了一个json节点原封不动的保留到了输出里面,不管它是一个什么类型的json节点。 Peihui He 于2020年7月15日周三 上午9:59写道: > Hi BenChao, > > 请问第2个解决思路中 额外加一个选项是指什么呢? > > Best wishes. > > Benchao Li 于2020年7月10日周五 下午1:54写道: > > > Hi Peihui, > > > > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 > > > > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, > > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 > > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 > > 从结果上来看,*还不能完全做到原封不动的输出到下游*。 > > > > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: > > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 > > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 > > 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 > > 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 > > > > Jark Wu 于2020年7月10日周五 下午12:22写道: > > > > > 社区有个 issue 正在解决这个问题,可以关注一下 > > > https://issues.apache.org/jira/browse/FLINK-18002 > > > > > > Best, > > > Jark > > > > > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > > > > > Hi, Peihui > > > > > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > > > 的解析的底层实现 > > > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > > > 然后query里用UDTF处理。 > > > > > > > > > > > > 祝好 > > > > Leonard Xu > > > > > > > > > > > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > > > > > Hello, > > > > > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > > > > > Best wishes. > > > > > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > > > > > >> Hello, > > > > >> > > > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > > > >> > > > > >> > > > > >> Best wishes. > > > > >> > > > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > > > >> > > > > >>> Hi Peihui, > > > > >>> > > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > > > >>> > > > > >>> { > > > > >>>"a":"b", > > > > >>>"c":{ > > > > >>>"d":"e", > > > > >>>"g":"f" > > > > >>>} > > > > >>> }, > > > > >>> > > > > >>> 那么在 kafka table source 可以使用 row 来定义: > > > > >>> > > > > >>> create table xxx ( > > > > >>> a varchar, > > > > >>> c row > > > > >>> ) > > > > >>> > > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > > > >>> > > > > >>> Best, > > > > >>> LakeShen > > > > >>> > > > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > > > >>> > > > > Hello: > > > > > > > > 在用flink > > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > > > > > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > > > > > > > > > > Best wishes. > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hi BenChao, 请问第2个解决思路中 额外加一个选项是指什么呢? Best wishes. Benchao Li 于2020年7月10日周五 下午1:54写道: > Hi Peihui, > > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 > > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 > 从结果上来看,*还不能完全做到原封不动的输出到下游*。 > > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 > 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 > 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 > > Jark Wu 于2020年7月10日周五 下午12:22写道: > > > 社区有个 issue 正在解决这个问题,可以关注一下 > > https://issues.apache.org/jira/browse/FLINK-18002 > > > > Best, > > Jark > > > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > > > Hi, Peihui > > > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > > 的解析的底层实现 > > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > > 然后query里用UDTF处理。 > > > > > > > > > 祝好 > > > Leonard Xu > > > > > > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > > > Hello, > > > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > > > Best wishes. > > > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > > > >> Hello, > > > >> > > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > > >> > > > >> > > > >> Best wishes. > > > >> > > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > > >> > > > >>> Hi Peihui, > > > >>> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > > >>> > > > >>> { > > > >>>"a":"b", > > > >>>"c":{ > > > >>>"d":"e", > > > >>>"g":"f" > > > >>>} > > > >>> }, > > > >>> > > > >>> 那么在 kafka table source 可以使用 row 来定义: > > > >>> > > > >>> create table xxx ( > > > >>> a varchar, > > > >>> c row > > > >>> ) > > > >>> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > > >>> > > > >>> Best, > > > >>> LakeShen > > > >>> > > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > > >>> > > > Hello: > > > > > > 在用flink > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > > > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > > > > > > > Best wishes. > > > > > > >>> > > > >> > > > > > > > > > > > -- > > Best, > Benchao Li >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
感谢,已经按后面一种方式做了珞 Leonard Xu 于2020年7月10日周五 上午11:13写道: > Hi, Peihui > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 > 就是按照json的标准格式解析(jackson)的,没法将一个 > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > 一种做法是定义复杂的jsonObject对应的ROW > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > 然后query里用UDTF处理。 > > > 祝好 > Leonard Xu > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > Hello, > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > Best wishes. > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > >> Hello, > >> > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > >> > >> > >> Best wishes. > >> > >> LakeShen 于2020年7月10日周五 上午10:03写道: > >> > >>> Hi Peihui, > >>> > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > >>> > >>> { > >>>"a":"b", > >>>"c":{ > >>>"d":"e", > >>>"g":"f" > >>>} > >>> }, > >>> > >>> 那么在 kafka table source 可以使用 row 来定义: > >>> > >>> create table xxx ( > >>> a varchar, > >>> c row > >>> ) > >>> > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > >>> > >>> Best, > >>> LakeShen > >>> > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > >>> > Hello: > > 在用flink > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > Best wishes. > > >>> > >> > >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
hi,大家好 对于json schema的问题,我想问一个其他的问题, 比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink sql。 其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的, 某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。 这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢谢。 Benchao Li 于2020年7月10日周五 下午1:54写道: > Hi Peihui, > > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 > > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 > 从结果上来看,*还不能完全做到原封不动的输出到下游*。 > > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 > 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 > 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 > > Jark Wu 于2020年7月10日周五 下午12:22写道: > > > 社区有个 issue 正在解决这个问题,可以关注一下 > > https://issues.apache.org/jira/browse/FLINK-18002 > > > > Best, > > Jark > > > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > > > Hi, Peihui > > > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > > 的解析的底层实现 > > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > > 然后query里用UDTF处理。 > > > > > > > > > 祝好 > > > Leonard Xu > > > > > > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > > > Hello, > > > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > > > Best wishes. > > > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > > > >> Hello, > > > >> > > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > > >> > > > >> > > > >> Best wishes. > > > >> > > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > > >> > > > >>> Hi Peihui, > > > >>> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > > >>> > > > >>> { > > > >>>"a":"b", > > > >>>"c":{ > > > >>>"d":"e", > > > >>>"g":"f" > > > >>>} > > > >>> }, > > > >>> > > > >>> 那么在 kafka table source 可以使用 row 来定义: > > > >>> > > > >>> create table xxx ( > > > >>> a varchar, > > > >>> c row > > > >>> ) > > > >>> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > > >>> > > > >>> Best, > > > >>> LakeShen > > > >>> > > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > > >>> > > > Hello: > > > > > > 在用flink > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > > > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > > > > > > > Best wishes. > > > > > > >>> > > > >> > > > > > > > > > > > -- > > Best, > Benchao Li >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hi Peihui, 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 从结果上来看,*还不能完全做到原封不动的输出到下游*。 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 Jark Wu 于2020年7月10日周五 下午12:22写道: > 社区有个 issue 正在解决这个问题,可以关注一下 > https://issues.apache.org/jira/browse/FLINK-18002 > > Best, > Jark > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > Hi, Peihui > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > 的解析的底层实现 > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > 一种做法是定义复杂的jsonObject对应的ROW > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > 然后query里用UDTF处理。 > > > > > > 祝好 > > Leonard Xu > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > Hello, > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > Best wishes. > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > >> Hello, > > >> > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > >> > > >> > > >> Best wishes. > > >> > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > >> > > >>> Hi Peihui, > > >>> > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > >>> > > >>> { > > >>>"a":"b", > > >>>"c":{ > > >>>"d":"e", > > >>>"g":"f" > > >>>} > > >>> }, > > >>> > > >>> 那么在 kafka table source 可以使用 row 来定义: > > >>> > > >>> create table xxx ( > > >>> a varchar, > > >>> c row > > >>> ) > > >>> > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > >>> > > >>> Best, > > >>> LakeShen > > >>> > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > >>> > > Hello: > > > > 在用flink > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > > > > Best wishes. > > > > >>> > > >> > > > > > -- Best, Benchao Li
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
社区有个 issue 正在解决这个问题,可以关注一下 https://issues.apache.org/jira/browse/FLINK-18002 Best, Jark On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > Hi, Peihui > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 > 就是按照json的标准格式解析(jackson)的,没法将一个 > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > 一种做法是定义复杂的jsonObject对应的ROW > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > 然后query里用UDTF处理。 > > > 祝好 > Leonard Xu > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > Hello, > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > Best wishes. > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > >> Hello, > >> > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > >> > >> > >> Best wishes. > >> > >> LakeShen 于2020年7月10日周五 上午10:03写道: > >> > >>> Hi Peihui, > >>> > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > >>> > >>> { > >>>"a":"b", > >>>"c":{ > >>>"d":"e", > >>>"g":"f" > >>>} > >>> }, > >>> > >>> 那么在 kafka table source 可以使用 row 来定义: > >>> > >>> create table xxx ( > >>> a varchar, > >>> c row > >>> ) > >>> > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > >>> > >>> Best, > >>> LakeShen > >>> > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > >>> > Hello: > > 在用flink > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > Best wishes. > > >>> > >> > >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hi, Peihui 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 就是按照json的标准格式解析(jackson)的,没法将一个 jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 一种做法是定义复杂的jsonObject对应的ROW 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, 然后query里用UDTF处理。 祝好 Leonard Xu > 在 2020年7月10日,10:16,Peihui He 写道: > > Hello, > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > Best wishes. > > Peihui He 于2020年7月10日周五 上午10:12写道: > >> Hello, >> >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 >> >> >> Best wishes. >> >> LakeShen 于2020年7月10日周五 上午10:03写道: >> >>> Hi Peihui, >>> >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: >>> >>> { >>>"a":"b", >>>"c":{ >>>"d":"e", >>>"g":"f" >>>} >>> }, >>> >>> 那么在 kafka table source 可以使用 row 来定义: >>> >>> create table xxx ( >>> a varchar, >>> c row >>> ) >>> >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 >>> >>> Best, >>> LakeShen >>> >>> Peihui He 于2020年7月10日周五 上午9:12写道: >>> Hello: 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 Best wishes. >>> >>
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hi Peihui, 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: { "a":"b", "c":{ "d":"e", "g":"f" } }, 那么在 kafka table source 可以使用 row 来定义: create table xxx ( a varchar, c row ) 如果 还存在嵌套,可以继续再使用 Row 来定义。 Best, LakeShen Peihui He 于2020年7月10日周五 上午9:12写道: > Hello: > > 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > Best wishes. >
flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hello: 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 Best wishes.
?????? flink sql ??????kafka??????????????????????key??
---- ??:"Leonard Xu"https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. <https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.; ?? 2020??7??717:01??op <520075...@qq.com ?? hi?? nbsp; flink sql ??kafka??key kafka connectorkey?? nbsp;
flink sql ??????kafka??????????????????????key??
hi?? flink sql ??kafka??key kafka connectorkey??
Re: Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?
Hi LakeShen, I'm sorry there is no such configuration for json format currently. I think it makes sense to add such configuration like 'format.ignore-parse-errors' in csv format. I created FLINK-15396[1] to track this. Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-15396 On Thu, 26 Dec 2019 at 11:44, LakeShen wrote: > Hi community,when I write the flink ddl sql like this: > > CREATE TABLE kafka_src ( > id varchar, > a varchar, > b TIMESTAMP, > c TIMESTAMP > ) > with ( >... > 'format.type' = 'json', > 'format.property-version' = '1', > 'format.derive-schema' = 'true', > 'update-mode' = 'append' > ); > > If the message is not the json format ,there is a error in the log。 > My question is that how to deal with the message which it not json format? > My thought is that I can catch the exception > in JsonRowDeserializationSchema deserialize() method,is there any > parameters to do this? > Thanks your replay. > >
Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?
Hi community,when I write the flink ddl sql like this: CREATE TABLE kafka_src ( id varchar, a varchar, b TIMESTAMP, c TIMESTAMP ) with ( ... 'format.type' = 'json', 'format.property-version' = '1', 'format.derive-schema' = 'true', 'update-mode' = 'append' ); If the message is not the json format ,there is a error in the log。 My question is that how to deal with the message which it not json format? My thought is that I can catch the exception in JsonRowDeserializationSchema deserialize() method,is there any parameters to do this? Thanks your replay.