?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 ????
~




--  --
??: "shengjk1";
: 2019??3??27??(??) 12:10
??: "user-zh@flink.apache.org";
: "user-zh"; 
: ??  flink savepoints ?? checkpoints??



 ??flink-connector-kafkakafka 
0.8zkkafka??(??kafka??API)


Flinkoffsetcheckpoint??
??checkpointkafka??APIoffset??
??checkpoint??flinkcheckpointoffsetstate 
backends??kafka consumer??Api??kafka

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html


Best,
Shengjk1




??2019??03??27?? 11:52??<1048095...@qq.com> ??
??checkpoint??savepoint

1??flink  
kafka??kafka??offsetflink??zookeeper
PS??zookeeper??offset
2flink??offsetzookeeper??




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:38
??: "user-zh";

: : ?? flink savepoints ?? checkpoints??






 savepoint 
??savepoints





baiyg25...@hundsun.com



 
?? 2019-03-27 11:03
 user-zh
?? ?? flink savepoints ?? checkpoints??


??savepointsavepoints




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:01
??: "user-zh";

: : flink savepoints ?? checkpoints??



checkpoints:
checkpoint??

savepoints??
savepointssavepointssavepointssavepoints





baiyg25...@hundsun.com

 
?? 2019-03-27 10:48
 user-zh
?? flink savepoints ?? checkpoints??
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 shengjk1
 ??flink-connector-kafkakafka 
0.8zkkafka??(??kafka??API)


Flinkoffsetcheckpoint??
??checkpointkafka??APIoffset??
??checkpoint??flinkcheckpointoffsetstate 
backends??kafka consumer??Api??kafka

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html


Best,
Shengjk1




??2019??03??27?? 11:52??<1048095...@qq.com> ??
??checkpoint??savepoint

1??flink  
kafka??kafka??offsetflink??zookeeper
PS??zookeeper??offset
2flink??offsetzookeeper??




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:38
??: "user-zh";

: : ?? flink savepoints ?? checkpoints??






 savepoint 
??savepoints





baiyg25...@hundsun.com



 
?? 2019-03-27 11:03
 user-zh
?? ?? flink savepoints ?? checkpoints??


??savepointsavepoints




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:01
??: "user-zh";

: : flink savepoints ?? checkpoints??



checkpoints:
checkpoint??

savepoints??
savepointssavepointssavepointssavepoints





baiyg25...@hundsun.com

 
?? 2019-03-27 10:48
 user-zh
?? flink savepoints ?? checkpoints??
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

?????? ?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 ????
??checkpoint??savepoint

1??flink  
kafka??kafka??offsetflink??zookeeper
PS??zookeeper??offset
2flink??offsetzookeeper??




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:38
??: "user-zh";

: : ?? flink savepoints ?? checkpoints??



 
 

 savepoint 
??savepoints





 baiyg25...@hundsun.com


  
 
?? 2019-03-27 11:03
 user-zh
?? ?? flink savepoints ?? checkpoints??


??savepointsavepoints
  
  
  
  
 --  --
 ??: "baiyg25...@hundsun.com";
 : 2019??3??27??(??) 11:01
 ??: "user-zh";
  
 : : flink savepoints ?? checkpoints??
  
  
  
 checkpoints:
 
checkpoint??
  
 savepoints??
 
savepointssavepointssavepointssavepoints
  
  
  
  
  
 baiyg25...@hundsun.com
  
  
 ?? 2019-03-27 10:48
  user-zh
 ?? flink savepoints ?? checkpoints??
 ??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
 PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

?????? ?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 ????
??




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:38
??: "user-zh";

: : ?? flink savepoints ?? checkpoints??



 
 

 savepoint 
??savepoints





 baiyg25...@hundsun.com


  
 
?? 2019-03-27 11:03
 user-zh
?? ?? flink savepoints ?? checkpoints??


??savepointsavepoints
  
  
  
  
 --  --
 ??: "baiyg25...@hundsun.com";
 : 2019??3??27??(??) 11:01
 ??: "user-zh";
  
 : : flink savepoints ?? checkpoints??
  
  
  
 checkpoints:
 
checkpoint??
  
 savepoints??
 
savepointssavepointssavepointssavepoints
  
  
  
  
  
 baiyg25...@hundsun.com
  
  
 ?? 2019-03-27 10:48
  user-zh
 ?? flink savepoints ?? checkpoints??
 ??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
 PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

????: ?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 baiyg25...@hundsun.com

 savepoint 
??savepoints




baiyg25...@hundsun.com
 
 
?? 2019-03-27 11:03
 user-zh
?? ?? flink savepoints ?? checkpoints??
??savepointsavepoints
 
 
 
 
--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:01
??: "user-zh";
 
: : flink savepoints ?? checkpoints??
 
 
 
checkpoints:

checkpoint??
 
savepoints??

savepointssavepointssavepointssavepoints
 
 
 
 
 
baiyg25...@hundsun.com
 
?? 2019-03-27 10:48
 user-zh
?? flink savepoints ?? checkpoints??
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints


?????? flink savepoints ?? checkpoints??????

2019-03-26 文章 ????
??savepointsavepoints




--  --
??: "baiyg25...@hundsun.com";
: 2019??3??27??(??) 11:01
??: "user-zh";

: : flink savepoints ?? checkpoints??



checkpoints:

checkpoint??

savepoints??

savepointssavepointssavepointssavepoints





baiyg25...@hundsun.com
 
 
?? 2019-03-27 10:48
 user-zh
?? flink savepoints ?? checkpoints??
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

Re: Re: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq】

sql:
select EVENTTIME,ID,EVENT_ID,MSISDN,TS 
from (select a.*,ROW_NUMBER() over(partition by EVENT_ID,MSISDN order by TS 
desc) AS rw
          from table1 a
) where rw = 1

tableEnv.toRetractStream(结果表, Row.class).print();


输出结果,分析结果发现,第二条的  1553652720961584  比第一条的时间 1553652720927835 更大,同时输出一条 false 
的,数据结果与第一条相同,説明第三条是用来作删除操作,删掉第一条数据。。。

(true,2019-03-27 
02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XX,1553652720927835)
(true,2019-03-27 
02:12:00.0,1243296274875303910,"1c3.2729.20190327021200",XX,1553652720961584)
(false,2019-03-27 
02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XX,1553652720927835)

结论:true 是用来插入数据的,false 是用来删除数据的,出现false时一定会有一条之前插入的数据 。。。

邓成刚【qq】
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:40
收件人: user-zh
主题: Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 
update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。
 
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25...@hundsun.com

????: flink savepoints ?? checkpoints??????

2019-03-26 文章 baiyg25...@hundsun.com
checkpoints:

checkpoint??

savepoints??

savepointssavepointssavepointssavepoints





baiyg25...@hundsun.com
 
 
?? 2019-03-27 10:48
 user-zh
?? flink savepoints ?? checkpoints??
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints


flink savepoints ?? checkpoints??????

2019-03-26 文章 ????
??flink savepoints ?? 
checkpointscheckpoints??savepointssavepoints??savepoints??
PS??checkpoints, 
3s??cancel??-s??savepointssavepointscheckpoints

Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq】
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 
update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。

 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25...@hundsun.com

回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq】
这里面决定 update 或 delete 的 Boolean型值 怎么赋?

这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段

不知道我的理解是否正确,期待大佬解答。。。

邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!

        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!

        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].

       

        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}

baiyg25...@hundsun.com

blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事

2019-03-26 文章 邓成刚【qq】
HI,各位大佬:
      发现一个很诡异的问题:使用SQL API时,在窗口上group by,JOB 5分钟后会timeout,但如果改成select * 
就能正常消费kafka。。。
说明:本地模式和提交JOB均存在此异常
相关信息:
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar

消费正常的code:

String sql = "select * from table1"

Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           

tableEnv.toRetractStream(sip_distinct_event_id, 
Row.class).print();
env.execute("myjob2");



如果把SQL换成如下就会timeout...

String sql ="select TUMBLE_START(EVENTTIME,INTERVAL '1' MINUTE) AS 
EVENTTIME,NEW_EVENT_ID,MSISDN from   
        +"select EVENTTIME,EVENT_ID as NEW_EVENT_ID,MSISDN from 
table1"         
       +") group by TUMBLE(EVENTTIME,INTERVAL '1' 
MINUTE),NEW_EVENT_ID,MSISDN"); 



Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           

tableEnv.toRetractStream(sip_distinct_event_id, 
Row.class).print();
env.execute("myjob2");


异常:
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
at 
com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)






实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 baiyg25...@hundsun.com
大家好!

伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!

目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].

   

主要盲点:
1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
 @Override
 public void setKeyFields(String[] keys) {}
@Override
public void setIsAppendOnly(Boolean isAppendOnly){}



baiyg25...@hundsun.com


Re: RocksDB中指定nameNode 的高可用

2019-03-26 文章 Yun Tang
Hi

Flink高可用相关配置的存储目录,当存储路径配置成HDFS时,相关namenode高可用性由HDFS支持,对上层完全透明。

祝好
唐云

From: 戴嘉诚 
Sent: Tuesday, March 26, 2019 16:57
To: user-zh@flink.apache.org
Subject: RocksDB中指定nameNode 的高可用

  嘿,我想询问一下,flink中的RocksDB位置  
我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active 
nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉



RocksDB中指定nameNode 的高可用

2019-03-26 文章 戴嘉诚
  嘿,我想询问一下,flink中的RocksDB位置  
我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active 
nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉



如何实现 UpsertStreamTableSink , BatchTableSink 接口

2019-03-26 文章 baiyg25...@hundsun.com
大家好!

有没有伙伴对 blink 中 flink-table 模块下的 UpsertStreamTableSink , BatchTableSink 
这两个接口比较熟悉?或者对TableSink这块处理原理比较熟悉?我想实现这两个接口,以实现JDBC更新功能,自己看源码只能看懂表面,希望熟悉的伙伴能给些指导。。。



baiyg25...@hundsun.com


re:回复:fw:Blink SQL报错

2019-03-26 文章 bigdatayunzhongyan
收到,感谢姬平老师的专业解答。

谢谢各位!


发件人: 胥平勇(姬平)
发送时间: 2019-03-26 15:01:15
收件人:  bigdatayunzhongyan
抄送:  user-zh; Bowen Li
主题: 回复:fw:Blink SQL报错
Hi bigdatayunzhongyan:

1. SQL语法不支持:
这个可以参照代码里面TpcDsBatchExecPlanTest的单测,我们使用的sql query也都放在了工程里。看看是不是有些query的语法有些区别。

2. 执行方式:
我们自己benchmark的时候采用的是依赖tableEnv api写代码的方式,在另一个工程里面去采集统计信息,解析query,提交job.
类似于代码里面的TpcDsBatchExecITCase(和TpcDsBatchExecPlanTest一样的plan优化配置,并且需要采集统计信息AnalyzeStatistic.generateTableStats(tEnv,
 tableName, schema.getFieldNames))

sqlClient的执行入口还有些问题。

你先按这个调整一下,看是否能跑起来,跑起来后我们再一起看下其它性能相关的配置或环境问题。

--
发件人:Jark Wu 
发送时间:2019年3月26日(星期二) 12:21
收件人:bigdatayunzhongyan ; 胥平勇(姬平) 

抄 送:user-zh ; Bowen Li 
主 题:Re: fw:Blink SQL报错

cc 姬平老师帮忙看下这个问题。

On Mon, 25 Mar 2019 at 19:23, bigdatayunzhongyan 
 wrote:
@Bowen @jark 有时间帮忙看下
谢谢!

发件人: bigdatayunzhongyan
发送时间: 2019-03-25 19:17:22
收件人:  user-zh-help
主题: Blink SQL报错
Hi,all:
        问题详见附件:
环境信息:
        环境 hadoop2.7.2 blink hive1.2.1
        参数 ./bin/yarn-session.sh -n 50 -s 2 -jm 3072 -tm 4096 -d
./bin/sql-client.sh embedded -s application_
        数据 tpc-ds 500G总量数据

很多SQL都无法执行成功,不仅仅是SQL兼容性的问题,还有阿里的同学能否提供下详细测试报告。



Re: Re: flink ha模式进程hang!!!

2019-03-26 文章 Han Xiao
非常谢谢您的解答,这个问题是zk中有失败任务的jobGraph,导致每次启动群集就会去检索,删除zk中残余后重启即可解决。

 
Thank you for your reply!
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 09:40
收件人: user-zh
主题: Re: Re: flink ha模式进程hang!!!
是不是跟这个访问控制有关?
high-availability.zookeeper.client.acl: open
 
 
 
baiyg25...@hundsun.com
发件人: Han Xiao
发送时间: 2019-03-26 09:33
收件人: user-zh@flink.apache.org
主题: Re: Re: flink ha模式进程hang!!!
Hi,早上好,谢谢您的回复,以下是我的配置项及参数:
flink-conf.yaml
common:
jobmanager.rpc.address: test10
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
taskmanager.tmp.dirs: /app/tools/flink-1.7.2/tmp
High Availability:
high-availability: zookeeper
high-availability.storageDir: hdfs://test10:8020/flink/ha/   
##此文件目录可以正常生成,但无jobGraph相关目录;
high-availability.zookeeper.quorum: ip1:2181,ip2:2181,ip3:2181,ip4:2181,ip5:2181
high-availability.zookeeper.client.acl: open
Fault tolerance and checkpointing:
state.backend: filesystem
state.checkpoints.dir: hdfs://test10:8020/flink-checkpoints  ##此目录没有生成;
Web Frontend:
rest.port: 8081
masters: slaves:
test10:8081   test12
test11 : 8082test13
 test14
以上为全部配置项,结合下面报的错误信息检索路径,我的配置中并没有。。。很让我不解。
Thank you for your reply!
发件人: Zili Chen
发送时间: 2019-03-25 19:57
收件人: user-zh@flink.apache.org
主题: Re: flink ha模式进程hang!!!
看起来是 HDFS 去 /flink/ha/zookeeper/submittedJobGraphb05001535f91 这个路径下找
submittedJobGraph,这个看起来就不太对。
Flink 的 ha 需要配置 zk 的路径和把 state 存到 file system 的路径,你可以试试把
high-availability.storageDir
配成一个有效的 HDFS 路径
Best,
tison.
Zili Chen  于2019年3月25日周一 下午7:53写道:
> 能提供你的 ha 配置吗?特别是 high-availability.storageDir,我怀疑是不是没有配置这个啊
> Best,
> tison.
>
>
> Han Xiao  于2019年3月25日周一 下午7:26写道:
>
>> 各位朋友大家好,我是flink初学者,部署flink ha的过程中出现一些问题,麻烦大家帮忙看下;
>> 启动flink ha后,jobmanager进程直接hang,使用的flink 1.7.2版本,下面log中有一处出现此错误  File does
>> not exist: /flink/ha/zookeeper/submittedJobGraphb05001535f91
>> ,让我不解的是我的checkpoint目录以及ha目录并不是这个,为什么会到这个目录去找,我所配置的目录下没有生成JobGraph ,他会一直去检索
>> /a5ffe00b0bc5688d9a7de5c62b8150e6
>> 这个作业图而且找不到,我删除了所有相关的配置路径之后重新搭建,启动时还是会去检索,我该怎样避免flink去检索这个JobGraph
>> ,让我的ha群集健康的运行起来。
>>
>>
>> 报错日志:
>> 2019-03-25 18:55:00,742 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>> occurred in the cluster entrypoint.
>> java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could
>> not retrieve submitted JobGraph from state handle under
>> /a5ffe00b0bc5688d9a7de5c62b8150e6. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:74)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>> ...
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> submitted JobGraph from state handle under
>> /a5ffe00b0bc5688d9a7de5c62b8150e6. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
>> 
>> Caused by: java.io.FileNotFoundException: File does not exist:
>> /flink/ha/zookeeper/submittedJobGraphb05001535f91
>> at
>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
>> at
>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2100)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2070)
>> ...
>> Caused by: 
>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
>> File does not exist: /flink/ha/zookeeper/submittedJobGraphb05001535f91
>> at
>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
>> at
>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2100)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2070)
>> ...
>>
>> 谢谢!
>>
>