????????????????

2020-08-16 文章 zhiyezou
HI


day??


??


??day??3select * where day 
now-3??TTL3??flink33??


3






----
??: 
   "user-zh"



(无主题)

2020-08-16 文章 superainbower
hi,社区的小伙伴,大家好!我有一个应用场景,想请教下大家有没有遇过,有什么好的方案。
场景就是:按照user和day的维度统计订单表里的有效订单数,同时存在历史的订单状态随时可能被更新,比如可能当前把2个月前的订单状态置未true,所以没法根据历史结果预统计,翻译称sql就是select
 user,day,count(*) from table where state = true group by 
user,day;目前我已经用flink-sql-cdc-connector实现了,但是有一个问题就是state,因为按user day组合 
那么如果全部状态都保存后期回越来越大,但是如果设置ttl,那么如果历史订单变化,最终更新出去的值也不对。 
希望社区的小伙伴给我出出主意

Re: Flink参数配置设置不生效

2020-08-16 文章 魏烽
Hi  Yang Wang

感谢您的答复:

启动命令是

flink run -m yarn-cluster -ys 2 -p 4 -yjm 2G -ytm 2G -c 
com.nequal.bdh.cdp.IDTest etltest.jar  --qu default

Per-job-cluster模式

 原始邮件
发件人: Yang Wang
收件人: user-zh
发送时间: 2020年8月17日(周一) 11:26
主题: Re: Flink参数配置设置不生效


如果你是on Yarn部署的话,execution.attached这个参数会在FlinkYarnSessionCli里面根据flink run的
cli option进行覆盖,所以你配置在flink conf里面是没有生效的。如果加了-d的话就为设置为false,反之
就是true

Best,
Yang

魏烽 mailto:weif...@nequal.com>> 于2020年8月15日周六 上午9:44写道:

> 各位大佬好:
>
> 在flink-conf.yaml中设置参数execution.attached: false
>
>但是yarn logs查看此参数设置并没有生效,
>
>2020-08-15 09:40:13,489 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: execution.attached, true
>
>而且根据官网说明此参数默认应该是false才对,已确认在代码中并没有对此参数进行设置,请问这是什么情况呀?
>



ScalarFunction 访问 state

2020-08-16 文章 forideal
Hi,
  
最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 `ScalarFunction`,通过阅读 API 发现 
FunctionContext context 并不支持访问 state。
我准备使用 Guava cache 做,不知道小伙伴有没有更好的建议哈!感谢。


Best,forideal

jobmanager异常

2020-08-16 文章 18500348...@163.com
请教大家一个问题:

flink1.8.0 on yarn 程序运行一段时间报如下错误,导致 The heartbeat of TaskManager with id 
container_1572430463280_50994_01_04 timed out. 最终程序重启。

各位有没有碰到类似的问题,有什么解决方式吗?

jobmanager.log

2020-08-17 02:53:21,593 ERROR akka.remote.Remoting  
- Association to [akka.tcp://flink@${HOSTNAME}:36968] with UID [19
99537927] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too 
long. (more than 48.0 hours)
at 
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





18500348...@163.com


Print SQL connector无法正常使用

2020-08-16 文章 xiao cai
Hi All:
目前使用flink sql的Print SQL connector,想要将查询的结果打印出来,结果报错:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


可以保证:HBase-connector是在lib包下存在的,是否我还需要在lib下添加什么依赖?


下面为执行的sql:


CREATE TABLE dimension ( 
rowKey STRING, 
cf ROW, 
tas BIGINT 
) WITH ( 
'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 
'connector.table-name' = ’test', 
'connector.write.buffer-flush.max-rows' = '10', 
'connector.zookeeper.quorum' = ‘IP:port', 
'connector.zookeeper.znode.parent' = '/hbase', 
);


CREATE TABLE print_table (
 f0 STRING,
 f1 INT,
 f2 BIGINT,
 f3 BIGINT
) WITH (
 'connector' = 'print'
);


insert into print_table
select rowKey, cf.age, cf.area, tas
from dimension

Re: flink interval join后按窗口聚组问题

2020-08-16 文章 赵一旦
大概看了下。这个问题我业务中涉及到过。我是DataStream API做的。
不过我是在任务设计阶段就考虑了所有case,然后提前考虑了这些问题的。
watermark是可以重设的。其次我还更改了interval join的算子实现,默认1.10只支持inner join。不支持left/right
join。
并且inner join后采用最大的timestamp。这个比较复杂,实际如果做left join,业务上可能更希望使用left的时间,right
join则使用right的时间。out join则只能使用留下的那个的时间,inner join情况需要看业务。


你这个问题主要就是watermark重设就可以了。



Tianwang Li  于2020年8月16日周日 上午10:45写道:

> 展开讨论一些特点从场景。
> 1、inner join场景。有什么办法取两条流的的rowtime 的max吗?
> 使用SQL语句的场合,怎么实现?
> 例如:
> SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
> rowtime, ...
>
> 如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
>
> Tianwang Li  于2020年8月16日周日 上午10:40写道:
>
> > 展开讨论一些特点场景。
> >
> > Benchao Li  于2020年7月6日周一 下午11:08写道:
> >
> >> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
> >>
> >> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> >> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
> >>
> >> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
> >> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
> >> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
> >> 的时间最早的那个。
> >>
> >> 元始(Bob Hu) <657390...@qq.com> 于2020年7月5日周日 下午8:48写道:
> >>
> >> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
> >> >
> >> >
> >> > -- 原始邮件 --
> >> > *发件人:* "Benchao Li";
> >> > *发送时间:* 2020年7月5日(星期天) 中午11:58
> >> > *收件人:* "元始(Bob Hu)"<657390...@qq.com>;
> >> > *抄送:* "user-zh";
> >> > *主题:* Re: flink interval join后按窗口聚组问题
> >> >
> >> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> >> > 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> >> > 就会有些问题,很多数据被作为late数据直接丢掉了。
> >> >
> >> > 元始(Bob Hu) <657390...@qq.com> 于2020年7月3日周五 下午3:29写道:
> >> >
> >> >> 您好,我想请教一个问题:
> >> >> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> >> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
> >> a.rowtime
> >> >> and a.rowtime + INTERVAL '1' HOUR
> >> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime
> +
> >> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> >> >> allowedLateness +
> >> >>
> >> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> >> >> rightRelativeSize) +
> >> >>
> >>
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> >> >> by的时候这种右表数据为空的数据就丢掉了啊。
> >> >> flink版本 1.10.0。
> >> >>
> >> >> 下面是我的一段测试代码:
> >> >>
> >> >> import org.apache.commons.net.ntp.TimeStamp;
> >> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> >> import org.apache.flink.api.common.typeinfo.Types;
> >> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> >> >> import org.apache.flink.streaming.api.TimeCharacteristic;
> >> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> >> import
> >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> >> import
> >> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> >> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
> >> >> import
> org.apache.flink.streaming.api.functions.source.SourceFunction;
> >> >> import org.apache.flink.streaming.api.watermark.Watermark;
> >> >> import org.apache.flink.table.api.EnvironmentSettings;
> >> >> import org.apache.flink.table.api.Table;
> >> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
> >> >> import org.apache.flink.table.functions.ScalarFunction;
> >> >> import org.apache.flink.types.Row;
> >> >> import org.apache.flink.util.Collector;
> >> >> import org.apache.flink.util.IOUtils;
> >> >>
> >> >> import java.io.BufferedReader;
> >> >> import java.io.InputStreamReader;
> >> >> import java.io.Serializable;
> >> >> import java.net.InetSocketAddress;
> >> >> import java.net.Socket;
> >> >> import java.sql.Timestamp;
> >> >> import java.text.SimpleDateFormat;
> >> >> import java.util.ArrayList;
> >> >> import java.util.Date;
> >> >> import java.util.List;
> >> >>
> >> >> public class TimeBoundedJoin {
> >> >>
> >> >> public static AssignerWithPeriodicWatermarks
> >> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> >> >> AssignerWithPeriodicWatermarks timestampExtractor = new
> >> AssignerWithPeriodicWatermarks() {
> >> >> private long currentMaxTimestamp = 0;
> >> >> private long lastMaxTimestamp = 0;
> >> >> private long lastUpdateTime = 0;
> >> >> boolean firstWatermark = true;
> >> >> //Integer maxIdleTime = 30;
> >> >>
> >> >> @Override
> >> >> public Watermark getCurrentWatermark() {
> >> >> if(firstWatermark) {
> >> >> lastUpdateTime = System.currentTimeMillis();
> >> >> firstWatermark = false;
> >> >> }
> >> >> if(currentMaxTimestamp != lastMaxTimestamp) {
> >> >> lastMaxTimestamp = currentMaxTimestamp;
> >> >> lastUpdateTime = System.currentTimeMillis();
> >> >>

Re: Re: flink state ttl状态清理和重新计算的疑问

2020-08-16 文章 赵一旦
@Li Benchao
(1)
如果不是每个key一个timer,但超时仍然应该是key级别的吧,只是说清理机制上不是每个key设置一个timer去清理。
比如有个全局的其他机制定期扫描清理,但超时时间应该还是key级别。

(2)
关于超时时间,对于一个key,他的value每次更新超时都会重新计算,还是永远按照这个key创建时时间开始计算呢。

Benchao Li  于2020年8月15日周六 下午7:27写道:

> 是按照每个key来清理的。清理时机是跟它最后的更新时间有关系,
> 也就是在最后一次更新加上state retention时间这么长的时间后会清理。
>
> 最开始实现状态清理的时候,用的都是timer来清理,也就是每个key下都有自己的timer。
> 现在是比较推荐使用state本身的TTL来做状态清理,并且用的是UpdateType.OnCreateAndWrite。
> 不过现在还没有完全把每个算子和function都重构成这样子,所以还有些老的算子还是用的
> timer来实现的。
>
> sunfulin  于2020年8月15日周六 下午6:12写道:
>
> >
> >
> >
> > hi,
> > 我的理解也是按每个key的时间来的,没仔细看具体实现。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-08-15 17:28:43,"art"  写道:
> >
> > The Idle State Retention Time parameters define for how long the state of
> > a key is retained without being updated before it is removed.
> > 我感觉我的理解错了,这个官方描述不是state of a key, 应该是每个key都有自己的过期时间吧,那么你那个状态不是应该以user登陆后
> 开始
> > 计时,不应该是作业启动吧,还望有个大佬可以解惑
> >
> >
> >
> > 在 2020年8月15日,下午3:06,sunfulin  写道:
> >
> >
> > hi,
> > 有可能这个是默认实现。我还发现另外一个问题,如果我不使用minibatch,发现作业的状态貌似不生效。导致输出了多条数据。不知道这是为何。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-08-15 13:30:24,"superainbower"  写道:
> >
> > 新手感觉应该是统一启动后满足TTL设置的时间就会全部清理,如果不这样,你每一个user的清理时间都不一样,那不得记录成百上千的user的更新时间
> >
> > 在2020年08月15日 13:15,sunfulin 写道:
> > hi,community,
> >
> 想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
> > userId, first_value(xxx) from source group by userId,
> > date_format(eventtime,
> > '-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
> > 我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。
> >
> >
> >
> >
> >
> >
> >
>
> --
>
> Best,
> Benchao Li
>


Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-16 文章 key lou
谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
notifyCheckpointComplete 与 Asnapshot 下发的数据到达B   这2者没有必然的先后顺序。

另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
到达了B.

 我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
并行度为N, start  会开启一个事务 编号proccess  用这个事务 编号
去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交


Congxian Qiu  于2020年8月17日周一 上午10:42写道:

> Hi
> 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> Best,
> Congxian
>
>
> key lou  于2020年8月16日周日 下午9:27写道:
>
> > 各位大佬:
> >在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> >
> > public class FlinkCheckpointTest {
> > public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment steamEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > steamEnv.enableCheckpointing(1000L*2);
> > steamEnv
> > .addSource(new FSource()).setParallelism(4)
> > .transform("开始事务", Types.STRING,new
> FStart()).setParallelism(1)
> > .process(new FCombine()).name("事务预处理").setParallelism(4)
> > .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> > ;
> > steamEnv.execute("test");
> > }
> >
> >static class FSource extends RichParallelSourceFunction{
> > @Override
> > public void run(SourceContext sourceContext) throws
> > Exception {
> > int I =0;
> > while (true){
> > I = I + 1;
> > sourceContext.collect("thread " +
> > Thread.currentThread().getId() +"-" +I);
> > Thread.sleep(1000);
> > }
> > }
> > @Override
> > public void cancel() {}
> > }
> >
> > static class FStart extends AbstractStreamOperator
> > implements OneInputStreamOperator{
> >volatile Long ckid = 0L;
> > @Override
> > public void processElement(StreamRecord streamRecord)
> > throws Exception {
> > log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > output.collect(streamRecord);
> > }
> > @Override
> > public void prepareSnapshotPreBarrier(long checkpointId)
> > throws Exception {
> > log("开启事务: " + checkpointId);
> > ckid = checkpointId;
> > super.prepareSnapshotPreBarrier(checkpointId);
> > }
> > }
> >
> > static class FCombine extends ProcessFunction
> > implements CheckpointedFunction {
> > List ls = new ArrayList();
> > Collector collector =null;
> > volatile Long ckid = 0L;
> >
> > @Override
> > public void snapshotState(FunctionSnapshotContext
> > functionSnapshotContext) throws Exception {
> > StringBuffer sb = new StringBuffer();
> > ls.forEach(x->{sb.append(x).append(";");});
> > log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > ": 时收到数据:" + sb.toString());
> > Thread.sleep(5*1000);
> > collector.collect(sb.toString());
> > ls.clear();
> > Thread.sleep(5*1000);
> > //Thread.sleep(20*1000);
> > }
> > @Override
> > public void initializeState(FunctionInitializationContext
> > functionInitializationContext) throws Exception {}
> > @Override
> > public void processElement(String s, Context context,
> > Collector out) throws Exception {
> > if(StringUtils.isNotBlank(s)){
> > ls.add(s);
> > }
> > log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > ckid);
> > if(collector ==null){
> > collector = out;
> > }
> > }
> > }
> >
> > static class FSubmit extends RichSinkFunction implements
> > /*  CheckpointedFunction,*/ CheckpointListener {
> > List ls = new ArrayList();
> > volatile Long ckid = 0L;
> > @Override
> > public void notifyCheckpointComplete(long l) throws Exception {
> > ckid = l;
> > StringBuffer sb = new StringBuffer();
> > ls.forEach(x->{sb.append(x).append("||");});
> > log("submit checkpoint " + l + " over data:list size" +
> > ls.size()+ "; detail" + sb.toString());
> > ls.clear();
> > }
> > @Override
> > public void invoke(String value, 

Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Congxian Qiu
Hi
   JM/TM 日志如果是 OnYarn 模式,且开了了 log aggreagte 的话[1],应该是能够获取到这个日志的。
   据我所知,暂时没有已知问题会导致增量 checkpoint 不能恢复,如果你遇到的问题确定会导致 增量 checkpoint
恢复失败的话,可以考虑创建一个 Issue

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#log-files
Best,
Congxian


Yang Peng  于2020年8月17日周一 上午11:22写道:

> 在我们自研的开发平台上提交任务用的detach模式,提交完之后就看不到其他日志了,这个问题当天出现了两次,是不是使用增量cp会存在这个恢复失败的情况
>
> Congxian Qiu  于2020年8月17日周一 上午10:39写道:
>
> > Hi
> >你还有失败作业的 JM 和 TM
> > 日志吗?如果有的话可以看一下这两个日志来确定为什么没有恢复成功。因为你说代码未作任何改变,然后恢复失败,这个还是比较奇怪的。
> > Best,
> > Congxian
> >
> >
> > Yang Peng  于2020年8月17日周一 上午10:25写道:
> >
> > > 好的 感谢
> > >
> > > JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
> > >
> > > > hi
> > > >
> > > > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


flink1.11任务资源分批

2020-08-16 文章 Dream-底限
hi、
请问如果想要flink任务自动扩缩容有什么办法吗,反压的时候自动加资源,然后在自动缩。


Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-16 文章 Congxian Qiu
Hi
   如果我理解没错的话,是否添加 -d 会使用不同的模式启动作业(PerJob 和 Session
模式),从错误栈来看猜测是版本冲突了导致的,你有尝试过最新的 1.11 是否还有这个问题吗?
Best,
Congxian


bradyMk  于2020年8月14日周五 下午6:52写道:

> 请问大家:
> 我采用如下命令提交:
> flink run \
> -m yarn-cluster \
> -yn 3 \
> -ys 3 \
> -yjm 2048m \
> -ytm 2048m \
> -ynm flink_test \
> -d \
> -c net.realtime.app.FlinkTest ./hotmall-flink.jar
> 就会失败,报错信息如下:
> [AMRM Callback Handler Thread] ERROR
> org.apache.flink.yarn.YarnResourceManager - Fatal error occurred in
> ResourceManager.
> java.lang.NoSuchMethodError:
>
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
> at
>
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
> at
>
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> [AMRM Callback Handler Thread] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> occurred
> in the cluster entrypoint.
> java.lang.NoSuchMethodError:
>
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
> at
>
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
> at
>
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.yarn.YarnResourceManager - ResourceManager
> akka.tcp://flink@emr-worker-8.cluster-174460:33650/user/resourcemanager
> was
> granted leadership with fencing token 
> [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer -
> Stopped BLOB server at 0.0.0.0:36247
> <
> http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B7.png>
>
> 但是我在提交命令时,不加-d,就可以正常提交运行;更奇怪的是,我运行另一个任务,加了-d参数,可以正常提交。
> 我这个提交失败的任务开始是用如下命令运行的:
> nohup flink run \
> -m yarn-cluster \
> -yn 3 \
> -ys 3 \
> -yjm 2048m \
> -ytm 2048m \
> -ynm flink_test \
> -c net.realtime.app.FlinkTest ./hotmall-flink.jar > /logs/flink.log 2>&1 &
>  > /logs/nohup.out 2>&1 &
>
> 在这个任务挂掉之后,再用-d的方式重启就会出现我开始说的问题,很奇怪,有大佬知道为什么么?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-16 文章 Congxian Qiu
这个问题和下面这个问题[1] 重复了,在另外的邮件列表中已经有相关讨论

[1]
http://apache-flink.147419.n8.nabble.com/Flink-FINISHED-Checkpoint-td6008.html
Best,
Congxian


yulu yang  于2020年8月14日周五 下午1:05写道:

> 对了,我这个flink作业和和分组都是新创建,不存在抽取历史。
>
> 杨豫鲁  于2020年8月13日周四 下午3:33写道:
>
> > 请教大家一个我最近在配置Flink流的过程中遇到问题,
> >
> >
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
> >
> >
> >
> >
> >
>


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-16 文章 Congxian Qiu
Hi  吴磊
请问你们有比较过使用 Redis 和 broadcast state 在你们场景下的区别吗?是什么原因让你们选择 Redis 而不是
BroadcastState 呢?

Best,
Congxian


吴磊  于2020年8月14日周五 下午3:39写道:

> 在我们的生产环境最常用的做法都是通过维表关联的方式进行赋值的;
> 或者可以先将字典数据写进redis,然后再在第一次使用的时候去访问redis,并加载到State中。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> yj5...@gmail.com;
> 发送时间:2020年8月13日(星期四) 中午1:49
> 收件人:"user-zh"
> 主题:请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题
>
>
>
> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。


Re: Flink参数配置设置不生效

2020-08-16 文章 Yang Wang
如果你是on Yarn部署的话,execution.attached这个参数会在FlinkYarnSessionCli里面根据flink run的
cli option进行覆盖,所以你配置在flink conf里面是没有生效的。如果加了-d的话就为设置为false,反之
就是true

Best,
Yang

魏烽  于2020年8月15日周六 上午9:44写道:

> 各位大佬好:
>
> 在flink-conf.yaml中设置参数execution.attached: false
>
>但是yarn logs查看此参数设置并没有生效,
>
>2020-08-15 09:40:13,489 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: execution.attached, true
>
>而且根据官网说明此参数默认应该是false才对,已确认在代码中并没有对此参数进行设置,请问这是什么情况呀?
>


Re: TableColumn为啥不包含comment

2020-08-16 文章 Harold.Miao
谢谢   我想提交这个patch

Shengkai Fang  于2020年8月14日周五 下午4:33写道:

> hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。
>
> [1] https://issues.apache.org/jira/browse/FLINK-18958
>
> Harold.Miao  于2020年8月13日周四 上午11:08写道:
>
> > hi all
> > 我发现TableColumn class不包含column comment  , 给开发带来了一点麻烦,请教大家一下,谢谢
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys

2020-08-16 文章 Jark Wu
我上面说的“新版 hbase connector”,指的是 Flink 仓库中实现的新版 sink 连接器,对于 HBase server
1.4和1.4.3都是能用的。

On Sat, 15 Aug 2020 at 00:05, xiao cai  wrote:

> Hi Jark:
> 感谢回答,我发现是我join的时候,是想将hbase作为维表使用的,但是我遗漏了for system_time as
> of语句,添加后就不会再报这个错了。
> 另外有个问题想请教:1.11中新版hbase
> connector只是指with中指定version为1.4所创建的表吗,我发现使用1.4.3的版本,也是可以正常使用的。是不是说明pk在1.4和1.4.3两个版本上都是生效的?
> 再次感谢。
>
>
> Best
> Xiao Cai
>
> 发送自 Windows 10 版邮件应用
>
> 发件人: Jark Wu
> 发送时间: 2020年8月14日 23:23
> 收件人: user-zh
> 主题: Re: HBase Sink报错:UpsertStreamTableSink requires that Table has a full
> primary keys
>
>  PK 的问题在1.11 已经解决了,你可以用下1.11 提供的新版 hbase connector,可以在 DDL 上指定 PK,所以 query
> 推导不出 PK 也不会报错了。
>  see more:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
>
>
> Best,
> Jark
>
>
> On Thu, 13 Aug 2020 at 14:27, xiao cai  wrote:
>
> > Hi All:
> > 使用flink-sql写入hbase sink时报错:
> > UpsertStreamTableSink requires that Table has a full primary keys if it
> is
> > updated.
> >
> >
> > 我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表
> > kafka source表与hbase 维表left join后的结果insert到hbase sink表中:
> > sql如下:
> > create table user_click_source(
> > `id` bigint,
> > `name` varchar,
> > `kafka_partition` int,
> > `event_time` bigint,
> > `write_time` bigint,
> > `snapshot_time` bigint,
> > `max_snapshot_time` bigint,
> > `catalog_id` int,
> > `device_id` int,
> > `user_id` int,
> > `proc_time` timestamp(3)
> > PRIMARY KEY (id) NOT ENFORCED
> > )with(
> > 'connector.type' = 'kafka',
> > ……
> > )
> > ;
> > create table dim_user(
> > `rowkey` varchar,
> > cf ROW<
> > `id` int,
> > `name` varchar,
> > `kafka_partition` int,
> > `event_time` bigint,
> > `write_time` bigint,
> > `snapshot_time` bigint,
> > `max_snapshot_time` bigint
> > >,
> > ts bigint
> > )with(
> > 'connector.type'='hbase',
> > ……
> > )
> > ;
> >
> >
> > create table dim_device(
> > `rowkey` varchar,
> > cf ROW<
> > `id` int,
> > `name` varchar,
> > `kafka_partition` int,
> > `event_time` bigint,
> > `write_time` bigint,
> > `snapshot_time` bigint,
> > `max_snapshot_time` bigint
> > >
> > )with(
> > 'connector.type'='hbase',
> > ……
> > )
> > ;
> >
> >
> > create table dim_catalog(
> > `rowkey` varchar,
> > cf ROW<
> > `id` int,
> > `name` varchar,
> > `kafka_partition` int,
> > `event_time` bigint,
> > `write_time` bigint,
> > `snapshot_time` bigint,
> > `max_snapshot_time` bigint
> > >
> > )with(
> > 'connector.type'='hbase',
> > ……
> > )
> > ;
> > create table hbase_full_user_click_case1_sink(
> > `rowkey` bigint,
> > cf ROW<
> > `click_id` bigint,
> > `click_name` varchar,
> > `click_partition` int,
> > `click_event_time` bigint,
> > `click_write_time` bigint,
> > `click_snapshot_time` bigint,
> > `click_max_snapshot_time` bigint,
> > `catalog_id` int,
> > `catalog_name` varchar,
> > `catalog_partition` int,
> > `catalog_event_time` bigint,
> > `catalog_write_time` bigint,
> > `catalog_snapshot_time` bigint,
> > `catalog_max_snapshot_time` bigint,
> > `device_id` int,
> > `device_name` varchar,
> > `device_partition` int,
> > `device_event_time` bigint,
> > `device_write_time` bigint,
> > `device_snapshot_time` bigint,
> > `device_max_snapshot_time` bigint,
> > `user_id` int,
> > `user_name` varchar,
> > `user_partition` int,
> > `user_event_time` bigint,
> > `user_write_time` bigint,
> > `user_snapshot_time` bigint,
> > `user_max_snapshot_time` bigint
> > >,
> > PRIMARY KEY (rowkey) NOT ENFORCED
> > )with(
> > 'connector.type'='hbase',
> > ……
> > )
> > ;
> > insert into hbase_full_user_click_case1_sink
> > select
> > `click_id`,
> > ROW(
> > `click_id`,
> > `click_name`,
> > `click_partition`,
> > `click_event_time`,
> > `click_write_time`,
> > `click_snapshot_time`,
> > `click_max_snapshot_time`,
> > `catalog_id`,
> > `catalog_name`,
> > `catalog_partition`,
> > `catalog_event_time`,
> > `catalog_write_time`,
> > `catalog_snapshot_time`,
> > `catalog_max_snapshot_time`,
> > `device_id`,
> > `device_name`,
> > `device_partition`,
> > `device_event_time`,
> > `device_write_time`,
> > `device_snapshot_time`,
> > `device_max_snapshot_time`,
> > `user_id`,
> > `user_name`,
> > `user_partition`,
> > `user_event_time`,
> > `user_write_time`,
> > `user_snapshot_time`,
> > `user_max_snapshot_time`
> > )
> > from (select
> > click.id as `click_id`,
> > click.name as `click_name`,
> > click.kafka_partition as `click_partition`,
> > click.event_time as `click_event_time`,
> > click.write_time as `click_write_time`,
> > click.snapshot_time as `click_snapshot_time`,
> > click.max_snapshot_time as `click_max_snapshot_time`,
> > cat.cf.id as `catalog_id`,
> > cat.cf.name as `catalog_name`,
> > cat.cf.kafka_partition as `catalog_partition`,
> > cat.cf.event_time as `catalog_event_time`,
> > cat.cf.write_time as `catalog_write_time`,
> > cat.cf.snapshot_time as `catalog_snapshot_time`,
> > cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
> > dev.cf.id as `device_id`,
> > dev.cf.name as `device_name`,
> > dev.cf.kafka_partition as `device_partition`,
> > 

Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-16 文章 Congxian Qiu
Hi
上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
Best,
Congxian


key lou  于2020年8月16日周日 下午9:27写道:

> 各位大佬:
>在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
>  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
>
> public class FlinkCheckpointTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment steamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> steamEnv.enableCheckpointing(1000L*2);
> steamEnv
> .addSource(new FSource()).setParallelism(4)
> .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
> .process(new FCombine()).name("事务预处理").setParallelism(4)
> .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> ;
> steamEnv.execute("test");
> }
>
>static class FSource extends RichParallelSourceFunction{
> @Override
> public void run(SourceContext sourceContext) throws
> Exception {
> int I =0;
> while (true){
> I = I + 1;
> sourceContext.collect("thread " +
> Thread.currentThread().getId() +"-" +I);
> Thread.sleep(1000);
> }
> }
> @Override
> public void cancel() {}
> }
>
> static class FStart extends AbstractStreamOperator
> implements OneInputStreamOperator{
>volatile Long ckid = 0L;
> @Override
> public void processElement(StreamRecord streamRecord)
> throws Exception {
> log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> output.collect(streamRecord);
> }
> @Override
> public void prepareSnapshotPreBarrier(long checkpointId)
> throws Exception {
> log("开启事务: " + checkpointId);
> ckid = checkpointId;
> super.prepareSnapshotPreBarrier(checkpointId);
> }
> }
>
> static class FCombine extends ProcessFunction
> implements CheckpointedFunction {
> List ls = new ArrayList();
> Collector collector =null;
> volatile Long ckid = 0L;
>
> @Override
> public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append(";");});
> log("批处理 " + functionSnapshotContext.getCheckpointId() +
> ": 时收到数据:" + sb.toString());
> Thread.sleep(5*1000);
> collector.collect(sb.toString());
> ls.clear();
> Thread.sleep(5*1000);
> //Thread.sleep(20*1000);
> }
> @Override
> public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception {}
> @Override
> public void processElement(String s, Context context,
> Collector out) throws Exception {
> if(StringUtils.isNotBlank(s)){
> ls.add(s);
> }
> log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> ckid);
> if(collector ==null){
> collector = out;
> }
> }
> }
>
> static class FSubmit extends RichSinkFunction implements
> /*  CheckpointedFunction,*/ CheckpointListener {
> List ls = new ArrayList();
> volatile Long ckid = 0L;
> @Override
> public void notifyCheckpointComplete(long l) throws Exception {
> ckid = l;
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append("||");});
> log("submit checkpoint " + l + " over data:list size" +
> ls.size()+ "; detail" + sb.toString());
> ls.clear();
> }
> @Override
> public void invoke(String value, Context context) throws Exception
> {
> if(StringUtils.isNotBlank(value)){
> ls.add(value);
> }
> log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> ckid);
> }
> }
> public static void log(String s){
> String name = Thread.currentThread().getName();
> System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> Date())+":"+name + ":" + s);
> }
> }
>


Re: Flink参数配置设置不生效

2020-08-16 文章 Yangze Guo
Hi, 请问你的启动命令是什么?

如果指定了 -d ,那会覆盖这个选项

Best,
Yangze Guo

On Sat, Aug 15, 2020 at 10:52 PM 魏烽  wrote:
>
> 各位大佬:
>
> 这个问题没有人遇到过嘛?
>
>  原始邮件
> 发件人: weifeng
> 收件人: user-zh
> 发送时间: 2020年8月15日(周六) 09:44
> 主题: Flink参数配置设置不生效
>
>
> 各位大佬好:
>
> 在flink-conf.yaml中设置参数execution.attached: false
>
>但是yarn logs查看此参数设置并没有生效,
>
>2020-08-15 09:40:13,489 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: execution.attached, true
>
>而且根据官网说明此参数默认应该是false才对,已确认在代码中并没有对此参数进行设置,请问这是什么情况呀?


Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Congxian Qiu
Hi
   你还有失败作业的 JM 和 TM
日志吗?如果有的话可以看一下这两个日志来确定为什么没有恢复成功。因为你说代码未作任何改变,然后恢复失败,这个还是比较奇怪的。
Best,
Congxian


Yang Peng  于2020年8月17日周一 上午10:25写道:

> 好的 感谢
>
> JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
>
> > hi
> >
> > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 退订

2020-08-16 文章 Xingbo Huang
Hi,

退订请发邮件到  user-zh-unsubscr...@flink.apache.org

详细的可以参考 [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

simba_cheng  于2020年8月16日周日 下午9:33写道:

> 退订
>
>
>
>
> 成钇辛
> TEL: 150-7783-5100
>
> 我无法承诺帮你解决所有的问题,但我保证不会让你独自去面对。
>
>


Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Yang Peng
好的 感谢

JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:

> hi
>
> 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-16 文章 Zhao,Yi(SEC)
根据Context获取timerService,然后获取处理时间即可。


在 2020/8/16 下午7:57,“ゞ野蠻遊戲χ” 写入:

大家好

   
当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?


谢谢!
嘉治



Re: 如何设置FlinkSQL并行度

2020-08-16 文章 Zhao,Yi(SEC)
我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。
此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。

非要解决可以写代码,把souce部分不使用sql实现。
__

在 2020/8/15 下午8:21,“forideal” 写入:

Hi 赵一旦,


目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。
1.并行度超过 topic partition 的时候会造成资源浪费
2.并行度超过 topic partition 后,checkpoint 也无法正常触发了


Best forideal

















在 2020-08-14 12:03:32,"赵一旦"  写道:
>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗?
>
>Xingbo Huang  于2020年8月14日周五 下午12:01写道:
>
>> Hi,
>>
>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度
>>
>> Best,
>> Xingbo
>>
>> Zhao,Yi(SEC)  于2020年8月14日周五 上午10:49写道:
>>
>> > 
并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
>> >
>> > 发件人: "Zhao,Yi(SEC)" 
>> > 日期: 2020年8月13日 星期四 上午11:44
>> > 收件人: "user-zh@flink.apache.org" 
>> > 主题: 如何设置FlinkSQL并行度
>> >
>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
>> >
>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
>> >
>> >
>>




回复:flink-1.10.1 想用 DDL 入 ES5.6

2020-08-16 文章 kcz
谢谢大佬 我先研究研究





-- 原始邮件 --
发件人: Leonard Xu https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f;
 开始
比如你自己实现了Elasticsearch5DynamicSink 


退订

2020-08-16 文章 simba_cheng
退订




成钇辛
TEL: 150-7783-5100

我无法承诺帮你解决所有的问题,但我保证不会让你独自去面对。



在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-16 文章 key lou
各位大佬:
   在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
 collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。

public class FlinkCheckpointTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment steamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
steamEnv.enableCheckpointing(1000L*2);
steamEnv
.addSource(new FSource()).setParallelism(4)
.transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
.process(new FCombine()).name("事务预处理").setParallelism(4)
.addSink(new FSubmit()).name("提交事务").setParallelism(1)
;
steamEnv.execute("test");
}

   static class FSource extends RichParallelSourceFunction{
@Override
public void run(SourceContext sourceContext) throws Exception {
int I =0;
while (true){
I = I + 1;
sourceContext.collect("thread " +
Thread.currentThread().getId() +"-" +I);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
}

static class FStart extends AbstractStreamOperator
implements OneInputStreamOperator{
   volatile Long ckid = 0L;
@Override
public void processElement(StreamRecord streamRecord)
throws Exception {
log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
output.collect(streamRecord);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId)
throws Exception {
log("开启事务: " + checkpointId);
ckid = checkpointId;
super.prepareSnapshotPreBarrier(checkpointId);
}
}

static class FCombine extends ProcessFunction
implements CheckpointedFunction {
List ls = new ArrayList();
Collector collector =null;
volatile Long ckid = 0L;

@Override
public void snapshotState(FunctionSnapshotContext
functionSnapshotContext) throws Exception {
StringBuffer sb = new StringBuffer();
ls.forEach(x->{sb.append(x).append(";");});
log("批处理 " + functionSnapshotContext.getCheckpointId() +
": 时收到数据:" + sb.toString());
Thread.sleep(5*1000);
collector.collect(sb.toString());
ls.clear();
Thread.sleep(5*1000);
//Thread.sleep(20*1000);
}
@Override
public void initializeState(FunctionInitializationContext
functionInitializationContext) throws Exception {}
@Override
public void processElement(String s, Context context,
Collector out) throws Exception {
if(StringUtils.isNotBlank(s)){
ls.add(s);
}
log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
if(collector ==null){
collector = out;
}
}
}

static class FSubmit extends RichSinkFunction implements
/*  CheckpointedFunction,*/ CheckpointListener {
List ls = new ArrayList();
volatile Long ckid = 0L;
@Override
public void notifyCheckpointComplete(long l) throws Exception {
ckid = l;
StringBuffer sb = new StringBuffer();
ls.forEach(x->{sb.append(x).append("||");});
log("submit checkpoint " + l + " over data:list size" +
ls.size()+ "; detail" + sb.toString());
ls.clear();
}
@Override
public void invoke(String value, Context context) throws Exception {
if(StringUtils.isNotBlank(value)){
ls.add(value);
}
log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
}
}
public static void log(String s){
String name = Thread.currentThread().getName();
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
Date())+":"+name + ":" + s);
}
}