Tidb??Tidb??TiDBstructured-streaming??
??
----
??:
大家好,公司内部写的自定义RocektMqSource,会偶现位点前移的现象,偶现时间不定,目前找不出原因。Flink 版本
1.4.2,目前是不会从checkPoint恢复,但是会做checkPoint
Source代码如下:
public class RocketMQSource extends
RichParallelSourceFunction implements
CheckpointedFunction,ResultTypeQueryable {
public static final int DELAY_MSG_NOT_FOUND =
??flink??debeziumcanal??kafka,
canalafter
??debeziumflink-cdc??debezium??record
为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表
我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟
所以应该是维表JOIN的问题
现在连的数据库是TiDB,连接串属性为
useUnicode=true=UTF-8=Asia/Shanghai=true
--
Sent from: http://apache-flink.147419.n8.nabble.com/
理论上只要实现了LookupTableSource。你在 TableFunction 里面怎么重写 eval 都可以,不管你是要读取哪里的数据怎么读。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal
server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table
这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink
cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢?
CREATE TABLE
您好,我在向yarn 集群提交flink任务时遇到了一些问题,希望能帮忙回答一下
我布署了一个三个节点hadoop集群,两个工作节点为4c24G,yarn-site中配置了8个vcore,可用内存为20G,总共是16vcore
40G的资源,现在我向yarn提交了两个任务,分别分配了3vcore,6G内存,共消耗6vcore,12G内存,从hadoop的web
ui上也能反映这一点,如下图:
但是当我提交第三个任务时,却无法提交成功,没有明显的报错日志,可是整个集群的资源明显是充足的,所以不知道问题是出现在哪里,还请多多指教
附1(控制台输出):
The program
发自我的iPhone
> 在 2021年4月21日,19:58,Peihui He 写道:
>
> fetch.min.bytes
> fetch.wait.max.ms
> 还可以用着两个参数控制下的
>
> 熊云昆 于2021年4月21日周三 下午7:10写道:
>
>> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> 邮箱:xiongyun...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>>
Hi Leonard,
好的謝謝你的回覆
Regards,
*Hau ChongAih*
On Wed, Apr 21, 2021 at 7:27 PM Leonard Xu wrote:
> Hi, ChongAih
>
> 你可以参考 JdbcDynamicTableSource [1] 这个 table source 实现了 LookupTableSource
> 接口,你需要写一个类似 JdbcRowDataLookupFunction 即可
> 的函数即可。
>
> 祝好,
> Leonard
> [1]
>
你好,请问Flink 是否支持非blink planer下的 listagg,有计划支持吗。现阶段如果想使用listagg,请问有什么好的方法支持吗
POJO??-??
public class User {
private String name;
private String age;
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
'}';
}
public User(String name, String age) {
this.name =
fetch.min.bytes
fetch.wait.max.ms
还可以用着两个参数控制下的
熊云昆 于2021年4月21日周三 下午7:10写道:
> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>
>
> | |
> 熊云昆
> |
> |
> 邮箱:xiongyun...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年04月20日 18:19,李一飞 写道:
> flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
> 最好分流、批场景回答一下,谢谢!
Hi, ChongAih
你可以参考 JdbcDynamicTableSource [1] 这个 table source 实现了 LookupTableSource
接口,你需要写一个类似 JdbcRowDataLookupFunction 即可
的函数即可。
祝好,
Leonard
[1]
有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
| |
熊云昆
|
|
邮箱:xiongyun...@163.com
|
签名由 网易邮箱大师 定制
在2021年04月20日 18:19,李一飞 写道:
flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
最好分流、批场景回答一下,谢谢!
我看了源码,即使改换成debezium json格式输出,也得不到原本debezium
json数据,因为输出字段只有有限的3个,没有关键的库表信息。而且看了其他几个cdc格式,都有类似的问题
想知道是为什么?追踪到上游debezium
emitRecords方法,参数record就只有rowdata和rowkind信息,没有table和database
DebeziumJsonSerializationSchema.java
private static RowType createJsonRowType(DataType databaseSchema) {
在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi all,
flink在使用temporal join只支持look up table source。我在做單元測試的時候, 下載了hive
表裡面的數據,嘗試了用filesystem註冊temporal table。可是後來發現file system不支持lookup。查詢了文檔(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/legacySourceSinks.html),用戶可以自定義look
up table source。可是我找不到類似用csv設置look up table
大家好,
flink sql消费kafka join普通表是会性能爬坡吗?
背景是flink 1.12.0 使用flink sql在yarn per-job发布,消费kafka topic=trades,然后join 数据库里的维表
shop_meta
现在发现每次重启flink sql job,或上游突然增加大量写入时,flink sql的消费速度总是慢慢增加上来,这样就会造成上游积压,等flink
sql消费速度上来之后才能慢慢把积压消费完毕。
更多的信息:
??flink-cdc,??kafka,format canal-json
----
??:
"user-zh"
Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。
https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
casel.chen 于2021年4月20日周二 下午6:18写道:
> 目标是用flink作业实现类似canal server的功能
>
>
> CREATE TABLE `binlog_table` (
>
> `id` INT,
>
>
21 matches
Mail list logo