Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 jindy_liu
flink sql主要涉及到9张mysql表(snapshot+cdc),任务的解析后的算子较多,大概60~70个,但主要是join,和4~5个GroupAggregate算子,最后sink,sink不是瓶颈已经排除。 恩,已经调过几版参数了我的机型的配置是一样的,12核+24G内存 + ssd 50G,共6台(任务并行度设置为60,除去了flink mysql cdc流的并行度为1,其它算子并行度都为60) taskmgr的flink-conf主要参数为,其它为默认: taskmanager.numberOfTaskSlots: 10

Re:Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-09 文章 Appleyuchi
谢谢两位大佬的回复。 还剩下两个小问题, ①请问输出结果里面的"+"是什么意思,不应该是2017吗? 1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4) 8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6) 4>

Re: Re: Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
你可以把 upsert kafka 想象成是 mysql 表的实时物化视图, 你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。 那么最后的 sum(amount) 结果自然是 200。 如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka connector,不定义 pk 即可,也就是当成普通 log 处理了。 关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回

Re:Re: Re: retract stream UDAF使用问题

2020-12-09 文章 bulterman
假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 :0, 100, 200 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? 也是因为表中仅保留一条Code X的数据的关系吗?

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 bradyMk
弱弱的问一句,相关的配置项是直接在flink-conf.xml文件里配置就可以嘛? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code 下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 Best, Jark On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote: > // kafka table > tableEnv.execuetSql("CREATE TABLE market_stock(\n" + > >

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 jie mei
Hi, Leonard 好的,我将会提一个PR来修复这个issue Leonard Xu 于2020年12月10日周四 下午12:10写道: > 你们分析是对的,这是个bug,这里应该用SinkFunctionProvider, > 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为 > OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库, > 也可以说是OutputFormat不会参与cp,

关于Task之间数据rebalance是否可能根据反压情况调整数据流

2020-12-09 文章 赵一旦
比如,A和B的2个task之间是rebalance,假设A的某个并行实例被反压(可能比较少见,毕竟是rebalance,但也是可能的,比如碰巧等),能否在rebalance时候根据反压调整转发的数据流呢。

Re:Re: retract stream UDAF使用问题

2020-12-09 文章 bulterman
// kafka table tableEnv.execuetSql("CREATE TABLE market_stock(\n" + "Code STRING,\n" + "Amount BIGINT,\n" + .. "PRIMARY KEY (Code) NOT ENFORCED\n" + ") WITH (\n" + "

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 Leonard Xu
你们分析是对的,这是个bug,这里应该用SinkFunctionProvider, 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库, 也可以说是OutputFormat不会参与cp, 所以at-least-once都不一定能保证。 修复应该很简单的,@jie mei 你有兴趣帮忙修复吗? 祝好, Leonard > 在 2020年12月10日,11:22,jie

Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-09 文章 Leonard Xu
Hi, 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark, 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1] 给出文档中省略的watermark生成部分code: // 老版本 //Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor()

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 Yun Tang
Hi Operator state 本身也并不是线程安全的,只是往常的读写都是持有checkpoint锁的task主线程或者checkpoint异步线程,所以才能做到数据安全,SourceFunction文档里面也强调需要在获得checkpointLock的前提下更新state。 至于如何开启Flink中的RocksDB的native metrics,之前给你的文档链接里面有描述,相关的配置项设为true即可。 祝好 唐云 From: bradyMk Sent: Thursday, December 10,

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 Yun Tang
Hi FsStateBackend 在性能上是比 RocksDBStateBackend 好,这个是符合预期的。不过想要获得高性能的话,需要更多的jvm堆上内存,但是大内存场景下的GC会很痛苦,所以并不是说加内存之后,性能可以线性增长。 现在还有一个问题是你的状态有多大呢,可以去有状态的节点上看DB的大小(通过增量checkpoint的checkpointed data size也可以间接推出),看CPU使用情况,看磁盘的iostat,来找到具体的瓶颈在哪里。 祝好 唐云 From: Jark Wu Sent:

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 bradyMk
谢谢大佬解答~最近一直在看相关的知识,我还有两个问题在网上没有找到解答,想咨询一下: 1、如果我不用keyed State,而改用Operator State,Operator State是所有线程操作一个state么?如果这样,那Operator State是线程安全的么? 2、您之前说的配置 RocksDB 的native metrics,我在官网看到这些指标都是禁用的,那该如何开启呢?我在代码里貌似没有找到相关方法开启各类RocksDB 的native metrics; - Best Wishes -- Sent from:

Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
反压的话,你可以重点看下你使用的是什么state backend, 如果是filesystem,那状态就是放heap的,这种你需要重点看下gc相关的问题; 如果是rocksdb,这种状态是直接序列化到rocksdb中了,一般很少有内存问题,更多的是IO问题,或者CPU瓶颈。 你可以按照找个思路排查一下。 song wang 于2020年12月10日周四 上午11:38写道: > hi,Benchao, > 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢! > > Benchao Li 于2020年12月10日周四 上午11:28写道: >

Re: interval join 时checkpoint失败

2020-12-09 文章 song wang
hi,Benchao, 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢! Benchao Li 于2020年12月10日周四 上午11:28写道: > 你可以检查下在Checkpoint失败的时候是不是任务已经在反压了, > 看起来是有可能因为反压导致的Checkpoint超时失败。 > > song wang 于2020年12月10日周四 上午10:59写道: > > > 各位好, > > 两个流进行interval join,时间窗口是 >

Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
可以发下代码吗? On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote: > 上游是upsert-kafka connector 创建的table, > 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > (为了测试方便,table里只有同一个PK的数据)

Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
你可以检查下在Checkpoint失败的时候是不是任务已经在反压了, 看起来是有可能因为反压导致的Checkpoint超时失败。 song wang 于2020年12月10日周四 上午10:59写道: > 各位好, > 两个流进行interval join,时间窗口是 -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager > log中的报错信息为: > > 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint. > CheckpointCoordinator

Re: flink 1.11.2 on yarn 可用slot始终为0,job无法提交

2020-12-09 文章 Jacob
*Thank you for your reply!* 日志以及pom文件如下 Container: container_1603495749855_55197_02_01 on hadoop01 = LogType:jobmanager.err Log Upload Time:Wed Dec 09 17:03:38 -0800 2020 LogLength:802 Log Contents: SLF4J:

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 jie mei
Hi,Jark 好的,我会就此创建一个issue Jark Wu 于2020年12月10日周四 上午11:17写道: > Hi Jie, > > 看起来确实是个问题。 > sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 > 可以帮忙创建个 issue 么? > > Best, > Jark > > On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > > > Hi, > >

retract stream UDAF使用问题

2020-12-09 文章 bulterman
上游是upsert-kafka connector 创建的table, 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 (为了测试方便,table里只有同一个PK的数据)

or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.

2020-12-09 文章 ????????
The last packet successfully received from the server was 60,919,851 milliseconds ago. The last packet sent successfully to the server was 60,919,876 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 Jark Wu
Hi Jie, 看起来确实是个问题。 sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。 可以帮忙创建个 issue 么? Best, Jark On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > Hi, >是的,感觉你是对的。 > `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 >

Re: 使用flink sql cli读取postgres-cdc时,Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 文章 Jark Wu
postgres-cdc 的表只支持读,不支持写。 On Wed, 9 Dec 2020 at 22:49, zhisheng wrote: > sql client 也得重启 > > 王敏超 于2020年12月9日周三 下午4:49写道: > > > 在使用standalone模式,并启动sql > > cli后,报错如下。但是我的lib目录是引入了flink-sql-connector-postgres-cdc-1.1.0.jar, > > 并且重启过集群。同样方式使用mysql cdc是可以的。 > > > > Caused by:

Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-09 文章 Jark Wu
链接错了。重发下。 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html 2) 你的代码好像也不对。 L45: Table

Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-09 文章 Jark Wu
1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性: https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders" 表。这一行应该执行不成功把。 Best, Jark On Wed, 9 Dec 2020 at 15:44,

????????????????

2020-12-09 文章 ????????

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-09 文章 Jark Wu
嗯 1.12.0 这两天就会发布。 On Wed, 9 Dec 2020 at 14:45, xiao cai wrote: > Hi Jark > sorry,是1.12.0, 我打错了 > > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Wednesday, Dec 9, 2020 14:40 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai,

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 Jark Wu
关于 rocksdb 的性能调优, @Yun Tang 会更清楚。 On Thu, 10 Dec 2020 at 11:04, Jark Wu wrote: > 建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。 > > 你可以参考下这几篇文章尝试调优下 rocksdb: > > https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA > https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw >

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 Jark Wu
建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。 你可以参考下这几篇文章尝试调优下 rocksdb: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg Best,

Re: flink sql 1.11 kafka cdc与holo sink

2020-12-09 文章 Jark Wu
1. 目前不支持。 已有 issue 跟进支持 https://issues.apache.org/jira/browse/FLINK-20385 2. 配上 canal-json.table.include = 't1' 来过滤表。暂不支持正则过滤。 3. 会 Best, Jark On Wed, 9 Dec 2020 at 11:33, 于洋 <1045860...@qq.com> wrote: > flink sql 1.11 创建kafka source 表 ,kafka数据是canal采集的mysql 信息,'format' = > 'canal-json', 问题是

interval join 时checkpoint失败

2020-12-09 文章 song wang
各位好, 两个流进行interval join,时间窗口是 -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager log中的报错信息为: 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator - Checkpoint 143 of job ee4114a1c5413bd02a68b1165090578e expired before completing.

Re: flink11 SQL 如何支持双引号字符串

2020-12-09 文章 Jark Wu
跟这个 issue 没有关系。 这个听起来更像是 hive query 兼容的需求? 可以关注下 FLIP-152: Hive Query Syntax Compatibility https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility Best, Jark On Wed, 9 Dec 2020 at 11:13, zhisheng wrote: > 是跟这个 Issue

flink+ druid

2020-12-09 文章 ????????
The last packet successfully received from the server was 60,919,851 milliseconds ago. The last packet sent successfully to the server was 60,919,876 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection

Re: 关于flink sql往postgres写数据遇到的timestamp问题

2020-12-09 文章 李轲
谢谢 发自我的iPhone > 在 2020年12月10日,10:49,Jark Wu 写道: > > 看报错信息,你并没有 insert into postgres,而是只是 select 了 query,这个会将运行结果显式在 sql client > 界面上,而不会插入到 postgres 中。 > > 你的 query 中有 timestamp with local time zone, 而 sql client 的 query运行结果的显式 > 还不支持这个类型。 > > 这个问题的解决可以关注下这个

Re: 关于flink sql往postgres写数据遇到的timestamp问题

2020-12-09 文章 Jark Wu
看报错信息,你并没有 insert into postgres,而是只是 select 了 query,这个会将运行结果显式在 sql client 界面上,而不会插入到 postgres 中。 你的 query 中有 timestamp with local time zone, 而 sql client 的 query运行结果的显式 还不支持这个类型。 这个问题的解决可以关注下这个 issue:https://issues.apache.org/jira/browse/FLINK-17948 Best, Jark On Tue, 8 Dec 2020 at 19:32, 李轲

Re: flink 1.12如何使用RateLimiter

2020-12-09 文章 Danny Chan
您好 请问是什么场景呢 ?限速的目的是什么 ? 18757141558 <18757141...@163.com> 于2020年12月9日周三 下午6:49写道: > 在源码中找到 FlinkConnectorRateLimiter 和 GuavaFlinkConnectorRateLimiter > kafka相关的类中没有找到这些配置 > 请问如何在api中使用RateLimiter(不修改源码方式)

Re: 求助如何用flink1.11.2 on yarn集成CDH的hbase2.0版本

2020-12-09 文章 Jark Wu
1. 提示“找不到hbase包” 具体的异常栈是什么呢? 2. 看你的步骤中也没有加 flink hbase connector jar 到 lib 下,这会导致找不到 hbase table factory 3. flink 1.11 版本的时候还没有提供 hbase 2.x connector jar 4. flink 1.12 版本支持了 hbase 2.x,理论上也兼容 flink 1.11 集群。 所以你可以试下 download

Re: [flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

2020-12-09 文章 Jark Wu
> tabEnv.createTemporaryView("test_table", result, 我看你这不是注册进去了么? 有报什么错么? 最后提交作业执行记得调用 StreamExecutionEnvironment.execute() Best, Jark On Tue, 8 Dec 2020 at 14:54, Tianwang Li wrote: > Flink版本:1.10.2 > > 使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。 > > 本地测试的结果是一直重复输出数据。 > >

Re: flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 Jun Zhang <825875...@qq.com> 于2020年12月10日周四 上午10:00写道: > https://issues.apache.org/jira/browse/FLINK-19358 > > > > > > > 在2020年12月10日 09:32,Jeff > > > > 这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行! > > > > > > > > > > > > > > > 在 2020-03-24

Re: flink 1.11.2 on yarn 可用slot始终为0,job无法提交

2020-12-09 文章 Xintong Song
jobmanager 的日志方便发下吗? 另外,可以看下 yarn 是否分配了 taskmanager 的 container,如果有的话通过 yarn 获取以下 taskmanager 的日志。 Thank you~ Xintong Song On Thu, Dec 10, 2020 at 9:55 AM Jacob <17691150...@163.com> wrote: > < > http://apache-flink.147419.n8.nabble.com/file/t1162/Screenshot_2020-12-09_153858.png> > > > >

回复:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 在2020年12月10日 09:32,Jeff

Flink 1.11.2 on yarn 提交job失败

2020-12-09 文章 Jacob
*从flink1.7.2升级到1.11.2,job无法提交* 代码、pom文件没有任何修改。在1.7.2 客户端提交没有任何问题,在1.11.2提交job启动job报错,日志如下(./yarn logs ): Container: container_1603495749855_55197_02_01 on hadoop01 = LogType:jobmanager.err Log Upload

Flink 1.11.2 on yarn 提交job失败

2020-12-09 文章 Jacob
*从flink1.7.2升级到1.11.2,job无法提交*代码、pom文件没有任何修改。在1.7.2 客户端提交没有任何问题,在1.11.2提交job启动job报错,日志如下(./yarn logs ):Container: container_1603495749855_55197_02_01 on hadoop01=LogType:jobmanager.errLog Upload Time:Wed

Re: Flink 1.11.2 on yarn报错

2020-12-09 文章 Jacob
该问题已经fix,确实是java版本问题! -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 on yarn 可用slot始终为0,job无法提交

2020-12-09 文章 Jacob
启动命令: ./bin/flink run-application -t yarn-application -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name="Test Job" -c com.jacob.Main /opt/app/test.jar

Re:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jeff
这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行! 在 2020-03-24 07:14:07,"Peihui He" 写道: >大家好,我在用flink >1.9.2 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了 >这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。 >不知道这是什么原因呢?

Re:flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 hailongwang
Hi, 是的,感觉你是对的。 `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而 `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 snapshotState 时候调用format.flush。 WDYT @Jark @ Leonard Best, Hailong 在 2020-12-09 17:13:14,"jie mei" 写道: >Hi, Community > >JDBC

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time - INTERVAL 'x' HOUR 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, 能够反推出来数据的 currentMaxTimestamp currentMaxTimestamp = watermark + maxOutOfOrderness

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
感谢 一旦 和 Benchao 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 val result = bsTableEnv.sqlQuery(""" SELECT * FROM ( SELECT t1.`table`, t1.`database`, t1.transaction_type,

Re: 使用flink sql cli读取postgres-cdc时,Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 文章 zhisheng
sql client 也得重启 王敏超 于2020年12月9日周三 下午4:49写道: > 在使用standalone模式,并启动sql > cli后,报错如下。但是我的lib目录是引入了flink-sql-connector-postgres-cdc-1.1.0.jar, > 并且重启过集群。同样方式使用mysql cdc是可以的。 > > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'postgres-cdc' that

create table语句从kafka中读取数据时,创建的表的数据保存多久?

2020-12-09 文章 邮件帮助中心

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 Yun Tang
State本身不是线程安全的 [1],但是目前对于state的更新都是在task主线程内,而task主线程是线程安全的。除非通过一些特别的方式,例如异步的metrics线程用户逻辑下访问state导致的state写更新副作用,一般是不会出现写错的问题。 [1] https://issues.apache.org/jira/browse/FLINK-13072 祝好 唐云 From: bradyMk Sent: Tuesday, December 8, 2020 17:59 To:

flink 1.12如何使用RateLimiter

2020-12-09 文章 18757141558
在源码中找到 FlinkConnectorRateLimiter 和 GuavaFlinkConnectorRateLimiter kafka相关的类中没有找到这些配置 请问如何在api中使用RateLimiter(不修改源码方式)

flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 jie mei
Hi, Community JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。 我的问题是:是否有办法强制刷新buffer中的数据入库? @Public public interface OutputFormat extends Serializable { /** * Configures this output

flink1.12 docker 镜像啥时候有

2020-12-09 文章 jiangjiguang719
请问啥时候 在docker hub中可以看到1.12版本的镜像?

Re:Re:Re:Re:Re:Flink SQL读取复杂JSON格式

2020-12-09 文章 破极
看了下邮箱列表中提到的方式,目前没打算升级flink,采用自定义format方式解决了这个问题。感谢各位大佬。 在 2020-12-09 15:27:09,"破极" 写道: >刚才搜到了,谢谢 > > > > > > > > > > > > > > > > > >在 2020-12-09 15:20:07,"hailongwang" <18868816...@163.com> 写道: >>http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259

使用flink sql cli读取postgres-cdc时,Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 文章 王敏超
在使用standalone模式,并启动sql cli后,报错如下。但是我的lib目录是引入了flink-sql-connector-postgres-cdc-1.1.0.jar, 并且重启过集群。同样方式使用mysql cdc是可以的。 Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements