Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 Yun Tang
Hi 蒋涛涛 有一种比较hack的方式可以实现,代码里面source是需要根据uid来找到相关的state进行offset恢复,如果你不想通过checkpoint恢复source的state,可以在代码里面手动把source的uid给改掉,同时在从checkpoint恢复时带上 --allowNonRestoredState 参数,这样kafka source从恢复的checkpoint/savepoint里面找不到相关的source state,就会从你设置的offset进行恢复了。 祝好 唐云 From: 蒋涛涛

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 蒋涛涛
Hi Yun Tang, 其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置 “auto.commit.enable” 为false,这个时候就不提交kafka offset,如果正常暂停任务的时候,从checkpoint恢复的时候,我就不知道从哪个时间点进行消费了。 Yun Tang 于2019年8月29日周四

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 Yun Tang
Hi 蒋涛涛 Flink的kafka consumer一共有三种offset commit模式: 1. OffsetCommitMode.DISABLED 完全disable offset的commit 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink checkpoint完成时,才会将offset commit到Kafka 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal

回复: 全局并行度和算子并行度的关系

2019-08-28 文章 pengcheng...@bonc.com.cn
你好,以我的理解,并行度的优先级setParallelism>命令>配置文件。 每个算子有多个并行度的话,每个并行度占一个slot。 flink sql无法设置并行度。 pengcheng...@bonc.com.cn 发件人: ddwcg 发送时间: 2019-08-29 10:18 收件人: user-zh 主题: 全局并行度和算子并行度的关系 hi, 请问在作业启动时设置的并行度,和后面算子的并行度是什么关系? 比如启动时设置为1,map算子设置为5,map(...).setParallelism(5),这个算子并行度的设置会起作用吗(因为它大于全局设置的1)?

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 wang jinhai
可以选择从之前的某个checkpoint恢复吧 在 2019/8/29 上午10:01,“蒋涛涛” 写入: Hi everyone: 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 Regards, JackJiang

全局并行度和算子并行度的关系

2019-08-28 文章 ddwcg
hi, 请问在作业启动时设置的并行度,和后面算子的并行度是什么关系? 比如启动时设置为1,map算子设置为5,map(...).setParallelism(5),这个算子并行度的设置会起作用吗(因为它大于全局设置的1)? 启动时设置的并行数是slot的个数,每个slot的资源是固定的(比如是1G内存),那么后面的算子增加了并行度,资源怎么划分? 另外flink sql的并行度是怎么设置的,我没有发现有setParallelism,只有一个最大并行度的设置:setMaxParallelism() 谢谢

flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 蒋涛涛
Hi everyone: 如题,我遇到有些数据我不应该漏了想回溯部分数据,这个时候我就需要清理state,来重置kafka的offset重新跑,可不可以保留flink任务state,从checkpoint恢复任务的时候重置kafka的offset,并从kafka那个时间段开始消费,而不需要清掉state重新跑数据。 Regards, JackJiang

Re: 关于flink中端对端的精确一次性理解问题

2019-08-28 文章 Qi Luo
对于MySQL sink来说,使用2PC我理解应该是不能用MySQL transaction的。因为如果你在preCommit中(或之前)开启了transaction,任务失败的话数据会直接丢失,没法实现2PC里preCommit成功后必须保证commit成功的语义。一种办法是preCommit时写入mysql临时表,在commit时将临时表数据移动入正式表。 On Wed, Aug 28, 2019 at 2:47 PM 1900 <575209...@qq.com> wrote: > hi, > > >

?????? ????flink????????????????????????????

2019-08-28 文章 1900
hi, ?? public class Sink extends TwoPhaseCommitSinkFunction { //private Connection connection; public Sink() { super(new KryoSerializer

回复: 回复: flink日志级别问题

2019-08-28 文章 陈思
Exceptions看到的是运行时异常吧,看不到log4j输出的ERROR日志。我的目的是代码控制log4j的级别是ERROR -- 原始邮件 -- 发件人: "wang jinhai"; 发送时间: 2019年8月28日(星期三) 下午2:15 收件人: "user-zh@flink.apache.org"; 主题: Re: 回复: flink日志级别问题 Flink的UI是有Exceptions页面的啊 各位 在 2019/8/28 下午2:05,“陈思”<58683...@qq.com> 写入:

Re: 回复: flink日志级别问题

2019-08-28 文章 wang jinhai
Flink的UI是有Exceptions页面的啊 各位 在 2019/8/28 下午2:05,“陈思”<58683...@qq.com> 写入: 是的,我的目的也是只想在web页面上面看到ERROR日志 -- 原始邮件 -- 发件人: "高飞龙"; 发送时间: 2019年8月28日(星期三) 中午1:50 收件人: "user-zh"; 主题: Re:回复: flink日志级别问题

?????? flink????????????

2019-08-28 文章 ????
webERROR -- -- ??: "??"; : 2019??8??28??(??) 1:50 ??: "user-zh"; : Re:?? flink weberrores --