Re: About exactly-once for Async I/O
… I/O … at least once … The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure.

Sent from my iPhone

------------------ Original message ------------------
From: Dino Zhang
Re: Question about Streaming File Sink
My approach was to reconfigure the HADOOP_CONF_DIR environment variable: put a core-site.xml and an hdfs-site.xml on the Flink cluster, and point HADOOP_CONF_DIR at that directory.

> On September 4, 2019, at 11:16 AM, 戴嘉诚 wrote:
> …
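A minimal sketch, assuming cluster B exposes an HDFS HA nameservice: the hdfs-site.xml in the directory that HADOOP_CONF_DIR points to needs the standard HDFS client HA keys for that nameservice, after which the sink path can name the logical nameservice (for example hdfs://clusterB/some/path) instead of a single NameNode. The Java below only generates those `<property>` entries; the nameservice ID `clusterB`, the NameNode IDs `nn1`/`nn2`, the host names and the class name are hypothetical placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: renders the HDFS client HA keys for one nameservice as hdfs-site.xml entries. */
class HdfsHaConfigSketch {

    static Map<String, String> haProperties(String nameservice, Map<String, String> namenodeRpcAddresses) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("dfs.nameservices", nameservice);
        // logical NameNode IDs, e.g. "nn1,nn2"
        props.put("dfs.ha.namenodes." + nameservice, String.join(",", namenodeRpcAddresses.keySet()));
        for (Map.Entry<String, String> nn : namenodeRpcAddresses.entrySet()) {
            props.put("dfs.namenode.rpc-address." + nameservice + "." + nn.getKey(), nn.getValue());
        }
        // lets the HDFS client fail over between the listed NameNodes
        props.put("dfs.client.failover.proxy.provider." + nameservice,
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return props;
    }

    static String toXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<configuration>\n");
        props.forEach((k, v) -> sb.append("  <property>\n")
                .append("    <name>").append(k).append("</name>\n")
                .append("    <value>").append(v).append("</value>\n")
                .append("  </property>\n"));
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        Map<String, String> namenodes = new LinkedHashMap<>();
        namenodes.put("nn1", "namenode1.example.com:8020"); // hypothetical hosts
        namenodes.put("nn2", "namenode2.example.com:8020");
        System.out.print(toXml(haProperties("clusterB", namenodes)));
    }
}
```

If cluster A is itself an HA nameservice, dfs.nameservices would list both IDs comma-separated (e.g. clusterA,clusterB) so that checkpoints on A and the sink on B resolve from the same file; verify this against your Hadoop version.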
Question about Streaming File Sink
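Hi all,

In StreamingFileSink I see that data can be turned into files and written to Flink's FileSystem. The problem is that the HDFS I write to is not on the Flink cluster (cluster A) but on another HDFS cluster (cluster B), and cluster B has NameNode HA configured, so pointing directly at a single NameNode doesn't seem reliable. However, Flink's default FileSystem points to Flink's default HDFS cluster (cluster A). Can this connector be given its own file system configuration? Since checkpointing is enabled and the default checkpoint path is on Flink's own cluster (cluster A), I can't simply point the default FileSystem at cluster B. Thanks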
Re: How to dynamically add a TaskManager in Flink cluster mode
… on slave3, run bin/taskmanager.sh start …

From: pengcheng...@bonc.com.cn
Date: 2019-09-04 10:27
To: user-zh
Subject: flink … taskmanager
…
Re: How to dynamically add a TaskManager in Flink cluster mode
Hi

on 2019/9/4 10:27, 如影随形 wrote:
> When the Flink cluster was deployed it had 1 JobManager and 2 TaskManagers (slave1, slave2). Now I want to add slave3 as a TaskManager. Can it be added dynamically, as in Spark, without stopping the cluster?

AFAIK the answer is NO for now. However, the community says this has been under consideration as part of FLIP-6 (Flink Deployment and Process Model).

https://docs.google.com/document/d/1zwBP3LKnJ0LI1R4-9hLXBVjOXAkQS5jKrGYYoPz-xRk/edit#
How to dynamically add a TaskManager in Flink cluster mode
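Hi, when the Flink cluster was deployed it had 1 JobManager and 2 TaskManagers (slave1, slave2). Now I want to add slave3 as a TaskManager. Can it be added dynamically, as in Spark, without stopping the cluster?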
Re: Flink periodic watermarks: how is the 200 ms period controlled?
Source code reference: PeriodicWatermarkEmitter

-----Original Message-----
From: Dino Zhang
Sent: Tuesday, September 3, 2019 3:14 PM
To: user-zh@flink.apache.org
Subject: Re: Flink periodic watermarks: how is the 200 ms period controlled?
…
Re: About exactly-once for Async I/O
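hi star,

The exactly-once guarantee refers to Flink's internal state. To get end-to-end exactly-once you can use two-phase commit, which means implementing TwoPhaseCommitSinkFunction, or make the writes idempotent.

On Wed, Sep 4, 2019 at 8:20 AM star <3149768...@qq.com> wrote:
> From the documentation, my understanding is that asynchronous requests are saved in checkpoints and re-triggered on failover. My question: since the requests are simply re-triggered with no rollback, the earlier requests have already affected the external system, so isn't this at-least-once?
> For example, ck1 sends three requests a, b, c to update an external database, and ck2 sends d, e, f. Suppose ck1's checkpoint completed, a and b succeeded, but c did not.
>
> Then the job is cancelled while ck2 is processing e, but c and d have already succeeded. When I restart from the latest successful checkpoint, ck1, won't c and d be requested again?
>
> Thanks
>
> Sent from my iPhone

--
Regards,
DinoZhang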
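Dino's second option, making the external write idempotent, can be illustrated with a toy replay of star's scenario quoted above (no Flink involved). After restoring from ck1, c is re-triggered from checkpointed state and d, e, f are replayed, so c and d reach the external system twice. The class name `ReplayDemo` and the `value-of-<key>` scheme are hypothetical; the point is that an increment-style write double-counts the replayed requests, while an upsert keyed by a deterministic request id converges to the same rows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of replaying requests after restoring from ck1. */
class ReplayDemo {

    /** Non-idempotent external write: every request increments a counter. */
    static Map<String, Integer> applyIncrements(List<String> requests) {
        Map<String, Integer> db = new HashMap<>();
        for (String key : requests) {
            db.merge(key, 1, Integer::sum); // a replayed request is counted again
        }
        return db;
    }

    /** Idempotent external write: upsert keyed by the request id, value derived only from the request. */
    static Map<String, String> applyUpserts(List<String> requests) {
        Map<String, String> db = new HashMap<>();
        for (String key : requests) {
            db.put(key, "value-of-" + key); // rewriting the same key leaves the same row
        }
        return db;
    }

    public static void main(String[] args) {
        // Before the failure a, b, c, d reached the database; the job was cancelled at e.
        // After restoring from ck1, c (in flight at ck1) and d, e, f are sent again.
        List<String> attempts = List.of("a", "b", "c", "d", "c", "d", "e", "f");
        System.out.println(applyIncrements(attempts)); // c and d counted twice
        System.out.println(applyUpserts(attempts));    // one row per key
    }
}
```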
About exactly-once for Async I/O
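From the documentation, my understanding is that asynchronous requests are saved in checkpoints and re-triggered on failover. My question: since the requests are simply re-triggered with no rollback, the earlier requests have already affected the external system, so isn't this at-least-once?

For example, ck1 sends three requests a, b, c to update an external database, and ck2 sends d, e, f. Suppose ck1's checkpoint completed, a and b succeeded, but c did not.

Then the job is cancelled while ck2 is processing e, but c and d have already succeeded. When I restart from the latest successful checkpoint, ck1, won't c and d be requested again?

Thanks

Sent from my iPhone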
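The two-phase-commit route can be modeled in a few lines. This is a stdlib-only toy, not the API of Flink's TwoPhaseCommitSinkFunction (the class and method names here are hypothetical): records go into an open transaction, the transaction is pre-committed when a checkpoint is taken, and it only becomes visible once that checkpoint completes; on failure, anything not covered by a completed checkpoint is aborted before the replay. It assumes the external writes happen in such a transactional sink rather than as side effects inside the async function, which is what lets c and d in the scenario above become visible exactly once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy two-phase-commit sink (hypothetical class, not Flink's API). */
class TwoPhaseCommitToy {
    final List<String> committed = new ArrayList<>();                          // visible in the external system
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>(); // closed transactions by checkpoint id
    private List<String> current = new ArrayList<>();                         // open transaction

    /** Write a record into the open transaction only. */
    void invoke(String record) {
        current.add(record);
    }

    /** Phase 1: on a checkpoint, pre-commit the open transaction and start a new one. */
    void snapshot(long checkpointId) {
        preCommitted.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Phase 2: once a checkpoint completes, commit every transaction up to and including it. */
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = preCommitted.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> txn = it.next();
            if (txn.getKey() > checkpointId) {
                break; // TreeMap iterates in ascending checkpoint order
            }
            committed.addAll(txn.getValue());
            it.remove();
        }
    }

    /** Failure: commit what the last completed checkpoint covers, abort everything else. */
    void failAndRestore(long lastCompletedCheckpoint) {
        checkpointComplete(lastCompletedCheckpoint);
        preCommitted.clear();
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        TwoPhaseCommitToy sink = new TwoPhaseCommitToy();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshot(1);
        sink.checkpointComplete(1);   // ck1 done: a, b visible
        sink.invoke("c");             // c and d land in ck2's open transaction
        sink.invoke("d");
        sink.failAndRestore(1);       // cancelled at e: c, d are aborted, never visible
        for (String r : List.of("c", "d", "e", "f")) {
            sink.invoke(r);           // c re-triggered from ck1 state, d, e, f replayed
        }
        sink.snapshot(2);
        sink.checkpointComplete(2);
        System.out.println(sink.committed); // [a, b, c, d, e, f]
    }
}
```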
Re: Flink SQL time questions
Hi:

You can convert the time to a Long. As for UTC: since you want to produce a Table, if you are using SQL you can do the conversion with a UDF.

Jimmy
wangzmk...@163.com

On September 3, 2019 at 21:25, JingsongLee wrote:
> …
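Jimmy's suggestion (carry the time as a Long and convert it in a UDF) boils down to the java.time arithmetic below. This is a sketch, not Flink API: the class and method names are hypothetical, and whether a scalar UDF receives java.sql.Timestamp, LocalDateTime or Long depends on the Flink version and planner, so the UDF wrapper itself is left out.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical conversion helpers that a scalar UDF's eval method could delegate to. */
class TimeZoneShift {
    static final ZoneOffset UTC_PLUS_8 = ZoneOffset.ofHours(8);

    /** Epoch milliseconds (e.g. a Long eventTime) rendered as wall-clock time in UTC+8. */
    static LocalDateTime toUtcPlus8(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), UTC_PLUS_8);
    }

    /** Takes a timestamp whose wall-clock fields are in UTC and returns the UTC+8 wall-clock fields. */
    static LocalDateTime shiftUtcWallClockToPlus8(LocalDateTime utcWallClock) {
        return utcWallClock.atOffset(ZoneOffset.UTC)
                .withOffsetSameInstant(UTC_PLUS_8)
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        System.out.println(toUtcPlus8(0L)); // 1970-01-01T08:00
        System.out.println(shiftUtcWallClockToPlus8(LocalDateTime.of(2019, 9, 3, 2, 44))); // 2019-09-03T10:44
    }
}
```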
Re: Flink SQL time questions
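Hi:

1. Yes, currently it can only be UTC. If your computation depends on it, you could consider adjusting the window times in your business logic.
2. Long is supported. Is your input an int? That would cause the error. What is the exact error message?

Best,
Jingsong Lee

--
From: hb <343122...@163.com>
Send Time: Tuesday, September 3, 2019 10:44
To: user-zh
Subject: Flink SQL time questions

I use the Kafka connectorDescriptor to read JSON data from Kafka and generate a Table:

```
...
schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000)
  )
```

Question 1. The generated _proctime processing-time field is displayed in UTC. How can I change it to the +8 time zone?
Question 2. How can the eventTime event-time field support the Long type? The record I send to Kafka is {"eventTime": 10, "id":1,"name":"hb"}, and I get an error about the type of the eventTime field.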
Re: Re: Problem using StateBackend in Flink
Hi

1. What is the checkpoint timeout set to (the default 10 min)? If the timeout is too short, checkpoints fail easily.
2. Are all subtasks n/a? The source task's checkpoint does not involve RocksDB, so it behaves the same as with the default MemoryStateBackend; the source task should not also fail to complete its checkpoint (unless it can never acquire the lock in StreamTask because it is processing elements all the time).
3. What does the job's backpressure look like? Is there severe back pressure when using RocksDB? If the job is backpressured, the barriers cannot flow downstream, which easily leads to checkpoint timeouts.

I suggest sharing the checkpoint information from the job's web UI.

Best,
唐云

From: Wesley Peng
Sent: Tuesday, September 3, 2019 15:44
To: user-zh@flink.apache.org
Subject: Re: Re: Problem using StateBackend in Flink
…
Re: Re: Problem using StateBackend in Flink
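on 2019/9/3 15:38, 守护 wrote:
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 3 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
> What is causing this?

You can use the IDs of the failed tasks to find which TaskManager they run on. After investigating, they turned out to be on the same machine, and the UI showed that this machine received noticeably more data than the others. So the problem was caused by data skew; at root, the downstream could not consume fast enough.

See also: https://juejin.im/post/5c374fe3e51d451bd1663756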
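The diagnosis above (one machine receiving far more records than its peers) can be made mechanical. A sketch, assuming you have copied per-subtask input counts (for example the records-received figures from the web UI) into an array; the class name and the "more than 2× the median" threshold are hypothetical choices, not a Flink rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical skew check over per-subtask input counts. */
class SkewCheck {

    /** Returns the indices of subtasks whose input exceeds `factor` times the median input. */
    static List<Integer> skewedSubtasks(long[] recordsIn, double factor) {
        List<Integer> skewed = new ArrayList<>();
        int n = recordsIn.length;
        if (n == 0) {
            return skewed;
        }
        long[] sorted = recordsIn.clone();
        Arrays.sort(sorted);
        double median = n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        for (int i = 0; i < n; i++) {
            if (recordsIn[i] > factor * median) {
                skewed.add(i);
            }
        }
        return skewed;
    }

    public static void main(String[] args) {
        long[] recordsIn = {10_200, 9_800, 10_050, 61_000}; // hypothetical per-subtask counts
        System.out.println(skewedSubtasks(recordsIn, 2.0)); // [3]
    }
}
```

A median is used rather than a mean so that the hot subtask itself does not drag the baseline up.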
Re: Problem using StateBackend in Flink
1. With a window written using the Flink API and env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)), RocksDB checkpoints complete normally:

    Completed checkpoint 142 for job 25a50baff7d16ee22aecb7b1 (806418 bytes in 426 ms).

2. With a SQL window on RocksDB, the window's checkpoint … shows n/a:

    2019-09-03 15:17:52,142 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1 from 915988718742f11135a6e96ba42c4e35 of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:17:52,143 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1 from 191ca20964de323300551d9a144c272c of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:17:52,144 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1 from 2a24625ac03ea6ac4b8632719768c502 of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:17:52,144 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1 from 51b376365d044c672677c38ca1caee78 of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:17:52,227 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1 from 9266a7307add46cb26f5a9352c82ceb0 of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:18:21,881 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 4 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
    2019-09-03 15:18:21,882 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 3 from 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.

------------------ Original message ------------------
From: "Wesley Peng"
Sent: Tuesday, September 3, 2019 3:24
To: "user-zh"
Subject: Re: Problem using StateBackend in Flink
…
Re: Problem using StateBackend in Flink
Hi

on 2019/9/3 12:14, 々守护々 wrote:
> Symptom: I configure the StateBackend in code, like this: env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)). But the SQL window below never gets past checkpointing and always reports n/a. So I commented out that line and instead set state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints in flink-conf.yaml, and then the job runs normally. Is there some conflict between SQL windows and RocksDBStateBackend? What is the cause?

It seems like a filesystem issue, though I have no experience with it either.

regards.
Re: Flink periodic watermarks: how is the 200 ms period controlled?
hi venn,

For EventTime, the watermark interval defaults to 200 ms; it can be changed with ExecutionConfig's setAutoWatermarkInterval method. See StreamExecutionEnvironment:

    /**
     * Sets the time characteristic for all streams created from this environment, e.g., processing
     * time, event time, or ingestion time.
     *
     * If you set the characteristic to IngestionTime or EventTime this will set a default
     * watermark update interval of 200 ms. If this is not applicable for your application
     * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }

On Tue, Sep 3, 2019 at 2:39 PM venn wrote:
> …
Flink periodic watermarks: how is the 200 ms period controlled?
Hi all,

Today I was reading the Flink source for assigning timestamps and watermarks, and found that periodic watermark creation really is periodic: judging by the times printed to the console, it runs roughly every 200 ms. Where is the 200 ms controlled? I can't find it (the source location) in the debug call stack.

The periodic watermark is created like this:

    .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50)) {
      override def extractTimestamp(element: Event): Long = sdf.parse(element.createTime).getTime
    })

Timestamp generation, in class TimestampsAndPeriodicWatermarksOperator:

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

Watermark generation, in class TimestampsAndPeriodicWatermarksOperator:

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // debug print added here: it shows up about every 200 ms
        System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis());
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        // register next timer
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

Thanks, everyone.
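To see why the timer fires roughly every 200 ms and when a watermark is actually emitted, here is a stdlib-only model of the onProcessingTime loop quoted above combined with the bounded-out-of-orderness rule (watermark = highest event time seen minus maxOutOfOrderness). It is a sketch, not Flink's classes: processing time is simulated, the class name is hypothetical, and the next timer is simply the previous one plus the interval, whereas the real operator registers the current processing time plus watermarkInterval (the auto-watermark interval from the ExecutionConfig).

```java
import java.util.ArrayList;
import java.util.List;

/** Simulated periodic watermark assigner (hypothetical class, not Flink's operator). */
class PeriodicWatermarkSim {
    private final long maxOutOfOrderness;
    private final long watermarkInterval;      // e.g. the 200 ms default for event time
    private long currentMaxTimestamp;
    private long currentWatermark = Long.MIN_VALUE;
    private long nextTimer;
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSim(long maxOutOfOrderness, long watermarkInterval, long startProcessingTime) {
        if (watermarkInterval <= 0) {
            throw new IllegalArgumentException("this sketch requires a positive interval");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.watermarkInterval = watermarkInterval;
        // start low enough that (max - bound) cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        this.nextTimer = startProcessingTime + watermarkInterval;
    }

    /** A record with the given event time arrives at processing time `now`. */
    void processElement(long now, long eventTime) {
        advanceProcessingTime(now);
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
    }

    /** Fire every timer that is due by `now`; each firing mirrors onProcessingTime(). */
    void advanceProcessingTime(long now) {
        while (nextTimer <= now) {
            long candidate = currentMaxTimestamp - maxOutOfOrderness;
            if (candidate > currentWatermark) { // only emit a watermark that moves forward
                currentWatermark = candidate;
                emitted.add(candidate);
            }
            nextTimer += watermarkInterval;     // register the next timer
        }
    }

    public static void main(String[] args) {
        PeriodicWatermarkSim sim = new PeriodicWatermarkSim(50, 200, 0);
        sim.processElement(10, 1_000);
        sim.processElement(150, 1_120);
        sim.processElement(250, 1_080);   // timer at 200 fires first; the late record does not lower the max
        sim.processElement(450, 1_300);   // timer at 400 fires, watermark unchanged, nothing emitted
        sim.advanceProcessingTime(600);   // timer at 600 fires
        System.out.println(sim.emitted);  // [1070, 1250]
    }
}
```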