----
??:
"user-zh"
:2021??4??22??(??) ????10:50
>??:"user-zh"
>:Re: flink sqlkafka join??
>
>
>
>??SQLparse json??join
>SQL??join70s=3.8k???
Tidb??Tidb??TiDBstructured-streaming??
??
----
??:
> 看了下,是1.12才开始支持么,1.11是不行的嘛?
是的,1.11不支持,文档也是有版本的,如果对应版本的文档里没有该功能介绍,那就是不支持的。
看了下,是1.12才开始支持么,1.11是不行的嘛?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
: gimlee
发送时间: 2021-01-21 11:20
收件人: user-zh
主题: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
如题,需要获取到kafka的partition、offset进行处理
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如题,需要获取到kafka的partition、offset进行处理
--
Sent from: http://apache-flink.147419.n8.nabble.com/
支持的,参考 code https://github.com/apache/flink/pull/12919/commits
陈帅 于2020年11月3日周二 上午8:44写道:
> flink sql 1.11.2 支持 confluent schema registry 下 avro格式的kafka connector吗?
> 官网没找到相关资料。有的话请告知或者提供一下示例,谢谢!
>
?? 1.10??connector type
----
??:
"user-zh"
??sql??kafka??1.11??datastream
Exception in thread "main" org.apache.flink.table.api.TableException: Table
sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support
consuming update cha
我感觉可以通过计算列的方式来解决呀,你只需要在计算rowtime这个列的时候保证它不是null即可,如果是null,可以设置一个默认值之类的?
18500348...@163.com <18500348...@163.com> 于2020年7月15日周三 下午3:04写道:
> 大家好!
>
> 使用flink1.11 sql接入kafka ,format为csv
> 从eventTime字段中抽取事件时间
> rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd
> HH:mm:ss'))
>
大家好!
使用flink1.11 sql接入kafka ,format为csv
从eventTime字段中抽取事件时间
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd HH:mm:ss'))
eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true',
那么eventTime会被设置为null,此时会报一个异常:
Caused by: java.lang.RuntimeException: RowTime field
就是现在Flink json已经有了对于VARBINARY类型的处理逻辑,就是string和byte[]互转,然后还需要有base64编码。
但是我们是想让对于VARBINARY的处理逻辑变成另外一种形式,就是把JsonNode直接toString,获取这个json子树的
字符串表示,然后再转成byte[]来作为这个字段。输出的时候,也会直接通过这个byte[]数据来构造一个JsonNode树,
然后放到对应的位置上。也就做到了一个json节点原封不动的保留到了输出里面,不管它是一个什么类型的json节点。
Peihui He 于2020年7月15日周三 上午9:59写道:
>
Hi BenChao,
请问第2个解决思路中 额外加一个选项是指什么呢?
Best wishes.
Benchao Li 于2020年7月10日周五 下午1:54写道:
> Hi Peihui,
>
> 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
>
> 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json
感谢,已经按后面一种方式做了珞
Leonard Xu 于2020年7月10日周五 上午11:13写道:
> Hi, Peihui
>
> 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
> 就是按照json的标准格式解析(jackson)的,没法将一个
> jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
hi,大家好
对于json schema的问题,我想问一个其他的问题,
比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink
sql。
其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的,
某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。
Hi Peihui,
正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
从结果上来看,*还不能完全做到原封不动的输出到下游*。
不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
1. 用RAW类型,这个需要json
社区有个 issue 正在解决这个问题,可以关注一下
https://issues.apache.org/jira/browse/FLINK-18002
Best,
Jark
On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote:
> Hi, Peihui
>
> 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
> 就是按照json的标准格式解析(jackson)的,没法将一个
>
Hi, Peihui
我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
就是按照json的标准格式解析(jackson)的,没法将一个
jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
一种做法是定义复杂的jsonObject对应的ROW
Hi Peihui,
如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
{
"a":"b",
"c":{
"d":"e",
"g":"f"
}
},
那么在 kafka table source 可以使用 row 来定义:
create table xxx (
a varchar,
c row
)
如果 还存在嵌套,可以继续再使用 Row 来定义。
Best,
LakeShen
Peihui He 于2020年7月10日周五 上午9:12写道:
> Hello:
>
>
Hello:
在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
Best wishes.
tebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.;
?? 2020??7??717:01??op <520075...@qq.com ??
hi??
nbsp; flink sql ??kafka??key
kafka connectorkey??
nbsp;
hi??
flink sql ??kafka??key
kafka connectorkey??
Hi LakeShen,
I'm sorry there is no such configuration for json format currently.
I think it makes sense to add such configuration like
'format.ignore-parse-errors' in csv format.
I created FLINK-15396[1] to track this.
Best,
Jark
[1]: https://issues.apache.org/jira/browse/FLINK-15396
On Thu,
Hi community,when I write the flink ddl sql like this:
CREATE TABLE kafka_src (
id varchar,
a varchar,
b TIMESTAMP,
c TIMESTAMP
)
with (
...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);
If the
26 matches
Mail list logo