Re: On choosing a state backend for N-stream joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread jindy_liu
The Flink SQL job reads 9 mysql tables (snapshot + cdc). The parsed plan has quite a few operators, roughly 60-70, mostly joins plus 4-5 GroupAggregate operators, then the sink; the sink has already been ruled out as the bottleneck. Yes, I have been through several rounds of parameter tuning. All my machines have the same spec: 12 cores + 24 GB memory + 50 GB ssd, 6 machines in total (job parallelism is set to 60; apart from the flink mysql cdc source, whose parallelism is 1, all other operators run at parallelism 60). The main taskmanager settings in flink-conf, everything else at defaults: taskmanager.numberOfTaskSlots: 10

Re: What happens when all input partitions become idle

2020-12-09 Thread Dongwon Kim
Hi Benchao, Thanks for the input. The code is self-explanatory. Best, Dongwon On Thu, Dec 10, 2020 at 12:20 PM Benchao Li wrote: > Hi Dongwon, > > I think you understand it correctly. > You can find this logic here[1] > > [1] >

Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-09 Thread Dan Hill
One of the Exception instances finally reported a stacktrace. I'm not sure why it's so infrequent. java.lang.NullPointerException: null at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1] at
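For context, a minimal sketch of one common way to hit exactly this trace (assuming a nullable BIGINT column whose value arrived as NULL, which is an assumption about this job): GenericRowData.getLong unboxes the stored Object, so a null field throws a NullPointerException.

```java
import org.apache.flink.table.data.GenericRowData;

public class GenericRowDataNpe {
    public static void main(String[] args) {
        GenericRowData row = new GenericRowData(1);
        row.setField(0, null);    // e.g. a nullable BIGINT field that is NULL
        long v = row.getLong(0);  // unboxes null -> NullPointerException, as in the trace above
        System.out.println(v);
    }
}
```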

Re: Re: A group window expects a time attribute for grouping in a stream environment (thanks)

2020-12-09 Thread Appleyuchi
Thanks to both of you for the replies. Two small questions remain. (1) What does the "+" in the output mean? Shouldn't the year be 2017? 1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4) 8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6) 4>

Re: Re: Re: retract stream UDAF usage question

2020-12-09 Thread Jark Wu
You can think of upsert kafka as a real-time materialized view of the mysql table: in mysql, code is the key and amount is the value. When you update amount from 0 to 100 and then 200, the final sum(amount) is naturally 200. If you want 0 -> 100 -> 300, that means you do not want the data treated as updates on a pk but as independent rows; in that case declare it with the kafka connector and simply do not define a pk, i.e. treat it as a plain log. As for your UDAF problem, it is probably an issue in your implementation, because in the retract method you set the value back
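A minimal sketch of the two declarations being contrasted (Flink 1.12 connector options; topic, broker address, and table names are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertVsAppend {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // With a PRIMARY KEY, upsert-kafka folds rows per key: for amounts
        // 0 -> 100 -> 200 under one code, SUM(Amount) ends up as 200.
        tEnv.executeSql(
                "CREATE TABLE market_stock_upsert (" +
                "  Code STRING, Amount BIGINT," +
                "  PRIMARY KEY (Code) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'market_stock'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json')");

        // Without a key, the same records are a plain append-only log:
        // SUM(Amount) accumulates 0 -> 100 -> 300.
        tEnv.executeSql(
                "CREATE TABLE market_stock_log (" +
                "  Code STRING, Amount BIGINT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'market_stock'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'earliest-offset')");
    }
}
```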

Re: Re: Re: retract stream UDAF usage question

2020-12-09 Thread bulterman
Suppose Code X: the first record has X.Amount=0, the second X.Amount=100, the third X.Amount=200. 1. Since Code is the primary key, the table only keeps the latest record for X each time, so select sum(X.Amount) from table outputs: 0, 100, 200. 2. In my UDAF, for the same Code X, the accumulate method executes acc.lastAmount = Amount every time to update the accumulator state, but judging from the results, acc.lastAmount is 0 every time the method is entered for the same Code X? Is that also because the table only keeps one record per Code X?

Re: Reply: flink with RocksDB incremental checkpoints: job exceeds physical memory after running for a while

2020-12-09 Thread bradyMk
A naive question: can the relevant configuration options just be set directly in the flink-conf.xml file? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: retract stream UDAF usage question

2020-12-09 Thread Jark Wu
Because the pk of your upsert kafka table is code, the data within each code group is already unique (one row per code, with the last row taken as the latest data). Presumably the amount values for the same code are identical, so sum(amount) naturally does not change. Best, Jark On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote: > // kafka table > tableEnv.executeSql("CREATE TABLE market_stock(\n" + > >

Re: Flink cli Stop command exception

2020-12-09 Thread Yang Wang
Maybe FLINK-16626[1] is related. It is fixed in 1.10.1 and 1.11. [1] https://issues.apache.org/jira/browse/FLINK-16626 Best, Yang Yun Tang wrote on Thu, 10 Dec 2020 at 11:06: > Hi Suchithra, > > Have you ever checked the job manager log to see whether the savepoint is > triggered and why the savepoint

Re: flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread jie mei
Hi, Leonard. OK, I will open a PR to fix this issue. Leonard Xu wrote on Thu, 10 Dec 2020 at 12:10: > Your analysis is right, this is a bug. SinkFunctionProvider should be used here, > wrapping another layer with GenericJdbcSinkFunction, instead of OutputFormatProvider, because > OutputFormatSinkFunction does not implement CheckpointedFunction and cannot guarantee the buffered data is flushed to the database at checkpoint time; > you could also say OutputFormat takes no part in checkpointing,

Can data rebalance between tasks adapt the stream to backpressure?

2020-12-09 Thread 赵一旦
For example, two tasks A and B are connected by rebalance. Suppose some parallel instance of A is backpressured (probably rare since it is a rebalance, but still possible, e.g. by unlucky timing). Could the rebalance adjust how it forwards the data stream according to the backpressure?

Re: Re: retract stream UDAF usage question

2020-12-09 Thread bulterman
// kafka table tableEnv.executeSql("CREATE TABLE market_stock(\n" + "Code STRING,\n" + "Amount BIGINT,\n" + .. "PRIMARY KEY (Code) NOT ENFORCED\n" + ") WITH (\n" + "

Re: flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread Leonard Xu
Your analysis is right, this is a bug. SinkFunctionProvider should be used here, wrapping another layer with GenericJdbcSinkFunction, instead of OutputFormatProvider, because OutputFormatSinkFunction does not implement CheckpointedFunction and cannot guarantee the buffered data is flushed to the database at checkpoint time. You could also say OutputFormat takes no part in checkpointing, so not even at-least-once is necessarily guaranteed. The fix should be quite simple. @jie mei, would you be interested in helping to fix it? Best, Leonard > On 10 Dec 2020, at 11:22, jie
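A hedged sketch of the shape of the fix being described: a SinkFunction that also implements CheckpointedFunction, so the JDBC buffer is drained whenever a checkpoint is taken. The `BufferingJdbcWriter` interface is hypothetical, standing in for the buffering JDBC output format; only the Flink interfaces are real.

```java
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical stand-in for a buffering JDBC writer (e.g. a batching OutputFormat).
interface BufferingJdbcWriter<T> extends AutoCloseable {
    void addToBatch(T record) throws Exception;
    void flush() throws Exception; // executes the buffered statements
}

public class FlushingJdbcSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private final BufferingJdbcWriter<T> writer;

    public FlushingJdbcSink(BufferingJdbcWriter<T> writer) {
        this.writer = writer;
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        writer.addToBatch(value); // buffered; written in batches for throughput
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        writer.flush(); // the crucial part: drain the buffer before the checkpoint completes
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // nothing to restore; flush-on-snapshot is what gives at-least-once
    }

    @Override
    public void close() throws Exception {
        writer.flush();
        writer.close();
    }
}
```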

Re: A group window expects a time attribute for grouping in a stream environment (thanks)

2020-12-09 Thread Leonard Xu
Hi, to follow up on yesterday's offline Q&A: when converting from a DataStream to a Table, if you want to keep using the watermark on the resulting Table, you need to (1) make the datastream carry a watermark and (2) declare an event time attribute on the table. See the docs [1]. Here is the watermark-generation code omitted in the docs: // old API //Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor()
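A hedged, self-contained sketch along the same lines, using the Flink 1.11-era APIs named above (the Order POJO, its fields, and the placeholder source are illustrative assumptions):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class OrdersWithRowtime {
    public static class Order {
        public String product;
        public long amount;
        public long ts; // epoch millis
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromElements(new Order()); // placeholder source

        // (1) the stream must carry watermarks ...
        DataStream<Order> withWatermarks = orderA.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Order>() {
                    @Override
                    public long extractAscendingTimestamp(Order o) {
                        return o.ts;
                    }
                });

        // (2) ... and the table must declare an event-time attribute.
        Table orders = tEnv.fromDataStream(
                withWatermarks, $("product"), $("amount"), $("rowtime").rowtime());
    }
}
```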

Re: Reply: flink with RocksDB incremental checkpoints: job exceeds physical memory after running for a while

2020-12-09 Thread Yun Tang
Hi, Operator state itself is not thread-safe either; it is just that reads and writes normally happen either on the task main thread holding the checkpoint lock or on the checkpoint async thread, which is what keeps the data safe. The SourceFunction docs also stress that state must be updated while holding the checkpointLock. As for enabling Flink's RocksDB native metrics, it is described in the doc link I sent you earlier: just set the relevant config options to true. Best, Yun Tang From: bradyMk Sent: Thursday, December 10,
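For reference, a hedged flink-conf.yaml sketch of a few of those switches; the keys follow the state.backend.rocksdb.metrics.* pattern from the RocksDB state backend docs, all default to false, and enabling them costs some performance:

```yaml
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.background-errors: true
```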

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-09 Thread Jark Wu
Could you use 4 scalar functions instead of UDTF and map function? For example: select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits), hasWatermelon(fruits) from T; I think this can preserve the primary key. Best, Jark On Thu, 3 Dec 2020 at 15:28, Rex Fenley wrote: > It appears

Re: On choosing a state backend for N-stream joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread Yun Tang
Hi, FsStateBackend does perform better than RocksDBStateBackend; that is expected. But to get that performance you need more JVM heap memory, and GC with large heaps is painful, so performance does not grow linearly as you add memory. The remaining question is how large your state is. You can check the DB size on the stateful nodes (it can also be inferred indirectly from the checkpointed data size of incremental checkpoints), and look at CPU usage and disk iostat to find where the bottleneck actually is. Best, Yun Tang From: Jark Wu Sent:

Re: Reply: flink with RocksDB incremental checkpoints: job exceeds physical memory after running for a while

2020-12-09 Thread bradyMk
Thanks for the explanation~ I have been reading up on this lately and have two questions I could not find answers to online: 1. If I switch from keyed State to Operator State, do all threads operate on a single state? If so, is Operator State thread-safe? 2. About configuring RocksDB's native metrics that you mentioned: the official docs show these metrics as disabled, so how do I enable them? I could not find any method in the code to turn on the various RocksDB native metrics. - Best Wishes -- Sent from:

Re: Checkpoint failures during interval join

2020-12-09 Thread Benchao Li
For the backpressure, first check which state backend you are using. If it is filesystem, state lives on the heap, and you should focus on GC-related problems; if it is rocksdb, state is serialized straight into rocksdb and memory problems are rare; it is more often an IO problem or a CPU bottleneck. You can investigate along those lines. song wang wrote on Thu, 10 Dec 2020 at 11:38: > hi, Benchao, > Yes, the right stream was backpressured when the job failed, and this has happened two days in a row now. I will look into why the backpressure occurs, thanks! > Benchao Li wrote on Thu, 10 Dec 2020 at 11:28: >

Re: Problem when restoring from savepoint with missing state & POJO modification

2020-12-09 Thread Yun Tang
Hi Bastien, I think you could refer to WritableSavepoint#write [1] to get all existing state and flat map to remove the state you do not want (could refer to StatePathExtractor[2] ) [1]
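A rough sketch of the surrounding State Processor API flow (paths and the operator uid are placeholders; note this drops an operator's state wholesale, while per-state removal is the finer-grained step being discussed in this thread):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class TrimSavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint, drop one operator's state wholesale,
        // and write everything else out to a new savepoint path.
        ExistingSavepoint sp = Savepoint.load(
                bEnv, "hdfs:///savepoints/old", new MemoryStateBackend());
        sp.removeOperator("my-operator-uid")
          .write("hdfs:///savepoints/trimmed");

        bEnv.execute("trim-savepoint");
    }
}
```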

Re: Checkpoint failures during interval join

2020-12-09 Thread song wang
hi, Benchao, yes, the right stream was backpressured when the job failed, and this has happened two days in a row now. I will look into why the backpressure occurs, thanks! Benchao Li wrote on Thu, 10 Dec 2020 at 11:28: > Check whether the job was already backpressured when the Checkpoint failed; > it looks like the Checkpoint timeout could be caused by backpressure. > > song wang wrote on Thu, 10 Dec 2020 at 10:59: > > Hi all, > > Two streams are interval joined, with a time window of >

Re: retract stream UDAF usage question

2020-12-09 Thread Jark Wu
Could you share the code? On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote: > The upstream table is created with the upsert-kafka connector. > When using a UDAF, the variables in the accumulator never seem to get updated? With the kafka connector they update normally. > (For ease of testing, the table only contains data for a single PK)

Re: Checkpoint failures during interval join

2020-12-09 Thread Benchao Li
Check whether the job was already backpressured when the Checkpoint failed; it looks like the Checkpoint timeout could be caused by backpressure. song wang wrote on Thu, 10 Dec 2020 at 10:59: > Hi all, > Two streams are interval joined with a time window of -23h, +1h. The job runs normally for about 23 hours, after which checkpoints start failing. The error in the jobmanager > log is: > > 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator

Re: flink 1.11.2 on yarn: available slots stay at 0, job cannot be submitted

2020-12-09 Thread Jacob
*Thank you for your reply!* The log and pom file are as follows: Container: container_1603495749855_55197_02_01 on hadoop01 = LogType:jobmanager.err Log Upload Time:Wed Dec 09 17:03:38 -0800 2020 LogLength:802 Log Contents: SLF4J:

Re: flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread jie mei
Hi, Jark, OK, I will create an issue for this. Jark Wu wrote on Thu, 10 Dec 2020 at 11:17: > Hi Jie, > > It does look like a problem. > The JdbcBatchingOutputFormat returned by the sink is not wrapped in GenericJdbcSinkFunction. > Could you help create an issue? > > Best, > Jark > > On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > > Hi, > >

Re: What happens when all input partitions become idle

2020-12-09 Thread Benchao Li
Hi Dongwon, I think you understand it correctly. You can find this logic here[1] [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java#L108 Dongwon Kim wrote on Thu, 10 Dec 2020 at 00:21: > Hi, > >
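A condensed paraphrase of that logic (simplified model, not the actual Flink source): the output watermark only advances to the minimum over channels that are not idle.

```java
public class WatermarkValveSketch {
    // One watermark and one active/idle flag per input channel.
    static long minWatermarkAcrossActiveChannels(
            long[] channelWatermarks, boolean[] channelActive, long lastEmitted) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) { // idle channels do not hold the clock back
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Watermarks only advance; if every channel is idle, keep the last value.
        return anyActive ? Math.max(min, lastEmitted) : lastEmitted;
    }

    public static void main(String[] args) {
        // B_A1 = 12 active, B_A2 = 10 idle  ->  the clock advances to 12.
        System.out.println(minWatermarkAcrossActiveChannels(
                new long[] {12, 10}, new boolean[] {true, false}, 10));
    }
}
```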

retract stream UDAF usage question

2020-12-09 Thread bulterman
The upstream table is created with the upsert-kafka connector. When using a UDAF, the variables in the accumulator never seem to get updated? With the kafka connector they update normally. (For ease of testing, the table only contains data for a single PK)

or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.

2020-12-09 Thread ????????
The last packet successfully received from the server was 60,919,851 milliseconds ago. The last packet sent successfully to the server was 60,919,876 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection
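For reference, a hedged sketch of the workaround the driver message itself names (host and database are placeholders; validating connections before use, or a pool with a validation query, is generally the more robust fix):

```java
// Connector/J URL property; reconnects after wait_timeout kills the connection.
String url = "jdbc:mysql://localhost:3306/mydb?autoReconnect=true";
```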

Re: flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread Jark Wu
Hi Jie, it does look like a problem. The JdbcBatchingOutputFormat returned by the sink is not wrapped in GenericJdbcSinkFunction. Could you help create an issue? Best, Jark On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote: > Hi, > Yes, I think you are right. > `JdbcOutputFormat` gets wrapped in `OutputFormatSinkFunction`, and >

Re: When reading postgres-cdc with the flink sql cli: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 Thread Jark Wu
postgres-cdc tables only support reads, not writes. On Wed, 9 Dec 2020 at 22:49, zhisheng wrote: > The sql client needs to be restarted too > > 王敏超 wrote on Wed, 9 Dec 2020 at 16:49: > > Using standalone mode, after starting the sql > > cli I get the error below. But my lib directory does include flink-sql-connector-postgres-cdc-1.1.0.jar, > > and I have restarted the cluster. The same approach works with mysql cdc. > > > > Caused by:

Re: A group window expects a time attribute for grouping in a stream environment (thanks)

2020-12-09 Thread Jark Wu
Wrong link, resending. 1) A time attribute is not just a timestamp-typed field; it is a special concept. See the docs on how to declare time attributes: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html 2) Your code also looks wrong. L45: Table
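A minimal DDL sketch of what the linked page describes (table and field names are illustrative; `tEnv` is assumed to be an existing TableEnvironment). Declaring a watermark turns `ts` into an event-time attribute that group windows can use:

```java
tEnv.executeSql(
        "CREATE TABLE Orders (" +
        "  user_id BIGINT," +
        "  amount  INT," +
        "  ts      TIMESTAMP(3)," +
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
        ") WITH ('connector' = 'datagen')");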

Re: A group window expects a time attribute for grouping in a stream environment (thanks)

2020-12-09 Thread Jark Wu
1). A time attribute is not just a timestamp-typed field; it is a special concept. See the docs on how to declare time attributes: https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html 2. Your code also looks wrong. L45: Table orders = tEnv.from("Orders"); I don't see an "Orders" table being registered anywhere; that line should not even execute successfully. Best, Jark On Wed, 9 Dec 2020 at 15:44,

Re: How to define field types for JsonObject data in FlinkSQL

2020-12-09 Thread Jark Wu
Yes, 1.12.0 will be released within the next couple of days. On Wed, 9 Dec 2020 at 14:45, xiao cai wrote: > Hi Jark > sorry, I meant 1.12.0, I mistyped > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Wednesday, Dec 9, 2020 14:40 > Subject: Re: How to define field types for JsonObject data in FlinkSQL > > > Hi 赵一旦, the jackson component already handles that logic automatically. Hi xiaocai,

Re: Flink cli Stop command exception

2020-12-09 Thread Yun Tang
Hi Suchithra, Have you ever checked job manager log to see whether the savepoint is triggered and why the savepoint failed to complete. Best Yun Tang From: V N, Suchithra (Nokia - IN/Bangalore) Sent: Wednesday, December 9, 2020 23:45 To: user@flink.apache.org

Re: On choosing a state backend for N-stream joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread Jark Wu
On rocksdb performance tuning, @Yun Tang would know best. On Thu, 10 Dec 2020 at 11:04, Jark Wu wrote: > For large state I still recommend rocksdb; it is much more stable in production. The gap you describe feels quite large, probably because rocksdb has not been tuned yet. > > You can try tuning rocksdb with these articles: > > https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA > https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw >

Re: On choosing a state backend for N-stream joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread Jark Wu
For large state I still recommend rocksdb; it is much more stable in production. The gap you describe feels quite large, probably because rocksdb has not been tuned yet. You can try tuning rocksdb with these articles: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg Best,

Re: flink sql 1.11 kafka cdc with holo sink

2020-12-09 Thread Jark Wu
1. Not supported yet; there is an issue tracking it: https://issues.apache.org/jira/browse/FLINK-20385 2. Set canal-json.table.include = 't1' to filter tables. Regex filtering is not supported yet. 3. Yes, it will. Best, Jark On Wed, 9 Dec 2020 at 11:33, 于洋 <1045860...@qq.com> wrote: > flink sql 1.11: created a kafka source table; the kafka data is mysql change info collected by canal, 'format' = 'canal-json'. The question is
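A sketch of point 2 (topic, broker address, and schema are placeholders; per the reply, the option takes an exact table name, not a regex; `tEnv` is assumed to exist):

```java
tEnv.executeSql(
        "CREATE TABLE t1_source (" +
        "  id BIGINT," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'canal_binlog'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'canal-json'," +
        "  'canal-json.table.include' = 't1')");
```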

Checkpoint failures during interval join

2020-12-09 Thread song wang
Hi all, two streams are interval joined with a time window of -23h, +1h. The job runs normally for about 23 hours, after which checkpoints start failing. The error in the jobmanager log is: 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 143 of job ee4114a1c5413bd02a68b1165090578e expired before completing.

Re: How can flink 1.11 SQL support double-quoted strings

2020-12-09 Thread Jark Wu
It is unrelated to that issue. This sounds more like a hive query compatibility requirement? You can follow FLIP-152: Hive Query Syntax Compatibility https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility Best, Jark On Wed, 9 Dec 2020 at 11:13, zhisheng wrote: > Is it related to this Issue

flink+ druid

2020-12-09 Thread ????????
The last packet successfully received from the server was 60,919,851 milliseconds ago. The last packet sent successfully to the server was 60,919,876 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection

Re: Timestamp problem when writing data to postgres with flink sql

2020-12-09 Thread 李轲
Thanks. Sent from my iPhone > On 10 Dec 2020, at 10:49, Jark Wu wrote: > > From the error message, you never ran insert into postgres; you only ran a select query, which displays the result in the sql client > UI instead of inserting it into postgres. > > Your query contains timestamp with local time zone, and the sql client's display of query results > does not support that type yet. > > For the fix, you can follow this

Re: Timestamp problem when writing data to postgres with flink sql

2020-12-09 Thread Jark Wu
From the error message, you never ran insert into postgres; you only ran a select query, which displays the result in the sql client UI instead of inserting it into postgres. Your query contains timestamp with local time zone, and the sql client's display of query results does not support that type yet. For the fix, follow this issue: https://issues.apache.org/jira/browse/FLINK-17948 Best, Jark On Tue, 8 Dec 2020 at 19:32, 李轲
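A sketch of the difference being pointed out (table names, schema, and the JDBC URL are placeholders; `tEnv` is assumed to exist): declare a sink table and INSERT INTO it, rather than running a bare SELECT.

```java
tEnv.executeSql(
        "CREATE TABLE pg_sink (" +
        "  id BIGINT," +
        "  event_time TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:postgresql://localhost:5432/mydb'," +
        "  'table-name' = 'events'," +
        "  'username' = 'postgres'," +
        "  'password' = 'secret')");

// A bare SELECT only renders in the sql client; INSERT INTO actually writes.
tEnv.executeSql("INSERT INTO pg_sink SELECT id, event_time FROM my_source");
```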

Re: How to use RateLimiter in flink 1.12

2020-12-09 Thread Danny Chan
Hello, what is the scenario? What is the goal of the rate limiting? 18757141558 <18757141...@163.com> wrote on Wed, 9 Dec 2020 at 18:49: > I found FlinkConnectorRateLimiter and GuavaFlinkConnectorRateLimiter in the source code, > but could not find these settings in the kafka-related classes. > How can RateLimiter be used via the api (without modifying the source)?

Re: Help: how to integrate CDH's hbase 2.0 with flink 1.11.2 on yarn

2020-12-09 Thread Jark Wu
1. What is the exact exception stack behind the "hbase package not found" message? 2. Your steps do not add the flink hbase connector jar to lib either, which would cause the hbase table factory not to be found. 3. flink 1.11 did not yet ship an hbase 2.x connector jar. 4. flink 1.12 supports hbase 2.x and should in theory also be compatible with a flink 1.11 cluster, so you can try to download

Re: [flink-1.10.2] How to register the DataStream produced by async IO as a table?

2020-12-09 Thread Jark Wu
> tabEnv.createTemporaryView("test_table", result, Doesn't this line register it? What error are you getting? Remember to call StreamExecutionEnvironment.execute() at the end to submit the job. Best, Jark On Tue, 8 Dec 2020 at 14:54, Tianwang Li wrote: > Flink version: 1.10.2 > > Using a RichAsyncFunction for async IO, the resulting DataStream cannot be registered as a table. > > In local testing, the data keeps being output repeatedly. > >

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
No, the group agg, stream-stream join and rank are all stateful operators which need a state-backend to bookkeep the acc values. But it is only required to emit the retractions when the stateful operator A has a downstream operator B that is also stateful, because the B needs the retractions to

Re: flink 1.9.2 container HA deployment: jobid is always 00000000000000000000000000000000

2020-12-09 Thread Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 Jun Zhang <825875...@qq.com> wrote on Thu, 10 Dec 2020 at 10:00: > https://issues.apache.org/jira/browse/FLINK-19358 > > On 10 Dec 2020 at 09:32, Jeff wrote: > > I hit this problem too; how was it eventually solved? Even switching to flink 1.11.2 does not help! > > On 2020-03-24

Re: flink 1.11.2 on yarn: available slots stay at 0, job cannot be submitted

2020-12-09 Thread Xintong Song
Could you share the jobmanager log? Also, check whether yarn allocated a taskmanager container; if it did, fetch the taskmanager log via yarn. Thank you~ Xintong Song On Thu, Dec 10, 2020 at 9:55 AM Jacob <17691150...@163.com> wrote: > <http://apache-flink.147419.n8.nabble.com/file/t1162/Screenshot_2020-12-09_153858.png>

Reply: flink 1.9.2 container HA deployment: jobid is always 00000000000000000000000000000000

2020-12-09 Thread Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 On 10 Dec 2020 at 09:32, Jeff

Flink 1.11.2 on yarn: job submission fails

2020-12-09 Thread Jacob
*After upgrading from flink 1.7.2 to 1.11.2, the job cannot be submitted.* Code and pom file are completely unchanged. Submitting with the 1.7.2 client works fine; on 1.11.2 the job errors at startup. Log below (./yarn logs ): Container: container_1603495749855_55197_02_01 on hadoop01 = LogType:jobmanager.err Log Upload

Re: Flink 1.11.2 on yarn error

2020-12-09 Thread Jacob
The problem has been fixed; it was indeed a java version issue! -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 on yarn: available slots stay at 0, job cannot be submitted

2020-12-09 Thread Jacob
Startup command: ./bin/flink run-application -t yarn-application -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name="Test Job" -c com.jacob.Main /opt/app/test.jar

Re: flink 1.9.2 container HA deployment: jobid is always 00000000000000000000000000000000

2020-12-09 Thread Jeff
I hit this problem too; how was it eventually solved? Even switching to flink 1.11.2 does not help! On 2020-03-24 07:14:07, "Peihui He" wrote: > Hi all, when I deploy flink > 1.9.2 in containers, the jobid is normal without HA, but once HA is enabled it becomes the all-zeros id. > That way the checkpoint path and the HA file path end up identical, causing checkpoints to keep failing. > Any idea what causes this?

How to debug a Flink Exception that does not have a stack trace?

2020-12-09 Thread Dan Hill
In the Flink dashboard, my job is failing with a NullPointerException but the Exception is not showing a stack trace. I do not see any NullPointerExceptions in any of the flink-jobmanager and flink-taskmanager logs. Is this a normal issue? [image: Screen Shot 2020-12-09 at 4.29.30 PM.png]
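One plausible cause (an assumption, not confirmed in this thread): the JVM's fast-throw optimization elides stack traces for exceptions thrown repeatedly from the same call site, which would also fit the follow-up earlier in this digest where only an occasional instance carried a trace. It can be turned off via flink-conf.yaml:

```yaml
# Assumption: traces are being elided by the JIT's fast-throw optimization.
# env.java.opts is applied to both JobManager and TaskManager JVMs.
env.java.opts: "-XX:-OmitStackTraceInFastThrow"
```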

Re: Error while connecting with MSSQL server

2020-12-09 Thread aj
Sure thanks Flavio, will check it out On Wed, Dec 9, 2020, 16:20 Flavio Pompermaier wrote: > I issued a PR some time ago at https://github.com/apache/flink/pull/12038 but > Flink committers were busy in refactoring that part..I don't know if it is > still required to have that part into the

Re: Stream job getting Failed

2020-12-09 Thread Arvid Heise
Hi Anuj, SIGTERM with SIGNAL 15 means that it was killed by an external process. Look into the Yarn logs to look for a specific error. Usually, yarn kills a container with exit code 143 when it goes over memory boundaries. This is something the community constantly improves, but may still happen

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Rex Fenley
So from what I'm understanding, the aggregate itself is not a "stateful operator", but one may follow it? How does the aggregate accumulator keep old values then? It can't all just live in memory; actually, looking at the savepoints, it looks like there's state associated with our aggregate

How to reduce number of metrics pushed to Prometheus Push Gateway

2020-12-09 Thread Alexander Filipchik
Hi, Is there a way to reduce the cardinality of (pre-aggregate) the metrics emitted to the Prom Push gateway? Our metrics infra is struggling to digest per-task stats. Is there any way we can configure it to emit per-stage aggregates? Our current config: metrics.scope.tm flink.taskmanager
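A hedged flink-conf.yaml sketch of the scope-format approach: dropping high-cardinality variables such as <subtask_index> or <task_name> from the metrics.scope.* formats shrinks the series count, at the cost of subtasks reporting under the same identifier (values may overwrite rather than aggregate unless the reporter handles that). The trimmed formats below are illustrative, not a recommendation:

```yaml
# Defaults include variables like <host>, <tm_id>, <task_name>, <subtask_index>.
metrics.scope.tm: flink.taskmanager
metrics.scope.task: flink.task.<job_name>
metrics.scope.operator: flink.operator.<job_name>.<operator_name>
```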

Re: flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread hailongwang
Hi, yes, I think you are right. `JdbcOutputFormat` gets wrapped in `OutputFormatSinkFunction`, and `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called during snapshotState. WDYT @Jark @Leonard Best, Hailong On 2020-12-09 17:13:14, "jie mei" wrote: >Hi, Community > >JDBC

Stream job getting Failed

2020-12-09 Thread aj
I have a Flink stream job that reads data from Kafka and writes it to S3. This job keeps failing after running for 2-3 days. I am not able to find anything in the logs about why it's failing. Can somebody help me figure out the cause of the failure? I can only see this in the logs:

Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

2020-12-09 Thread abelm
Hello! I have a Scala 2.12 project which registers some tables (that get their data from Kafka in JSON form) to the StreamTableEnvironment via the executeSql command before calling execute on the StreamExecutionEnvironment. Everything behaves as expected until I either try to set

Re: A question about stream-stream Interval Join

2020-12-09 Thread macia kk
I just ran many jobs with different maxOutOfOrderness settings: WATERMARK FOR event_time AS event_time - INTERVAL 'x' HOUR. I found something very odd. In theory, watermark = currentMaxTimestamp - maxOutOfOrderness, but from the watermark time shown on the web UI and the maxOutOfOrderness x I configured, I can work backwards to the data's currentMaxTimestamp: currentMaxTimestamp = watermark + maxOutOfOrderness
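Written out with concrete numbers (times are illustrative), the inversion is just:

```
watermark           = currentMaxTimestamp - maxOutOfOrderness
currentMaxTimestamp = watermark + maxOutOfOrderness

e.g. with maxOutOfOrderness = 1 hour and the UI showing watermark = 12:00,
     currentMaxTimestamp = 12:00 + 1:00 = 13:00
```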

Question about OneInputTransformation

2020-12-09 Thread Hongjian Peng
Hi Flink Community, We use Flink SQL to calculate some metrics. In our SQL, we use window aggregation and we want to trigger the result earlier with different trigger strategies. So we get the window operators in the transformations and set the triggers by reflection. It worked in Flink 1.7.

What happens when all input partitions become idle

2020-12-09 Thread Dongwon Kim
Hi, Let's consider two operators: A (parallelism=2) and B (parallelism=1). B has two input partitions, B_A1 and B_A2, which are connected to A1 and A2 respectively. At some point, - B_A1's watermark : 12 - B_A2's watermark : 10 - B's event-time clock : 10 = min(12, 10) - B has registered a timer

Re: Flink 1.11 avro format question

2020-12-09 Thread Hongjian Peng
Thank you for the help. -- Thanks, Hongjian Peng At 2020-11-30 16:16:48, "Dawid Wysakowicz" wrote: Hi, I managed to backport the change to the 1.11 branch. It should be part of the 1.11.3 release. Best, Dawid On 25/11/2020 16:23, Hongjian Peng wrote: Thanks for Danny and

Flink cli Stop command exception

2020-12-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello, I am running a streaming flink job and was using the cancel command with a savepoint to cancel it. From flink version 1.10, the stop command should be used instead of the cancel command. But I am sometimes getting the error below. Please let me know what the issue might be.
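For reference, the stop-with-savepoint invocation under discussion (savepoint path and job id are placeholders):

```
./bin/flink stop --savepointPath hdfs:///flink/savepoints <jobId>
# short form: ./bin/flink stop -p hdfs:///flink/savepoints <jobId>
```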

Re: A question about stream-stream Interval Join

2020-12-09 Thread macia kk
Thanks, 一旦 and Benchao. 1. If my watermark being set too long were what prevents any output, that would be puzzling, because I am certain there is data that joins, yet my job has run for days without emitting a single row. I also tried the SQL below, joining the stream with itself, which in theory must produce output directly; in practice, still not a single row. val result = bsTableEnv.sqlQuery(""" SELECT * FROM ( SELECT t1.`table`, t1.`database`, t1.transaction_type,

Re: lookup cache clarification

2020-12-09 Thread Marco Villalobos
But when I monitor my database, it only shows one query. It does not show a query every 20 seconds. It seems that since the cache is 10 times larger, the records are kept even though they expired. > On Dec 9, 2020, at 4:05 AM, Danny Chan wrote: > > Yes, you understand it correctly. > >

Re: When reading postgres-cdc with the flink sql cli: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 Thread zhisheng
The sql client needs to be restarted too. 王敏超 wrote on Wed, 9 Dec 2020 at 16:49: > Using standalone mode, after starting the sql > cli I get the error below. But my lib directory does include flink-sql-connector-postgres-cdc-1.1.0.jar, > and I have restarted the cluster. The same approach works with mysql cdc. > > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'postgres-cdc' that

Flink 1.12 Ask-Me-Anything Meetup

2020-12-09 Thread Ana Vasiliuk
Hi everyone, The Flink 1.12 AMA is happening today at 10 am Pacific Time/ 1 pm Eastern Time/ 7 pm Central European Time. Tune in directly at https://youtu.be/u8jPgXoNDXA for a discussion on the upcoming release and new features with Aljoscha Krettek, Stephan Ewen, Arvid Heise, Robert Metzger, and

Re: Batch loading into postgres database

2020-12-09 Thread Dawid Wysakowicz
Your approach looks rather good to me. In the version with querying for the JobStatus you must remember that there are such states as e.g. INITIALIZING, which just tells you that the job was submitted. In 1.12 we introduced the TableResult#await method, which is a shortcut over what you did in
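A minimal sketch of that 1.12 shortcut (statement and table names are illustrative; `tEnv` is assumed to be an existing TableEnvironment):

```java
import org.apache.flink.table.api.TableResult;

TableResult result = tEnv.executeSql("INSERT INTO pg_sink SELECT * FROM my_source");
result.await(); // returns once the insert job reaches a terminal state
```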

Re: TextFile source && KeyedWindow triggers --> Unexpected execution order

2020-12-09 Thread Dawid Wysakowicz
Hi Marta, Do you mean you want to emit results every 5 minutes based on the wall time (processing time)? If so you should use the ContinuousProcessingTimeTrigger instead of ContinuousEventTimeTrigger which will emit results based on the event time. Does that solve your problem? Best, Dawid On
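A hedged sketch of the suggestion (the key selector, window size, and reduce function are illustrative; `stream` is assumed to be an existing DataStream):

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

stream.keyBy(event -> event.getKey())
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      // fire every 5 minutes of wall-clock time, regardless of watermarks
      .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
      .reduce((a, b) -> b);
```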

Re: Problem when restoring from savepoint with missing state & POJO modification

2020-12-09 Thread bastien dine
Hello Yun, Thank you very much for your response, that's what I thought, However, it does not seem possible to remove only one state using the state processor API, We use it a lot, and we can only remove all of the operator states, not one specifically, Am I missing something? Best Regards,

Re: Application Mode support on VVP v2.3

2020-12-09 Thread narasimha
Thanks for the information. Are there any plans to implement this? It is supported on other docker images... On Tue, 8 Dec 2020 at 9:36 PM, Fabian Paul wrote: > Hi Narasimha, > > I investigated your problem and it is caused by multiple issues. First vvp > in > general cannot really handle

When a create table statement reads data from kafka, how long is the created table's data retained?

2020-12-09 Thread 邮件帮助中心

Re: What's the meaning of the "true/false" from "groupBy...select"? Thanks

2020-12-09 Thread Danny Chan
The "true" means the message is an insert/update after, the "false" means the message is a retraction (for the old record that needs to be modified). Appleyuchi 于2020年12月9日周三 下午12:20写道: > > The complete code is: > https://paste.ubuntu.com/p/hpWB87kT6P/ > > The result is: > 2> (true,1,diaper,4)

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
Hi, Rex Fenley ~ If there is stateful operator as the output of the aggregate function. Then each time the function receives an update (or delete) for the key, the agg operator would emit 2 messages, one for retracting the old record, one for the new message. For your case, the new message is the

Re: lookup cache clarification

2020-12-09 Thread Danny Chan
Yes, you understand it correctly. Marco Villalobos wrote on Wed, 9 Dec 2020 at 04:23: > I set up the following lookup cache values: > > 'lookup.cache.max-rows' = '20' > 'lookup.cache.ttl' = '1min' > > for a jdbc connector. > > This table currently only has about 2 records in it. However, > since I

Re: How can I optimize joins or cache misses in SQL api?

2020-12-09 Thread Danny Chan
Hi, Marco Villalobos ~ It's nice to see that you chose the SQL API, which is more concise and expressive. To answer some of your questions: > Q: Is there a way to control that? I don't want the N + 1 query problem. No, the SQL evaluates row by row; there may be some internal optimizations that

Re: Reply: flink with RocksDB incremental checkpoints: job exceeds physical memory after running for a while

2020-12-09 Thread Yun Tang
State itself is not thread-safe [1], but currently all state updates happen on the task main thread, and the task main thread is thread-safe. Unless you do something unusual, such as user logic accessing state from an asynchronous metrics thread and causing side effects on state writes, incorrect writes generally do not occur. [1] https://issues.apache.org/jira/browse/FLINK-13072 Best, Yun Tang From: bradyMk Sent: Tuesday, December 8, 2020 17:59 To:

Re: A group window expects a time attribute for grouping in a stream environment. Thanks for your help

2020-12-09 Thread Xingbo Huang
Hi, Your code does not show how the `Orders` Table is created. For how to declare the time attribute in DDL, you can refer to the official documentation [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html Best, Xingbo Appleyuchi wrote on Wed, 9 Dec 2020

Re: Error while connecting with MSSQL server

2020-12-09 Thread Flavio Pompermaier
I issued a PR some time ago at https://github.com/apache/flink/pull/12038 but Flink committers were busy refactoring that part. I don't know if it is still required to have that part in the jdbc connector Flink code, or if, using the new factories (that use the java services), you could register

How to use RateLimiter in flink 1.12

2020-12-09 Thread 18757141558
I found FlinkConnectorRateLimiter and GuavaFlinkConnectorRateLimiter in the source code, but could not find these settings in the kafka-related classes. How can RateLimiter be used via the api (without modifying the source)?

flink jdbc connector: how to ensure all buffered data is sunk to the database at checkpoint time

2020-12-09 Thread jie mei
Hi, Community. The JDBC connector does not seem to guarantee that all buffered data reaches the database at checkpoint time, because OutputFormat has no hook for the checkpoint to call. Judging from the JDBC connector code, you can only configure a flush timeout, which still leaves cases where data can be lost. My question: is there a way to force the buffered data to be flushed to the database? @Public public interface OutputFormat extends Serializable { /** * Configures this output

Re: Event time issues when Kinesis consumer receives batched messages

2020-12-09 Thread Randal Pitt
Thanks Roman, I'll look into how I go about doing that. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

When will the flink 1.12 docker image be available?

2020-12-09 Thread jiangjiguang719
When will the 1.12 image be available on docker hub?

Re: Re: Re: Re: Re: Reading complex JSON with Flink SQL

2020-12-09 Thread 破极
I looked at the approach mentioned on the mailing list; since we don't plan to upgrade flink for now, I solved the problem with a custom format. Thanks, everyone. On 2020-12-09 15:27:09, "破极" wrote: > Just found it, thanks > On 2020-12-09 15:20:07, "hailongwang" <18868816...@163.com> wrote: >> http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259

When reading postgres-cdc with the flink sql cli: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 Thread 王敏超
Using standalone mode, after starting the sql cli I get the following error. But my lib directory does include flink-sql-connector-postgres-cdc-1.1.0.jar, and I have restarted the cluster. The same approach works with mysql cdc. Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements

Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

2020-12-09 Thread Piotr Nowojski
Hi, At the first glance I can not find anything wrong with those settings. If it was some memory configuration problem that caused this error, I guess it would be visible as an exception somewhere. It's unlikely a GC issue, as if some machine froze and stopped responding for a longer period of