flink sql 计算列不支持comment

2020-08-28 文章 sllence
Flink版本:1.11.1

 

官网文档中定义如下:

:

  column_name AS computed_column_expression [COMMENT column_comment]

我看到官方文档中计算列是可以指定column_comment的,但我尝试了一下发现会报错

 

使用方式和报错信息如下:

create table t1( 

   data_time STRING, 

   row1_time AS to_timestamp(data_time) COMMENT 'test'

   WATERMARK FOR row1_time AS row1_time - INTERVAL '5' SECOND

) with(...)

 

org.apache.flink.sql.parser.impl.ParseException: Encountered "COMMENT" at
line 1, column 74.

Was expecting one of:

"FILTER" ...

"OVER" ...

"WITHIN" ...

")" ...

"," ...

"." ...

"NOT" ...

"IN" ...

"<" ...

"<=" ...

">" ...

">=" ...

"=" ...

"<>" ...

"!=" ...

"BETWEEN" ...

"LIKE" ...

"SIMILAR" ...

"+" ...

"-" ...

"*" ...

"/" ...

"%" ...

"||" ...

"AND" ...

"OR" ...

"IS" ...

"MEMBER" ...

"SUBMULTISET" ...

"CONTAINS" ...

"OVERLAPS" ...

"EQUALS" ...

"PRECEDES" ...

"SUCCEEDS" ...

"IMMEDIATELY" ...

"MULTISET" ...

"[" ...

"FORMAT" ...

"IGNORE" ...

"RESPECT" ...



   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(F
linkSqlParserImpl.java:36086)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSq
lParserImpl.java:35900)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlP
arserImpl.java:5271)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkS
qlParserImpl.java:6269)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParser
Impl.java:19047)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserIm
pl.java:3308)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlPars
erImpl.java:2775)

   at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSq
lParserImpl.java:252)

   at
org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:201)



Re: 关于sink失败 不消费kafka消息的处理

2020-08-28 文章 Eleanore Jin
Hi shizk233,

非常感谢你的解答,困扰我多时的问题终于明白了! 谢谢!


On Thu, Aug 27, 2020 at 10:28 PM shizk233 
wrote:

> Hi Eleanore,
>
> 我觉得是不一样的,差别就在于kafka auto commit发生在source算子消费了kafka event时(不会等待数据完成sink写入),
> 而chk机制提交offset发生在所有节点完成同一chk后。
>
>
> 虽然sink是stateless的,但这不妨碍它做chk。做chk的条件就是算子收到chk的barrier消息并且把barrier消息之前的数据处理完成。
> 所以chk机制提交offset时,可以保证之前的数据已经写入sink,是at least once的。
>
> Eleanore Jin  于2020年8月28日周五 上午1:17写道:
>
> > 感谢大家的回答,
> >
> > 我用的是APACHE BEAM, 然后RUNNER 用的是Flink, 这里是Beam 提供的KAFKA 的CONNECTOR,
> 如果看source
> > 的话,它是有state checkpointed: Beam KafkaIO KafkaUnboundedReader
> > <
> >
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239
> > >
> > 但是看sink, 它没有任何state,是一个stateless的operator: Beam KafkaIO KafkaWriter
> > <
> >
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
> > >,
> > 所以这就是我想确认如果在sink 没有state 的前提下,那么是不是开启checkpoint, 只有source 记录 offset 和用
> > kafka auto commit offset 其实是一样的,既不能保证at least once,也不能 exactly once
> >
> > 谢谢!
> >
> > On Wed, Aug 26, 2020 at 7:31 PM 范超  wrote:
> >
> > > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > > 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> > 假设是6.
> > > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> > >
> > >
> > >
> >
> 按照我个人理解,应该是sink环节的部分失败,会使得sink环节的checkpoint失败,而jobmanager会因为这个sink环节的失败,而标记这个checkpoint的快照整体失败。
> > > 从而重启消费会从source的1开始重新消费
> > >
> > >
> > > -邮件原件-
> > > 发件人: Benchao Li [mailto:libenc...@apache.org]
> > > 发送时间: 2020年8月27日 星期四 10:06
> > > 收件人: user-zh 
> > > 主题: Re: 关于sink失败 不消费kafka消息的处理
> > >
> > > Hi Eleanore,shizk233 同学给出的解释已经很全面了。
> > >
> > > 对于你后面提的这个问题,我感觉这个理解应该不太正确。
> > > 开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
> > > 会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
> > > at least once的语义,也就是数据可能会重复,但是不会丢。
> > >
> > > Eleanore Jin  于2020年8月27日周四 上午9:53写道:
> > >
> > > > Hi shizk233,
> > > >
> > > > 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink
> > > > topic,
> > > > 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer
> send,
> > > > 那么如果开启checkpoint, state 就只是source operator kafka offset.
> > > >
> > > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > > 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> > > 假设是6.
> > > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> > > >
> > > > 谢谢!
> > > > Eleanore
> > > >
> > > > On Wed, Aug 26, 2020 at 9:32 AM shizk233 <
> wangwangdaxian...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Eleanore,这个问题我可以提供一点理解作为参考
> > > > >
> > > > > 1.chk与at least once
> > > > > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > > > > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> > > > >
> > > > > 2. sink2PC
> > > > > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > > > > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > > > > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> > > > >
> > > > > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> > > > >
> > > > > 3.kafka auto commit
> > > > > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > > > > n的时候才提交offset。
> > > > > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk
> > > > > n的完成。
> > > > >
> > > > > Eleanore Jin  于2020年8月26日周三 下午11:51写道:
> > > > >
> > > > > > Hi Benchao
> > > > > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink
> > > > > > 都是kafka, 如果
> > > > > sink
> > > > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> > > > commit
> > > > > > offset 看起来似乎没有什么区别
> > > > > >
> > > > > > 可否具体解释一下? 谢谢!
> > > > > >
> > > > > > Eleanore
> > > > > >
> > > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li  >
> > > > wrote:
> > > > > >
> > > > > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > > > > >
> > > > > > > 范超  于2020年8月26日周三 上午11:38写道:
> > > > > > >
> > > > > > > > 大家好,我现在有个疑问
> > > > > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提
> > > > 交kafka的消费位移呢?
> > > > > > > >
> > > > > > > >
> > > > > > > > 多谢大家了
> > > > > > > >
> > > > > > > > 范超
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-28 文章 大罗
Hi Rui Li,

谢谢你的回复。

另外,我在apache jira flink,发起了issue ” flink sql 1.11 HbaseTableSource Supports
FilterPushDown”

https://issues.apache.org/jira/browse/FLINK-19088

希望能够尽快发布,改善在flink sql 1.11和hbase的整合效率。谢谢!




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 一个main方法启动2个yarn job问题

2020-08-28 文章 tison
应该说 SQL 的 update 会在底层也 call 一次 env.execute

如果你配的是所谓的 detach 模式,是有这种可能的。这个是实现问题,你可以先贴一下代码,然后描述你要的行为,看下可以怎么写

Best,
tison.


Rui Li  于2020年8月28日周五 下午9:59写道:

> 作业代码是怎么写的啊?按说写SQL的话不需要执行Env.execute
>
> On Fri, Aug 28, 2020 at 9:41 AM air23  wrote:
>
> > 你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢?
> > 我是先用datastream 接入kafka。然后转成table sql写入到tidb
> > 2个job name 一个叫Env.execute配置的名字
> > 一个是叫insert 写入tidb的sql语句名字
> >
> >
>
> --
> Best regards!
> Rui Li
>


Re: 一个main方法启动2个yarn job问题

2020-08-28 文章 Rui Li
作业代码是怎么写的啊?按说写SQL的话不需要执行Env.execute

On Fri, Aug 28, 2020 at 9:41 AM air23  wrote:

> 你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢?
> 我是先用datastream 接入kafka。然后转成table sql写入到tidb
> 2个job name 一个叫Env.execute配置的名字
> 一个是叫insert 写入tidb的sql语句名字
>
>

-- 
Best regards!
Rui Li


Re: Re:Re: Flink SQL 问题;

2020-08-28 文章 JasonLee
hi

把Flink-jdbc的包放到flink/lib下面再跑下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-28 文章 Rui Li
Hello,

目前hive connector没有支持hive的storage handler表 [1],也就是说通过STORED
BY定义的表现在是不支持的。普通的external表是支持的。

[1]
https://cwiki.apache.org/confluence/display/Hive/StorageHandlers#StorageHandlers-DDL

On Fri, Aug 28, 2020 at 2:43 PM Leonard Xu  wrote:

> Hi
>
> > 另外,HbaseTableSource 有没有计划什么时候支持 SupportsFilterPushDown.
> 我搜了下社区还没相关的issue,如果是强需求你可以去社区建个issue[1],让社区支持下
> 第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc
> Rui Li 确认下
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/projects/FLINK/summary <
> https://issues.apache.org/jira/projects/FLINK/summary>
> >
> > 关于"select * from hive_hbase_t1"的异常日志如下。
> >
> >
> > Flink SQL> select * from hive_hbase_t1;
> > 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
>
> > [] - HiveConf of name hive.vectorized.use.checked.expressions does not
> exist
> > 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
>
> > [] - HiveConf of name hive.strict.checks.no.partition.filter does not
> exist
> > 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
>
> > [] - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
> > 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
>
> > [] - HiveConf of name hive.vectorized.input.format.excludes does not
> exist
> > 2020-08-28 13:20:19,986 WARN  org.apache.hadoop.hive.conf.HiveConf
>
> > [] - HiveConf of name hive.strict.checks.bucketing does not exist
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> > error.,  > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> > job.
> >   at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
> >   at
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> >   at
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> >   at
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> >   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >   at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >   at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >   at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >   at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >   at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not
> > instantiate JobManager.
> >   at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> >   at
> >
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >   ... 6 more
> > Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits
> > caused an error: Unable to instantiate the hadoop input format
> >   at
> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init>(ExecutionJobVertex.java:272)
> >   at
> >
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
> >   at
> >
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
> >   at
> >
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
> >   at
> >
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
> >   at
> >
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
> >   at
> >
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
> >   at
> >
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> >   at
> >
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> >   at
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
> >   at
> >
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> >   at
> >
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> >   at
> >
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
> >   at
> >
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> >   at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
> >   ... 7 more
> > Caused by: 

Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-28 文章 大罗
请问,如何在社区建立issue "HbaseTableSource 支持 SupportsFilterPushDown",有guideline吗。

另外,"第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc
Rui Li 确认下",你说的"Rui Li",请问,我要怎么抄送,可以给他的邮件吗,还是你已经抄送给他。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11时间函数

2020-08-28 文章 Benchao Li
不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。

Dream-底限  于2020年8月28日周五 下午2:50写道:

> hi
>
> UNIX_TIMESTAMP()
>
> NOW()
>
> 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
>


-- 

Best,
Benchao Li


Re:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-28 文章 Haibo Sun
Congratulations Dian !



Best,
Haibo

At 2020-08-27 18:03:38, "Zhijiang"  wrote:
>Congrats, Dian!
>
>
>--
>From:Yun Gao 
>Send Time:2020年8月27日(星期四) 17:44
>To:dev ; Dian Fu ; user 
>; user-zh 
>Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congratulations Dian !
>
> Best
> Yun
>
>
>--
>Sender:Marta Paes Moreira
>Date:2020/08/27 17:42:34
>Recipient:Yuan Mei
>Cc:Xingbo Huang; jincheng sun; 
>dev; Dian Fu; 
>user; user-zh
>Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congrats, Dian!
>On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
>
>Congrats!
>On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
>Congratulations Dian!
>
>Best,
>Xingbo
>jincheng sun  于2020年8月27日周四 下午5:24写道:
>
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
>the Apache Flink Project Management Committee (PMC).
>
>Dian Fu has been very active on PyFlink component, working on various 
>important features, such as the Python UDF and Pandas integration, and keeps 
>checking and voting for our releases, and also has successfully produced two 
>releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
>release of Flink 1.12.
>
>Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
>Best,
>Jincheng(on behalf of the Flink PMC)
>


Re: flink文档

2020-08-28 文章 Yun Tang
Hi

SQL解析不通过的可以在 https://issues.apache.org/jira/projects/FLINK/issues 
里面创建相关ticket指明出来,很快会有相关开发来帮助的。
不过需要注意的是,需要用英文进行阐述。

祝好
唐云


From: Dream-底限 
Sent: Friday, August 28, 2020 16:42
To: user-zh@flink.apache.org 
Subject: flink文档

hi、
哪位大佬可以把flink官方文档中的函数部分完善一下啊,函数下面配个应用方式可好,看文档我都不知道下面函数是怎么用的,有的可以直接用有的sql解析不通过,还得一个一个测。。。
Temporal functionsDescription

DATE string

Returns a SQL date parsed from *string* in form of "-MM-dd".

TIME string

Returns a SQL time parsed from *string* in form of "HH:mm:ss".

TIMESTAMP string

Returns a SQL timestamp parsed from *string* in form of "-MM-dd
HH:mm:ss[.SSS]".

INTERVAL string range

Parses an interval *string* in the form "dd hh:mm:ss.fff" for SQL intervals
of milliseconds or "-mm" for SQL intervals of months. An interval range
might be DAY, MINUTE, DAY TO HOUR, or DAY TO SECOND for intervals of
milliseconds; YEAR or YEAR TO MONTH for intervals of months.

E.g., INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY, or INTERVAL
'2-10' YEAR TO MONTH return intervals.

CURRENT_DATE

Returns the current SQL date in the UTC time zone.

CURRENT_TIME

Returns the current SQL time in the UTC time zone.

CURRENT_TIMESTAMP

Returns the current SQL timestamp in the UTC time zone.

LOCALTIME

Returns the current SQL time in local time zone.

LOCALTIMESTAMP

Returns the current SQL timestamp in local time zone.


flink如何将时间戳转换为长整型并精确到毫秒

2020-08-28 文章 Dream-底限
hi、
我这面使用flink时间函数转换为长整型的时候只能做到秒,除了自定义udf有办法做到转换到毫秒精度吗


Re: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 赵一旦
大概懂了,不过还是很奇怪。counter和accumulator究竟啥区别,干嘛要分开实现2套机制。

范超  于2020年8月28日周五 下午4:14写道:

> 确实可以通过counter来实现[1]
> 相应的blog在[2]
>
> [1] https://github.com/mbode/flink-prometheus-example
> [2]https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>
>
> -邮件原件-
> 发件人: Yun Tang [mailto:myas...@live.com]
> 发送时间: 2020年8月28日 星期五 14:37
> 收件人: user-zh@flink.apache.org
> 主题: Re: flink prometheus 无法上报accumulator类型监控吗
>
> Hi
>
> 没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters
> [1] 这四种,如果你想要用累积型metrics,可以考虑counters
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types
>
> 祝好
> 唐云
>
> 
> From: 赵一旦 
> Sent: Friday, August 28, 2020 10:53
> To: user-zh@flink.apache.org 
> Subject: Re: flink prometheus 无法上报accumulator类型监控吗
>
> hi,有人回答下这个问题吗。
>
> 赵一旦  于2020年8月21日周五 下午4:20写道:
>
> > 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
> >
>


flink文档

2020-08-28 文章 Dream-底限
hi、
哪位大佬可以把flink官方文档中的函数部分完善一下啊,函数下面配个应用方式可好,看文档我都不知道下面函数是怎么用的,有的可以直接用有的sql解析不通过,还得一个一个测。。。
Temporal functionsDescription

DATE string

Returns a SQL date parsed from *string* in form of "-MM-dd".

TIME string

Returns a SQL time parsed from *string* in form of "HH:mm:ss".

TIMESTAMP string

Returns a SQL timestamp parsed from *string* in form of "-MM-dd
HH:mm:ss[.SSS]".

INTERVAL string range

Parses an interval *string* in the form "dd hh:mm:ss.fff" for SQL intervals
of milliseconds or "-mm" for SQL intervals of months. An interval range
might be DAY, MINUTE, DAY TO HOUR, or DAY TO SECOND for intervals of
milliseconds; YEAR or YEAR TO MONTH for intervals of months.

E.g., INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY, or INTERVAL
'2-10' YEAR TO MONTH return intervals.

CURRENT_DATE

Returns the current SQL date in the UTC time zone.

CURRENT_TIME

Returns the current SQL time in the UTC time zone.

CURRENT_TIMESTAMP

Returns the current SQL timestamp in the UTC time zone.

LOCALTIME

Returns the current SQL time in local time zone.

LOCALTIMESTAMP

Returns the current SQL timestamp in local time zone.


flink1.10 sink结果到mysql的问题

2020-08-28 文章 赵子仪
Hi:
  遇到的问题如下:
  
1.flink版本1.10.1,消费kafka数据后进行业务逻辑计算,并将结果sink到mysql,本地window开发环境可以正常消费kafka和sink结果到mysql,打包提交到集群,以Yarn的Per-job模式跑任务,无法写入结果到mysql
  
2.还是以上的环境,sql-client下提交任务,手动开启kafka生产者往对应主题写入数据,可以消费kafka数据,正常断开kafka生产者客户端后再重新开启,flink程序可以正常消费kafka数据;若kafka生产者客户端因某些异常超时断开连接,再次重启kafka生产者,flink程序不会去消费kafka生产者中的新的数据


多谢大家了
| |
赵子仪
|
|
zyzhao...@163.com
|
签名由网易邮箱大师定制

答复: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 范超
确实可以通过counter来实现[1]
相应的blog在[2]

[1] https://github.com/mbode/flink-prometheus-example
[2]https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html


-邮件原件-
发件人: Yun Tang [mailto:myas...@live.com] 
发送时间: 2020年8月28日 星期五 14:37
收件人: user-zh@flink.apache.org
主题: Re: flink prometheus 无法上报accumulator类型监控吗

Hi

没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters [1] 
这四种,如果你想要用累积型metrics,可以考虑counters

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types

祝好
唐云


From: 赵一旦 
Sent: Friday, August 28, 2020 10:53
To: user-zh@flink.apache.org 
Subject: Re: flink prometheus 无法上报accumulator类型监控吗

hi,有人回答下这个问题吗。

赵一旦  于2020年8月21日周五 下午4:20写道:

> 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
>


Re: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 赵一旦
现在问题是我业务代码中添加的统计指标,部分被prometheus拿到了,部分拿不到。

当前比如有个 latenessInMillisGauge 是可以拿到的,在flink中用guage实现的。
但是我用的Counter类的都没被监测到。
比如:timeAheadDroppedCounter = getRuntimeContext().getLongCounter(
"timeAheadDroppedCounter");
这种就没被找到。

Xiao Xu  于2020年8月28日周五 下午2:40写道:

> accumulator 是聚合后的指标, metics 里是底层的指标, 据我所知没有办法打到监控里面
>
> Yun Tang  于2020年8月28日周五 下午2:37写道:
>
> > Hi
> >
> > 没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters
> > [1] 这四种,如果你想要用累积型metrics,可以考虑counters
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types
> >
> > 祝好
> > 唐云
> >
> > 
> > From: 赵一旦 
> > Sent: Friday, August 28, 2020 10:53
> > To: user-zh@flink.apache.org 
> > Subject: Re: flink prometheus 无法上报accumulator类型监控吗
> >
> > hi,有人回答下这个问题吗。
> >
> > 赵一旦  于2020年8月21日周五 下午4:20写道:
> >
> > > 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
> > >
> >
>


Re: Re: 请问一下,flink 1.11 的cdc历史数据问题

2020-08-28 文章 dixingxin...@163.com
@Jark,感谢回复,很好的解答了我的疑惑


Best,
Xingxing Di
 
Sender: Jark Wu
Send Time: 2020-08-27 20:13
Receiver: user-zh
Subject: Re: Re: 请问一下,flink 1.11 的cdc历史数据问题
Hi,
 
debezium 是支持全量加载的。debezium 的一个亮点就是能够加载完存量数据以后能够无缝切换到 binlog 模式。
全量加载可以看下 SnapshotReader。
 
另外,全量数据导入到 kafka ,然后从 kafka 加载全量再切换到 mysql
binlog,这里面主要一个问题是很难做到无缝切换,因为不知道确切的 mysql binlog 位点。
 
Best,
Jark
 
On Tue, 25 Aug 2020 at 12:47, dixingxin...@163.com 
wrote:
 
> Hi:
> Leonard Xu 感谢回复
> > 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
> > 不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
>
> 这里恰好是我的疑问,之前看debezium代码,没有找到使用jdbc加载全量历史数据的代码,debezium的snapshot看起来只是保存了表的schema变更记录,这样重新消费binlog时,可以为每条binlog数据找到正确schema,达到正确解析历史数据的目的。
>
> 我的疑问是,如果加载全量历史数据,只是指定binlog的offset,从头读取binlog,那么是不是有可能无法加载到全量的数据,因为通常binlog是有过期时间的,不会保存全量的binlog。如果我理解的没问题,那么目前flink1.11
> 的cdc是无法加载全量历史数据的。
>
> 我理解加载全量数据,无非两种方式:
> 1.jdbc从源表全量拉取数据
> 2.将原表数据初始化到一个kafka
> topic中(topic设置为compact模式),再消费binlog,往这个topic里写入增量数据,确保这个topic的数据和原表一致,然后flink作业启动时,从这个topic的earliest
> offset消费,得到全量的历史数据。
>
> 不知道我的理解是否正确,希望能帮忙解答
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Leonard Xu
> 发送时间: 2020-08-25 10:03
> 收件人: user-zh
> 主题: Re: 请问一下,flink 1.11 的cdc历史数据问题
> Hello
>
> > Flink1.11 的cdc是支持加载历史数据的,有两个问题想求证一下:
> > 1.底层是使用了debezium来加载历史数据的吗?
> Flink支持两种CDC格式,debezium json和 canal json, debezium 和
> canal都是CDC系统,简单说他们可以把数据库表的binlog以对应的json写入到消息队列如Kafka,
> 作为下游系统的Flink 支持去消费对应的cdc数据,两个CDC工作都支持加载历史数据的。
> 另外Jark 在Veverica 开源了一个Flink CDC connector
> [1],支持利用debezuim直接读取数据库的cdc数据,不需要搭建CDC系统。
>
> > 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
> 不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
>
>
> Best
> Leonard
> [1] https://github.com/ververica/flink-cdc-connectors <
> https://github.com/ververica/flink-cdc-connectors>
>


flink1.11连接mysql问题

2020-08-28 文章 酷酷的浑蛋
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.




flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用



flink1.11时间函数

2020-08-28 文章 Dream-底限
hi

UNIX_TIMESTAMP()

NOW()

我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗


Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-28 文章 Leonard Xu
Hi

> 另外,HbaseTableSource 有没有计划什么时候支持 SupportsFilterPushDown.
我搜了下社区还没相关的issue,如果是强需求你可以去社区建个issue[1],让社区支持下
第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc Rui Li 
确认下 

祝好
Leonard
[1] https://issues.apache.org/jira/projects/FLINK/summary 

> 
> 关于"select * from hive_hbase_t1"的异常日志如下。
> 
> 
> Flink SQL> select * from hive_hbase_t1;
> 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
> 
> [] - HiveConf of name hive.vectorized.use.checked.expressions does not exist
> 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
> 
> [] - HiveConf of name hive.strict.checks.no.partition.filter does not exist
> 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
> 
> [] - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
> 2020-08-28 13:20:19,985 WARN  org.apache.hadoop.hive.conf.HiveConf
> 
> [] - HiveConf of name hive.vectorized.input.format.excludes does not exist
> 2020-08-28 13:20:19,986 WARN  org.apache.hadoop.hive.conf.HiveConf
> 
> [] - HiveConf of name hive.strict.checks.bucketing does not exist
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>   at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>   at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>   at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>   at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
>   at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
>   at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>   ... 6 more
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits
> caused an error: Unable to instantiate the hadoop input format
>   at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init>(ExecutionJobVertex.java:272)
>   at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
>   at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
>   at
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>   at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>   at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
>   at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
>   at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>   at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
>   at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
>   at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>   at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
>   ... 7 more
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to
> instantiate the hadoop input format
>   at
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:307)
>   at
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:282)
>   at
> 

Re: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 Xiao Xu
accumulator 是聚合后的指标, metics 里是底层的指标, 据我所知没有办法打到监控里面

Yun Tang  于2020年8月28日周五 下午2:37写道:

> Hi
>
> 没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters
> [1] 这四种,如果你想要用累积型metrics,可以考虑counters
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types
>
> 祝好
> 唐云
>
> 
> From: 赵一旦 
> Sent: Friday, August 28, 2020 10:53
> To: user-zh@flink.apache.org 
> Subject: Re: flink prometheus 无法上报accumulator类型监控吗
>
> hi,有人回答下这个问题吗。
>
> 赵一旦  于2020年8月21日周五 下午4:20写道:
>
> > 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
> >
>


Re: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 Yun Tang
Hi

没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters [1] 
这四种,如果你想要用累积型metrics,可以考虑counters

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types

祝好
唐云


From: 赵一旦 
Sent: Friday, August 28, 2020 10:53
To: user-zh@flink.apache.org 
Subject: Re: flink prometheus 无法上报accumulator类型监控吗

hi,有人回答下这个问题吗。

赵一旦  于2020年8月21日周五 下午4:20写道:

> 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
>


答复: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-28 文章 范超
谢谢云哥,可以了! 解决了我的大问题。


-邮件原件-
发件人: Yun Tang [mailto:myas...@live.com] 
发送时间: 2020年8月28日 星期五 13:58
收件人: user-zh 
主题: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

Hi 范超

虽然看不到你的图,但是你的启动命令错误了,所有的options应该放在jar包文件地址前面[1]

  1.  class name 应该在 jar包地址前面 [2]
  2.  savepoint/checkpoint 地址应该在jar包地址前面 [3]

没有正确从checkpoint恢复的原因应该是这个原因

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#usage
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

祝好
唐云


From: zilong xiao 
Sent: Friday, August 28, 2020 11:45
To: user-zh 
Subject: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

图挂了,用图床工具贴链接吧

范超  于2020年8月28日周五 上午11:37写道:

> Hi, 大家好
>
> Flink版本 1.10.0
>
>
>
> 目前程序的checkpoint使用rocksdb的方式存储在hdfs上,在sink失败的时候能够正常从上一个checkpoint点恢复。
>
> 问题是由于升级程序,我使用了命令行
>
> *bin/flink stop -p ${hdfsSavepointDir} -d $runningJobId -yid 
> $yarnAppId*
>
>
>
> 将savepoint文件保存,然后再使用保存的savepoint来启动程序
>
> */bin/flink run -d -m yarn-cluster -p ${parallelism} -yjm ${jm} -ytm 
> ${tm} $fullJarPath -s $savePointFullPath �Cc xxx*
>
>
>
> 比较无法理解的是,jm和tm日志都显示成功启动,但是无法看到从checkpoint恢复的记录如下图所示:
>
>
>
> 有知道的大佬知道是不是我哪里处理不正常么?
>


Re: tidb Binlog 整库同步到 hive

2020-08-28 文章 Rui Li
是的,我觉得需要自己实现一个sink,检测到新增的表时需要通过catalog去hive里创建一下。有点像hive的dynamic
partitioning,只不过写的是多张表而不是多个partition。

On Fri, Aug 28, 2020 at 2:08 PM Qishang  wrote:

> Hi Rui Li.
>
> > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了
> 这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配?
>
> Rui Li  于2020年8月28日周五 下午1:47写道:
>
> > Hi,
> >
> > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive
> > connector里现有的sink。
> >
> > On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu  wrote:
> >
> > > Hi
> > >
> > > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog
> > > create
> > > > table 是否可以在运行中来调用吗?
> > > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。
> > >
> > > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream
> 还是
> > > SQL 的拓扑。
> > >
> > > 祝好
> > > Leonard
> > >
> > >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: tidb Binlog 整库同步到 hive

2020-08-28 文章 Qishang
Hi Rui Li.

> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了
这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配?

Rui Li  于2020年8月28日周五 下午1:47写道:

> Hi,
>
> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive
> connector里现有的sink。
>
> On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu  wrote:
>
> > Hi
> >
> > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog
> > create
> > > table 是否可以在运行中来调用吗?
> > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。
> >
> > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream 还是
> > SQL 的拓扑。
> >
> > 祝好
> > Leonard
> >
> >
>
> --
> Best regards!
> Rui Li
>


答复: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-28 文章 范超
Hi 唐云哥,收到,我现在测试一下看看
感谢感谢


-邮件原件-
发件人: Yun Tang [mailto:myas...@live.com] 
发送时间: 2020年8月28日 星期五 13:58
收件人: user-zh 
主题: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

Hi 范超

虽然看不到你的图,但是你的启动命令错误了,所有的options应该放在jar包文件地址前面[1]

  1.  class name 应该在 jar包地址前面 [2]
  2.  savepoint/checkpoint 地址应该在jar包地址前面 [3]

没有正确从checkpoint恢复的原因应该是这个原因

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#usage
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

祝好
唐云


From: zilong xiao 
Sent: Friday, August 28, 2020 11:45
To: user-zh 
Subject: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

图挂了,用图床工具贴链接吧

范超  于2020年8月28日周五 上午11:37写道:

> Hi, 大家好
>
> Flink版本 1.10.0
>
>
>
> 目前程序的checkpoint使用rocksdb的方式存储在hdfs上,在sink失败的时候能够正常从上一个checkpoint点恢复。
>
> 问题是由于升级程序,我使用了命令行
>
> *bin/flink stop -p ${hdfsSavepointDir} -d $runningJobId -yid 
> $yarnAppId*
>
>
>
> 将savepoint文件保存,然后再使用保存的savepoint来启动程序
>
> */bin/flink run -d -m yarn-cluster -p ${parallelism} -yjm ${jm} -ytm 
> ${tm} $fullJarPath -s $savePointFullPath �Cc xxx*
>
>
>
> 比较无法理解的是,jm和tm日志都显示成功启动,但是无法看到从checkpoint恢复的记录如下图所示:
>
>
>
> 有知道的大佬知道是不是我哪里处理不正常么?
>