Hi 社区。
Flink 1.12.1
现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。
insert into table_a select id,udf(a),b,c from table_b;
发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
2. 这个可以改变默认
定义一个 sourcetable
--
Sent from: http://apache-flink.147419.n8.nabble.com/
对于 keyed state,需要保证同一个 key 在 同一个 keygroup 中,如果是某个 key 有热点,可以在 keyby 之前进行一次
map(在 key 后面拼接一些 后缀),然后 keyby,最后处理完成之后,将这些进行聚合
Best,
Congxian
guomuhua <663021...@qq.com> 于2021年3月4日周四 下午12:49写道:
> 我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
> nobleyd wrote
> > 是不是使用了随机key。
>
> > guaishushu1103@
>
> >
>
我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
nobleyd wrote
> 是不是使用了随机key。
> guaishushu1103@
>
> guaishushu1103@
> 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> KeyedProcess (21/48).> at>
>
我也遇到同样问题,为了打散数据,在keyby时加了随机数作为后缀,去掉随机数,可以正常savepoint,加上随机数就savepoint失败。所以如果确有要打散数据的需求,应该怎么处理呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
2021-03-04 02:33:25,292 DEBUG org.apache.flink.runtime.rpc.akka.SupervisorActor
[] - Starting FencedAkkaRpcActor with name jobmanager_2.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,304 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC
endpoint for
flink sql中如何使用异步io关联维表?官网文档有介绍么?
SQL 也不能这样吧- -
At 2021-03-03 16:43:49, "JackJia" wrote:
>Hi 诸位同仁:
>诸同仁好,flink TableEnvironment.sqlUpdate是不是不支持update 多表关联更新?
>
>
>如下代码:
>bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 "
>+
>" where a.mac=b.mac and extract(epoch from a.usertime)/7200
是不是使用了随机key。
guaishushu1...@163.com 于2021年3月3日周三 下午6:53写道:
> checkpoint 可以成功保存,但是savepoint出现错误:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
> at
>
checkpoint 可以成功保存,但是savepoint出现错误:
java.lang.Exception: Could not materialize checkpoint 2404 for operator
KeyedProcess (21/48).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at
Hi ??
??flink TableEnvironment.sqlUpdateupdate ??
??
bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
" and a.usertime > b.min_usertime
hi Michael,
??
----
??:
"user-zh"
Hi ??
??flink TableEnvironment.sqlUpdateupdate ??
??
bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
" and a.usertime > b.min_usertime
13 matches
Mail list logo