Re: Re: Unsubscribe

2024-03-31 by Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 戴少 
Sent: Monday, April 1, 2024 11:10
To: user-zh 
Cc: user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com 
;
 user-zh-subscribe ; user-zh 

Subject: Re: Unsubscribe

Unsubscribe

--

Best Regards,




 Original message 
| From | 李一飞 |
| Sent | 2024-03-14 00:09 |
| To |
user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com,
user-zh-subscribe ,
user-zh  |
| Subject | Unsubscribe |
Unsubscribe




Re: Unsubscribe

2024-03-31 by Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: zjw 
Sent: Monday, April 1, 2024 11:05
To: user-zh@flink.apache.org 
Subject: Unsubscribe




Re: Re:Re: Re: Custom data source in 1.19

2024-03-31 by Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 熊柱 <18428358...@163.com>
Sent: Monday, April 1, 2024 11:14
To: user-zh@flink.apache.org 
Subject: Re:Re: Re: Custom data source in 1.19

Unsubscribe

















On 2024-03-28 19:56:06, "Zhanghao Chen" wrote:
>If you are using the DataStream API, you can also check whether the newly added DataGen Connector [1] directly meets your test data generation needs.
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
>
>Best,
>Zhanghao Chen
>
>From: ha.fen...@aisino.com 
>Sent: Thursday, March 28, 2024 15:34
>To: user-zh 
>Subject: Re: Re: Custom data source in 1.19
>
>What I want to ask is: if I need to implement the Source interface, how should I write it? Is there a concrete example that implements a custom class generating data at a given rate?
>
>From: gongzhongqiang
>Sent: 2024-03-28 15:05
>To: user-zh
>Subject: Re: Custom data source in 1.19
>Hi,
>
>In Flink 1.19, SourceFunction is only marked as deprecated; it will be removed in a future release. So for your application, to stay compatible with future Flink
>versions, you can reimplement these SourceFunctions with the new Source interface.
>
>ha.fen...@aisino.com wrote on Thu, Mar 28, 2024 at 14:18:
>
>>
>> Previously I extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the old SourceFunction. How should I build a custom data source for generating test data now?
>>


Re: Re: Custom data source in 1.19

2024-03-28 by Zhanghao Chen
If you are using the DataStream API, you can also check whether the newly added DataGen Connector [1] directly meets your test data generation needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
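A minimal sketch of what that can look like, assuming Flink 1.19 with flink-connector-datagen on the classpath (the record type, rate, and count below are placeholders, not something from this thread):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Turn the generated sequence number into a test record.
        GeneratorFunction<Long, String> generator = index -> "test-record-" + index;

        // Emit up to 1,000,000 records, rate-limited to 100 records per second across all subtasks.
        DataGeneratorSource<String> source = new DataGeneratorSource<>(
                generator, 1_000_000L, RateLimiterStrategy.perSecond(100), Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen").print();
        env.execute("DataGen test source");
    }
}

The RateLimiterStrategy part also covers the "generate at a given rate" requirement without having to hand-write a Source implementation.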

Best,
Zhanghao Chen

From: ha.fen...@aisino.com 
Sent: Thursday, March 28, 2024 15:34
To: user-zh 
Subject: Re: Re: Custom data source in 1.19

What I want to ask is: if I need to implement the Source interface, how should I write it? Is there a concrete example that implements a custom class generating data at a given rate?

From: gongzhongqiang
Sent: 2024-03-28 15:05
To: user-zh
Subject: Re: Custom data source in 1.19
Hi,

In Flink 1.19, SourceFunction is only marked as deprecated; it will be removed in a future release. So for your application, to stay compatible with future Flink
versions, you can reimplement these SourceFunctions with the new Source interface.

ha.fen...@aisino.com wrote on Thu, Mar 28, 2024 at 14:18:

>
> Previously I extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the old SourceFunction. How should I build a custom data source for generating test data now?
>


Re: Parallelism and partition count when Flink writes to Kafka

2024-03-13 by Zhanghao Chen
Hi,

The strategy for writing to Kafka partitions depends on the Partitioner used by the Kafka Sink [1]. By default it uses Kafka's Default
Partitioner, which under the hood uses a so-called sticky partitioning strategy: records with a key are written to the partition chosen by hashing the key; records without a key
are written to a randomly chosen partition, which is then "stuck to" for a while to improve batching, and after the batch is flushed another random partition is chosen, striking a balance between batching efficiency and even distribution across partitions.
See [2] for details.

Therefore, with the default configuration the problem you describe (iterating over partitions and hurting batching) does not exist. Before you hit the per-partition write
bottleneck of Kafka, simply increasing the sink parallelism is usually enough to noticeably improve write throughput. In some special cases, however, for example when the parallelism is very high and the per-subtask write QPS
is extremely low, so that a single batching window contains only one or two records, batching becomes ineffective and you hit the Kafka write bottleneck; in that case reducing the parallelism may actually improve throughput, by improving batching and, combined with compression, reducing the traffic written to
Kafka.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#sink-partitioning
[2] https://www.cnblogs.com/huxi2b/p/12540092.html
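For the DataStream API, a rough sketch of where these knobs live on the KafkaSink; the bootstrap servers, topic, and producer properties are placeholders, and FlinkFixedPartitioner is shown only as an example of overriding the default sticky partitioning:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class KafkaSinkExample {
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                // Optional: pin each sink subtask to one partition instead of the
                                // default (sticky) partitioning described above.
                                .setPartitioner(new FlinkFixedPartitioner<>())
                                .build())
                // Producer-side batching/compression knobs mentioned above.
                .setProperty("linger.ms", "50")
                .setProperty("batch.size", "65536")
                .setProperty("compression.type", "lz4")
                .build();
    }
}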



From: chenyu_opensource 
Sent: Wednesday, March 13, 2024 15:25
To: user-zh@flink.apache.org 
Subject: Parallelism and partition count when Flink writes to Kafka

Hello:
 Flink writes data into Kafka (Kafka as the sink). When the number of partitions of the Kafka topic (set to 60) is smaller than the configured
parallelism (set to 300), do the tasks write to these partitions in a round-robin fashion, and does this affect write efficiency (i.e., is there overhead from iterating over the partitions)?
 In that case, if the number of topic partitions is increased (to 200, or directly to 300), will the write efficiency improve noticeably?

 Is there any related source code I could look at?
Looking forward to your reply. Best regards and thanks!





Re: Re: How can a Flink DataStream job obtain job lineage?

2024-03-08 by Zhanghao Chen
Actually, that is feasible. You can modify the source code of StreamExecutionEnvironment so that a customized
listener of yours is registered for every job by default, and then expose that information through whatever channel you like. In FLIP-314 [1], we plan to provide such an interface natively in Flink so that you can register your own
listener to obtain lineage information, but it has not been released yet, so for now you can build it yourself.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314:+Support+Customized+Job+Lineage+Listener
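Until FLIP-314 lands, a rough sketch of what the existing public hook looks like (the reporting call inside is a made-up placeholder for however you expose the information to your platform):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import javax.annotation.Nullable;

public class LineageListenerSetup {
    public static void attach(StreamExecutionEnvironment env) {
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
                if (jobClient != null) {
                    // Placeholder: ship the job id (plus whatever lineage you extracted) to your platform.
                    System.out.println("Submitted job " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(@Nullable JobExecutionResult result, @Nullable Throwable throwable) {
                // Called when the job finishes (for attached execution).
            }
        });
    }
}

If you patch StreamExecutionEnvironment itself as suggested above, the same registration can simply be done inside its constructor so that every job gets it by default.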

From: 阿华田 
Sent: Friday, March 8, 2024 18:47
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?

We want to modify the source code so that for any job submitted to our real-time platform, the lineage information is obtained when the DAG is initialized. Registering via StreamExecutionEnvironment can only be done inside the job code itself,
which does not meet our needs.




阿华田
a15733178...@163.com


On 2024-03-08 18:23, Zhanghao Chen wrote:
You can take a look at how OpenLineage integrates with Flink [1]: it registers a JobListener in the
StreamExecutionEnvironment (through which you can get the JobClient and hence the job id). Then the
transformation information can be extracted from the execution environment for processing [2].

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?



"The transformation information can be obtained from the JobGraph": can the JobGraph directly provide the transformation information? We currently use reflection on
SourceTransformation and SinkTransformation to get the connection information, but we cannot get the Flink job id there.
Can the JobGraph provide both the source/sink connection information and the Flink job id?
阿华田
a15733178...@163.com


On 2024-03-08 16:18, Zhanghao Chen wrote:
The JobGraph has a field that is exactly the job id.

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?

After obtaining the Source or DorisSink information, how can we tell which Flink job it comes from? It seems the Flink job id cannot be obtained there.


阿华田
a15733178...@163.com


On 2024-02-26 20:04, Feng Jin wrote:
The transformation information can be obtained from the JobGraph; you can get the concrete Source or Doris
Sink, and then use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes it to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Re: How can a Flink DataStream job obtain job lineage?

2024-03-08 by Zhanghao Chen
You can take a look at how OpenLineage integrates with Flink [1]: it registers a JobListener in the
StreamExecutionEnvironment (through which you can get the JobClient and hence the job id). Then the
transformation information can be extracted from the execution environment for processing [2].

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java
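As a hedged illustration of the second step, the transformations can be read from the environment via reflection on the internal "transformations" field; this relies on non-public internals, so it may break across Flink versions:

import java.lang.reflect.Field;
import java.util.List;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class TransformationExtractor {

    // Reads the internal list of transformations that were added to the environment.
    @SuppressWarnings("unchecked")
    public static List<Transformation<?>> extract(StreamExecutionEnvironment env) throws Exception {
        Field field = StreamExecutionEnvironment.class.getDeclaredField("transformations");
        field.setAccessible(true);
        return (List<Transformation<?>>) field.get(env);
    }
}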


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?



"The transformation information can be obtained from the JobGraph": can the JobGraph directly provide the transformation information? We currently use reflection on
SourceTransformation and SinkTransformation to get the connection information, but we cannot get the Flink job id there.
Can the JobGraph provide both the source/sink connection information and the Flink job id?
阿华田
a15733178...@163.com


On 2024-03-08 16:18, Zhanghao Chen wrote:
The JobGraph has a field that is exactly the job id.

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?

After obtaining the Source or DorisSink information, how can we tell which Flink job it comes from? It seems the Flink job id cannot be obtained there.


阿华田
a15733178...@163.com


On 2024-02-26 20:04, Feng Jin wrote:
The transformation information can be obtained from the JobGraph; you can get the concrete Source or Doris
Sink, and then use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes it to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Re: How can a Flink DataStream job obtain job lineage?

2024-03-08 by Zhanghao Chen
The JobGraph has a field that is exactly the job id.
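For example, a minimal sketch (jobGraph stands for whatever JobGraph instance you already obtained):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public final class JobGraphInspector {
    public static void printJobInfo(JobGraph jobGraph) {
        JobID jobId = jobGraph.getJobID();
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println(jobId + " contains vertex " + vertex.getName());
        }
    }
}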

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain job lineage?

After obtaining the Source or DorisSink information, how can we tell which Flink job it comes from? It seems the Flink job id cannot be obtained there.


阿华田
a15733178...@163.com


On 2024-02-26 20:04, Feng Jin wrote:
The transformation information can be obtained from the JobGraph; you can get the concrete Source or Doris
Sink, and then use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes it to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Can the JobGraph information be obtained from a Flink job's web URL?

2024-03-03 by Zhanghao Chen
To add to Yanquan's answer: what you get from /jobs/:jobid/plan is in fact the JobGraph information represented as JSON (generated by the
JsonPlanGenerator class, and containing most of the commonly used information in the JobGraph), which should meet your needs.
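A small sketch of fetching it over the REST API with plain Java; the host, port, and job id are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobPlanFetcher {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];  // the job id shown in the web UI
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/plan"))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The body is the JSON plan generated by JsonPlanGenerator.
        System.out.println(response.body());
    }
}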

From: casel.chen 
Sent: Saturday, March 2, 2024 14:17
To: user-zh@flink.apache.org 
Subject: Can the JobGraph information be obtained from a Flink job's web URL?

Can a running Flink job's JobGraph information be obtained via the web URL it exposes?


Re: Unsubscribe

2024-02-21 by Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the
user-zh@flink.apache.org
mailing list.

Best,
Zhanghao Chen

From: 曹明勤 
Sent: Thursday, February 22, 2024 9:42
To: user-zh@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Re: Unsubscribe

2023-11-25 by Zhanghao Chen
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe.

Best,
Zhanghao Chen

From: 唐凯 
Sent: Saturday, November 25, 2023 9:23
To: user-zh 
Subject: Unsubscribe

Unsubscribe






Re: High availability for Flink applications

2022-07-25 by Zhanghao Chen
For a cold-standby setup, an external job-management service can periodically take savepoints and copy them to the HDFS cluster of the other pipeline; when a failure occurs, simply restart the job on the other pipeline.
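As a hedged sketch of the "external job-management service" part, one option is to trigger savepoints periodically through the JobManager REST API (POST /jobs/:jobid/savepoints); the host, job id, and target directory below are placeholders, and copying the savepoint to the standby HDFS cluster (for example with distcp) is left out:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointTrigger {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://primary-jm:8081";   // placeholder address of the primary cluster's JM
        String jobId = args[0];                          // the job to snapshot
        String body = "{\"target-directory\": \"hdfs://dr-cluster/flink/savepoints\", \"cancel-job\": false}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManager + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The response contains a trigger id that can be polled under
        // /jobs/:jobid/savepoints/:triggerid until the savepoint completes.
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}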

Best,
Zhanghao Chen

From: andrew <15021959...@163.com>
Sent: Monday, July 25, 2022 10:05:39 PM
To: user-zh 
Subject: High availability for Flink applications

Dear Flink:
  Hello! We have a requirement: the jobs on our Flink real-time computing platform matter a lot to downstream users and must not fail. Our organization plans to build a disaster-recovery real-time big-data cluster (Kafka/YARN/HDFS) to deploy the same Flink jobs there, as a hot or cold standby.
 The downstream business systems do not have an active-active or hot-standby deployment. Our questions are:
   1.  When the primary cluster fails and we switch to the disaster-recovery cluster:
  many of the real-time applications carry intermediate state; once the primary cluster has a problem, how can the disaster-recovery cluster synchronize the latest state and continue the computation?
   2.  When the primary cluster recovers, how should the jobs now running on the disaster-recovery cluster migrate their data back?


For the requirement above, does the community have any reference cases we could use for testing and validation? Thanks!


Re: Flink exception

2022-07-24 by Zhanghao Chen
Hi, you can check the following:

  1.  whether there was an exception on the TM side that caused the TM to exit;
  2.  whether severe GC on the TM side prevented heartbeats from being handled in time;
  3.  whether a network problem between the JM and the TM prevented heartbeat messages from being delivered.

Best,
Zhanghao Chen

From: 陈卓宇 <2572805...@qq.com.INVALID>
Sent: Friday, July 22, 2022 11:30
To: user-zh 
Subject: Flink exception

Hello community experts, I have a question:
Flink version: 1.14.5
The exception is as follows:
2022-07-22 10:07:51
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

How should I fix this, and what can I tune?


Re: Re:Re: Can Flink SQL handle late data that would otherwise be dropped?

2022-07-16 by Zhanghao Chen
Hi, to unsubscribe, please send an email with any content to user-zh-unsubscr...@flink.apache.org

Best,
Zhanghao Chen

From: 孙福 
Sent: Saturday, July 16, 2022 23:01
To: user-zh@flink.apache.org 
Subject: Re:Re: Can Flink SQL handle late data that would otherwise be dropped?

Unsubscribe

















On 2022-07-15 15:06:51, "Zhizhao Shangguan" wrote:
>  Thanks, Zhanghao. I tested it this morning and the basic functionality works; I will verify this experimental feature further later on.
>
>On 2022/7/15 12:26 PM, "Zhanghao
>Chen" zhanghao.c...@outlook.com> wrote:
>
>
> Hi, you can take a look at the summary at https://www.mail-archive.com/issues@flink.apache.org/msg498605.html.
> Note that this feature is still experimental, so please use it with caution.
>
>Best,
>Zhanghao Chen
>
>From: Zhizhao Shangguan 
>Sent: Friday, July 15, 2022 10:44
>To: user-zh@flink.apache.org 
>Subject: Can Flink SQL handle late data that would otherwise be dropped?
>
>Hi,ALL:
>
>
>
>   A question: for data that arrives after the watermark has already passed it, I would still like to trigger the window computation (similar to the allowedLateness mechanism of the DataStream API). Can this be done in Flink
> SQL? If so, how should it be handled?
>
>
>
>Thanks♪(・ω・)ノ
>
>


Re: Can Flink SQL handle late data that would otherwise be dropped?

2022-07-14 by Zhanghao Chen
Hi, you can take a look at the summary at https://www.mail-archive.com/issues@flink.apache.org/msg498605.html.
Note that this feature is still experimental, so please use it with caution.
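For reference only, the experimental knobs discussed there can be set through the Table API configuration roughly as below; the exact keys are undocumented and are an assumption on my part, so please verify them against your Flink version before relying on them (in the SQL client the same keys would be set with SET 'key' = 'value'):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateFireSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();

        // Experimental, undocumented keys (assumed from the linked discussion):
        conf.setString("table.exec.emit.allow-lateness", "1 h");      // keep window state around for late data
        conf.setString("table.exec.emit.late-fire.enabled", "true");  // re-fire the window on late records
        conf.setString("table.exec.emit.late-fire.delay", "1 min");   // how often to re-fire
    }
}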

Best,
Zhanghao Chen

From: Zhizhao Shangguan 
Sent: Friday, July 15, 2022 10:44
To: user-zh@flink.apache.org 
Subject: Can Flink SQL handle late data that would otherwise be dropped?

Hi,ALL:



   A question: for data that arrives after the watermark has already passed it, I would still like to trigger the window computation (similar to the allowedLateness mechanism of the DataStream API). Can this be done in Flink
SQL? If so, how should it be handled?



Thanks♪(・ω・)ノ



Re: Job hits TooLongFrameException: Adjusted frame length exceeds

2022-07-03 by Zhanghao Chen
Hi, could you provide the complete error logs from the JM/TM side?

Best,
Zhanghao Chen

From: 谭家良 
Sent: Sunday, July 3, 2022 16:18
To: user-zh@flink.apache.org 
Subject: Job hits TooLongFrameException: Adjusted frame length exceeds

Version: flink-1.14
Job: Flink SQL, consuming from Kafka, with TVF window aggregation output
Problem: both the Flink JM and the TMs hit TooLongFrameException: Adjusted frame length exceeds
10485760: 369295622 - discarded

I have seen this problem occur frequently for some jobs, and would like to ask a few questions:
1. What causes this problem? Is it a communication issue between the JM and a TM, or between TMs?
2. Can this situation lead to data loss?
3. Since it occurs on both the JM and the TMs, is there a good way to resolve it?
谭家良
tanjl_w...@126.com


Re: Anomaly caused by manually deleting the job deployment with Flink K8s HA

2022-06-12 by Zhanghao Chen
1. For a Flink job using K8s-based HA to work correctly, you must not delete the job's deployment manually; it has to be stopped via the cancel or stop commands. Based on this, I guess Flink
K8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s point of view, cannot carry an ownerReference the way the job's svc does.

Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps do not have an ownerReference set. So if you want to restart the cluster while keeping
the HA data, simply deleting the deployment is fine; after the restart, the job recovers from the latest checkpoint.

2. With K8s-based HA, the Flink job id is always the same fixed value.

In Application mode with HA enabled, the Flink job id is always the same fixed value, regardless of whether K8s HA is used. The job id is the unique identifier of a job; the HA
service uses it to name and address the HA resources belonging to a single job (such as the persisted jobgraph and checkpoints). In Application mode the jobgraph is generated on the JM.
Without HA, a random job id is generated each time the jobgraph is created, serving as the job's unique identifier; with HA, a fixed job id has to be used
(that is where the all-zero job id comes from), because otherwise a JM failover would generate a new, different job id that could not be associated with the earlier checkpoints,
and the job would recover from a fresh state.

3. How does Flink K8s HA work, and what information does it store? I would like to study the implementation; if you have design documents or related material, please reply to this email. Thanks.

You can take a look at the official blog post:
https://flink.apache.org/2021/02/10/native-k8s-with-ha.html, and for more details refer to the design document:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink


Best,
Zhanghao Chen

From: m18814122325 
Sent: Sunday, June 12, 2022 22:45
To: user-zh@flink.apache.org 
Subject: Anomaly caused by manually deleting the job deployment with Flink K8s HA

Flink version: 1.15.0

deploy mode: Native k8s application




Symptom:

I deployed a Flink job with K8s-based HA in Native K8s mode. After I manually deleted the job's deployment, I found that the job's HA ConfigMaps still existed.
Furthermore, when I then started the job again without the -s parameter, the startup logs showed that it recovered from the information recorded in those ConfigMaps.




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME
 DATA   AGE

flink-bdra-sql-application-job--config-map  
2  13m

flink-bdra-sql-application-job-cluster-config-map   
 1  13m







I have the following questions:

1. For a Flink job using K8s-based HA to work correctly, you must not delete the job's deployment manually; it has to be stopped via the cancel or stop commands. Based on this, I guess Flink
K8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s point of view, cannot carry an ownerReference the way the job's svc does.

2. With K8s-based HA, the Flink job id is always the same fixed value.

3. How does Flink K8s HA work, and what information does it store? I would like to study the implementation; if you have design documents or related material, please reply to this email. Thanks.




Restart command (without the -s parameter, meaning the command itself does not restore from any checkpoint or savepoint):

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




After the restart, it automatically recovered from the ConfigMaps.

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO

Re: unsubscribe; Unsubscribe

2022-06-12 by Zhanghao Chen
Hi, to unsubscribe, please send a message with any content to user-zh-unsubscr...@flink.apache.org.

Best,

Zhanghao Chen

From: chenshu...@foxmail.com 
Sent: Sunday, June 12, 2022 11:44
To: user-zh 
Subject: unsubscribe; Unsubscribe

unsubscribe
Unsubscribe



chenshu...@foxmail.com


Re: Unaligned Checkpoint

2022-06-12 by Zhanghao Chen
Hi,

Unaligned checkpointing is a low-level feature. To use it, you only need to set the Flink option
execution.checkpointing.unaligned = true. In the SQL client, you can use the SET "key" =
"value" syntax to set Flink configuration values.

The main differences between unaligned and aligned checkpoints are:

  *   An unaligned checkpoint triggers the snapshot as soon as the first checkpoint barrier arrives in the input buffers
and forwards it directly downstream; the cost is that the snapshot must also record the in-flight data in the buffers to guarantee consistency, which produces more I/O and larger checkpoints.
  *   An aligned checkpoint triggers the snapshot only after the operator has received the last checkpoint barrier and barrier alignment is complete. During alignment, input channels that received their
barrier earlier are blocked, and under backpressure the blocking time grows significantly, slowing checkpoints down; the benefit is that thanks to barrier
alignment, the snapshot does not need to record the buffered in-flight data to guarantee consistency.
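A minimal sketch of the DataStream-side setup; for a SQL job submitted through the SQL client, the equivalent would be SET 'execution.checkpointing.interval' = '1 min'; followed by SET 'execution.checkpointing.unaligned' = 'true';:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing must be enabled first; unaligned checkpoints apply to exactly-once checkpoints.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);

        // ... build and execute the job as usual ...
    }
}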

Best,
Zhanghao Chen

From: 小昌同学 
Sent: Saturday, June 11, 2022 17:18
To: user-zh@flink.apache.org 
Subject: Unaligned Checkpoint

Could you explain how Unaligned Checkpoint is implemented? I have read quite a few docs but still don't fully understand it. If I want to use it in SQL, how should I configure it? Please advise.


小昌同学
ccc0606fight...@163.com


Re: Re: Re: How to fix the log message "The state is cleared because of state ttl. This will result in incorrect result"?

2022-04-20 by Zhanghao Chen
Which Flink cluster deployment mode are you using to submit the SQL job?

Best,
Zhanghao Chen

From: 段晓雄 
Sent: Saturday, April 16, 2022 19:52
To: user-zh@flink.apache.org 
Subject: RE: Re: Re: How to fix the log message "The state is cleared because of state ttl. This will
result in incorrect result"?

Zhanghao,

Thanks for the help! I set table.exec.state.ttl =
12960 in sql-client.sh, but now I don't know how to confirm whether it actually took effect. I could not find the
state TTL value in the job status or checkpoint status returned by the web UI or the REST API, and I see the job state keeps growing. How can I confirm the state TTL situation of the job?


On 2022/04/12 04:15:37 Zhanghao Chen wrote:
> You can do this with the SQL client syntax SET 'table.exec.state.ttl' = 'xxx'; for details see:
>
>
>   1.  
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#running-sql-queries
> SQL Client | Apache 
> Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#running-sql-queries>
> The SET command allows you to tune the job execution and the sql client 
> behaviour. See SQL Client Configuration below for more details.. After a 
> query is defined, it can be submitted to the cluster as a long-running, 
> detached Flink job. The configuration section explains how to declare table 
> sources for reading data, how to declare table sinks for writing data, and 
> how to configure other table ...
> nightlies.apache.org
>   2.
>   3.  
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/#table-exec-state-ttl
> Configuration | Apache 
> Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/#table-exec-state-ttl>
> Configuration # By default, the Table & SQL API is preconfigured for 
> producing accurate results with acceptable performance. Depending on the 
> requirements of a table program, it might be necessary to adjust certain 
> parameters for optimization. For example, unbounded streaming programs may 
> need to ensure that the required state size is capped (see streaming 
> concepts).
> nightlies.apache.org
>
>
> Best,
> Zhanghao Chen
> 
> From: 段晓雄 
> Sent: Monday, April 11, 2022 20:23
> To: user-zh@flink.apache.org 
> Subject: RE: Re: How to fix the log message "The state is cleared because of state ttl. This will
> result in incorrect result"?
>
> For a job created by executing SQL via sql-client.sh, how do I set the TTL duration?
>
> On 2022/04/11 11:14:36 yidan zhao wrote:
> > You can increase the state ttl to avoid this.
> > The approach is already clear: increase the TTL duration.
> >
> > 段晓雄 wrote on Mon, Apr 11, 2022 at 09:52:
> > >
> > > Hi all,
> > >
> > > We are running a Flink 1.14.4 cluster, doing streaming processing with SQL via PyFlink.
> > >
> > > The taskmanager logs are full of: The state is cleared because of state ttl. This will
> > > result in incorrect result. You can increase the state ttl to avoid this.
> > > Why does this appear, and how can it be resolved?
> > >
> > >
> > > 2022-04-09 17:08:54,672 INFO  
> > > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> > >  [] - Committing the state for checkpoint 284
> > > 2022-04-09 17:08:54,672 INFO  
> > > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> > >  [] - Committing the state for checkpoint 284
> > > 2022-04-09 17:08:54,852 INFO  
> > > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > > The state is cleared because of state ttl. This will result in incorrect 
> > > result. You can increase the state ttl to avoid this.
> > > 2022-04-09 17:08:54,852 INFO  
> > > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > > The state is cleared because of state ttl. This will result in incorrect 
> > > result. You can increase the state ttl to avoid this.
> > > 2022-04-09 17:08:54,852 INFO  
> > > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > > The state is cleared because of state ttl. This will result in incorrect 
> > > result. You can increase the state ttl to avoid this.
> > > 2022-04-09 17:08:54,922 INFO  
> > > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> > >  [] - Committing the state for checkpoint 277
> > > 2022-04-09 17:08:54,922 INFO  
> > > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> > >  [] - Committing the state for checkpoint 277
> > > 2022-04-09 17:08:54,952 INFO  
> > > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > > The state is cleared because of state ttl. This will result in incorrect 
> > > result. You can increase the state ttl to avoid this.
> > > 2022-04-09 17:08:54,952 INFO  

Re: Flink command-line parameters not taking effect

2022-04-11 by Zhanghao Chen
The most relevant FLIP and JIRA issues I know of are the following two:

1. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
FLIP-73: Introducing Executors for job submission - Apache Flink - The Apache 
Software 
Foundation<https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission>
Status. Current state: Accepted. Discussion thread: 
https://lists.apache.org/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache
 ...
cwiki.apache.org

2. https://issues.apache.org/jira/browse/FLINK-15179 - in particular, the comment section of this JIRA is worth a look


Best,
Zhanghao Chen

From: QQ <1139872...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 9:19
To: user-zh@flink.apache.org 
Subject: Re: Flink command-line parameters not taking effect

Thanks a lot for the explanation. Where can I find the description or FLIP for this new unified set of CLI commands?

> On Apr 11, 2022, at 11:49 PM, Zhanghao Chen wrote:
>
> Hi, using -m together with the -yxx options is the early CLI usage for Flink on YARN. Later the community started promoting a new, unified set of CLI commands that use -t
> to specify the deployment target and turn the old CLI options into dynamic configuration options; for example, replacement dynamic options for all of the old -yxx flags can be found at
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn
> Configuration | Apache 
> Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn>
> Key Default Type Description; restart-strategy.fixed-delay.attempts: 1: 
> Integer: The number of times that Flink retries the execution before the job 
> is declared as failed if restart-strategy has been set to fixed-delay.: 
> restart-strategy.fixed-delay.delay
> nightlies.apache.org
>
>
> Best,
> Zhanghao Chen
> 
> From: gangzi <1139872...@qq.com.INVALID>
> Sent: Monday, April 11, 2022 19:55
> To: user-zh 
> Subject: Flink command-line parameters not taking effect
>
> I submitted a job with: flink run -t yarn-per-job -ynm SocketWordCount -yqu root.root
> -d -n SocketWindowWordCount.jar --hostname 10.199.0.97 --port 9878.
> After the job was submitted successfully, I found that -ynm and -yqu did not take effect. Reading the source code, I found the cause: if
> -t is specified, none of the -y options take effect, because the -y options are parsed in FlinkYarnSessionCli, while in the source code:
> public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
>     LOG.debug("Custom commandlines: {}", customCommandLines);
>     for (CustomCommandLine cli : customCommandLines) {
>         LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
>         if (cli.isActive(commandLine)) {
>             return cli;
>         }
>     }
>     throw new IllegalStateException("No valid command-line found.");
> }
> this method returns the GenericCLI. As a result, the later code:
> final Configuration effectiveConfiguration =
>         getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
> returns a command-line configuration that only contains the options defined by GenericCLI. I would like to ask: what is the difference between setting options with -t and with -m? How can I fix the problem of these options not taking effect? Is this a bug?



Re: Re: How to fix the log message "The state is cleared because of state ttl. This will result in incorrect result"?

2022-04-11 by Zhanghao Chen
You can do this with the SQL client syntax SET 'table.exec.state.ttl' = 'xxx'; for details see:


  1.  
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#running-sql-queries
SQL Client | Apache 
Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#running-sql-queries>
The SET command allows you to tune the job execution and the sql client 
behaviour. See SQL Client Configuration below for more details.. After a query 
is defined, it can be submitted to the cluster as a long-running, detached 
Flink job. The configuration section explains how to declare table sources for 
reading data, how to declare table sinks for writing data, and how to configure 
other table ...
nightlies.apache.org
  2.
  3.  
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/#table-exec-state-ttl
Configuration | Apache 
Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/#table-exec-state-ttl>
Configuration # By default, the Table & SQL API is preconfigured for producing 
accurate results with acceptable performance. Depending on the requirements of 
a table program, it might be necessary to adjust certain parameters for 
optimization. For example, unbounded streaming programs may need to ensure that 
the required state size is capped (see streaming concepts).
nightlies.apache.org
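For completeness, the same TTL can also be set through the Table API configuration; a rough sketch (the 36-hour value is only a placeholder, and in the SQL client you can run SET; without arguments to list the properties that are currently set):

import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Equivalent to SET 'table.exec.state.ttl' = '...' in the SQL client.
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(36));
    }
}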


Best,
Zhanghao Chen

From: 段晓雄 
Sent: Monday, April 11, 2022 20:23
To: user-zh@flink.apache.org 
Subject: RE: Re: How to fix the log message "The state is cleared because of state ttl. This will
result in incorrect result"?

For a job created by executing SQL via sql-client.sh, how do I set the TTL duration?

On 2022/04/11 11:14:36 yidan zhao wrote:
> You can increase the state ttl to avoid this.
> The approach is already clear: increase the TTL duration.
>
> 段晓雄 wrote on Mon, Apr 11, 2022 at 09:52:
> >
> > Hi all,
> >
> > We are running a Flink 1.14.4 cluster, doing streaming processing with SQL via PyFlink.
> >
> > The taskmanager logs are full of: The state is cleared because of state ttl. This will result
> > in incorrect result. You can increase the state ttl to avoid this.
> > Why does this appear, and how can it be resolved?
> >
> >
> > 2022-04-09 17:08:54,672 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 284
> > 2022-04-09 17:08:54,672 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 284
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,922 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 277
> > 2022-04-09 17:08:54,922 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 277
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because 

Re: Custom UDF cannot be loaded when executing Flink SQL

2022-04-11 by Zhanghao Chen
Hi, could you paste the exact submission command used by the client?

Best,
Zhanghao Chen

From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh 
Subject: Custom UDF cannot be loaded when executing Flink SQL

Environment information

flink-1.13.6_scala_2.11
java 1.8

We use the standalone session cluster mode: node01 is the jobmanager, node02 and node03 are taskmanagers.

UDF code
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {

public String eval(String s, Integer start,Integer end) {
return s.substring(start,end);
}
}

The UDF jars are stored on HDFS. Each time the client submits SQL, it loads the list of UDF jars from HDFS through a class loader and sets pipeline.jars to the list of HDFS paths of the UDF
jars. Executing the following SQL fails with the error below:

insert into output_2455_5070_model_1649729386269 select tablekeymd5(user_id) as 
mm ,proctime(),MD5(CONCAT_WS(CAST(user_id AS STRING))) from (select distinct id 
as id, user_id as user_id, status as status from (select id,user_id,status from 
data_2455_5068_model) where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)

Class loader code:
public static void loadJar(URL jarUrl) {
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", 
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// get the current accessibility of the method
boolean accessible = method.isAccessible();
try {
//make the method accessible
if (!accessible) {
method.setAccessible(true);
}
// get the system class loader
URLClassLoader classLoader = (URLClassLoader) 
ClassLoader.getSystemClassLoader();
//add the jar path to the system class loader's URL path
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
/**
 * If a factory already exists, wrap it with a decorator that holds both the original factory and the factory used for reading from HDFS, and use them as needed.
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory 
fsUrlStreamHandlerFactory)
throws Exception {
log.info("registerFactory : " + 
fsUrlStreamHandlerFactory.getClass().getName());
final Field factoryField = URL.class.getDeclaredField("factory");
factoryField.setAccessible(true);
final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
lockField.setAccessible(true);
synchronized (lockField.get(null)) {
final URLStreamHandlerFactory originalUrlStreamHandlerFactory = 
(URLStreamHandlerFactory) factoryField.get(null);
factoryField.set(null, null);
URL.setURLStreamHandlerFactory(protocol -> {
if ("hdfs".equals(protocol)) {
return 
fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
} else {
return 
o

Re: Flink command-line parameters not taking effect

2022-04-11 by Zhanghao Chen
Hi, using -m together with the -yxx options is the early CLI usage for Flink on YARN. Later the community started promoting a new, unified set of CLI commands that use -t
to specify the deployment target and turn the old CLI options into dynamic configuration options; for example, replacement dynamic options for all of the old -yxx flags can be found at
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn
Configuration | Apache 
Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn>
Key Default Type Description; restart-strategy.fixed-delay.attempts: 1: 
Integer: The number of times that Flink retries the execution before the job is 
declared as failed if restart-strategy has been set to fixed-delay.: 
restart-strategy.fixed-delay.delay
nightlies.apache.org


Best,
Zhanghao Chen

From: gangzi <1139872...@qq.com.INVALID>
Sent: Monday, April 11, 2022 19:55
To: user-zh 
Subject: Flink command-line parameters not taking effect

I submitted a job with: flink run -t yarn-per-job -ynm SocketWordCount -yqu root.root -d
-n SocketWindowWordCount.jar --hostname 10.199.0.97 --port 9878.
After the job was submitted successfully, I found that -ynm and -yqu did not take effect. Reading the source code, I found the cause: if
-t is specified, none of the -y options take effect, because the -y options are parsed in FlinkYarnSessionCli, while in the source code:
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
This method returns the GenericCLI. As a result, the later code:
final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
returns a command-line configuration that only contains the options defined by GenericCLI. I would like to ask: what is the difference between setting options with -t and with -m? How can I fix the problem of these options not taking effect? Is this a bug?


Re: What is the address printed after creating a flink1.13.6 k8s session cluster for?

2022-04-08 by Zhanghao Chen
The main difference between the Standalone K8s and Native K8s deployment modes is that in Native K8s mode, Flink can talk directly to the K8s API server
to request the resources it needs and to observe the cluster state, whereas Standalone K8s has no direct awareness of the underlying K8s cluster. This leads to two main differences:


  1.  For deployment, Standalone K8s requires you to manually create the deployments, configmaps, and services the cluster needs, while with Native
K8s you only need to invoke the Flink CLI.
  2.  For resource allocation, Standalone K8s uses passive resource management: you or some external system must allocate the resources, and Flink
passively accepts them. Native K8s uses active resource management: after the Flink cluster starts, it requests the resources it needs from K8s by itself, based on the properties of the submitted jobs.

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Friday, April 8, 2022 10:52
To: user-zh 
Subject: Re: What is the address printed after creating a flink1.13.6 k8s session cluster for?

It seems the official website has two entry points for Flink on K8s:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode
and
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/.

They correspond to Resource Providers/Standalone/Kubernetes and Resource
Providers/Native Kubernetes respectively. Does anyone know the difference? From the docs, it looks like the former gives you the concrete service/deployment yml descriptions and you create the cluster yourself, while the latter creates it with a single script. But if that were the only difference, why distinguish between "standalone/kubernetes" and "native
kubernetes"?

>
> The cluster was set up on 3 physical machines, not minikube.
> I'm not sure whether it is related to the network interface; there were already network issues when setting it up with init, since K8s decides the listening address based on the IP of the default-route NIC.
> But I feel that shouldn't matter in this scenario: since it is a ClusterIP service, the message printed after creation should point to the ClusterIP, so why does it print the host's NIC IP instead?
>
> yidan zhao wrote on Fri, Apr 8, 2022 at 10:38:
> >
> > Below is the output of describe svc my-first-flink-cluster-rest:
> > Name: my-first-flink-cluster-rest
> > Namespace:default
> > Labels:   app=my-first-flink-cluster
> >   type=flink-native-kubernetes
> > Annotations:  
> > Selector:
> > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> > Type: LoadBalancer
> > IP Family Policy: SingleStack
> > IP Families:  IPv4
> > IP:   192.168.127.57
> > IPs:  192.168.127.57
> > Port: rest  8081/TCP
> > TargetPort:   8081/TCP
> > NodePort: rest  31419/TCP
> > Endpoints:192.168.130.11:8081
> > Session Affinity: None
> > External Traffic Policy:  Cluster
> > Events:   
> >
> > As above, the IP is 192.168.127.57, which is the ClusterIP and is reachable. What I don't understand is why the address printed after creation is not this one, and why the address looked up via
> > -Dkubernetes.cluster-id=my-first-flink-cluster is not the 192.x one either, so submitting jobs and so on does not work.
> >
> > yu'an huang wrote on Fri, Apr 8, 2022 at 02:11:
> > >
> > > In theory a ClusterIP cannot be reached from outside the cluster. How was your Kubernetes environment set up? Minikube?
> > >
> > > If convenient, could you share the output of running this command?
> > > kubectl describe svc my-first-flink-cluster-rest
> > >
> > >
> > >
> > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > > wrote:
> > > >
> > > > What is your kubernetes.rest-service.exposed.type set to?
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: yidan zhao 
> > > > Sent: Thursday, April 7, 2022 11:41
> > > > To: user-zh 
> > > > Subject: What is the address printed after creating a flink1.13.6 k8s session cluster for?
> > > >
> > > > Following https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > 基于命令创建k8s flink session集群(flink1.13.6):./bin/kubernetes-session.sh
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster 。创建成功,最后提示一句 Create
> > > > flink session cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://10.227.137.154:8081。但是这个地址访问不通。
> > > >
> > > > 并且通过如下命令提交任务也一样,会检索到如上地址,然后提交任务不成功。
> > > > ./bin/flink run \
> > > >--target kubernetes-session \
> > > >-Dkubernetes.cluster-id=my-first-flink-cluster \
> > > >./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- 然后如下方式是可以的,不清楚是啥问题呢。
> > > > 1 通过 kubectl get svc 拿到 my-first-flink-cluster-rest
> > > > 的clusterIp:port为192.168.127.57:8081。
> > > > 2 查看任务
> > > > flink list  -m 192.168.127.57:8081
> > > > 3 提交任务
> > > > flink run  -m 192.168.127.57:8081
> > > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- 区别:192这个是clusterIp虚拟ip。  10.x那个是我机器ip。
> > >


Re: What is the address printed after creating a flink1.13.6 k8s session cluster for?

2022-04-07 by Zhanghao Chen
What is your kubernetes.rest-service.exposed.type set to?

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Thursday, April 7, 2022 11:41
To: user-zh 
Subject: What is the address printed after creating a flink1.13.6 k8s session cluster for?

Following
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes

I created a K8s Flink session cluster (flink1.13.6) with: ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=my-first-flink-cluster . It was created successfully, and at the end it printed: Create
flink session cluster my-first-flink-cluster successfully, JobManager
Web Interface: http://10.227.137.154:8081. But this address is not reachable.

Submitting a job with the following command behaves the same way: it looks up the address above, and the submission fails.
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar

--- However, the following approach works; I'm not sure what the problem is.
1. Use kubectl get svc to get the ClusterIP:port of my-first-flink-cluster-rest,
which is 192.168.127.57:8081.
2. List jobs:
flink list  -m 192.168.127.57:8081
3. Submit a job:
flink run  -m 192.168.127.57:8081
/home/work/flink/examples/streaming/TopSpeedWindowing.jar

--- The difference: 192.x is the ClusterIP (a virtual IP); 10.x is my machine's IP.


Re: flink 1.15

2022-04-02 by Zhanghao Chen
No. MVP is short for Minimum Viable Product, i.e., a version that implements only the core functionality and is then improved further based on feedback from early users.

Best,
Zhanghao Chen

From: guanyq 
Sent: Saturday, April 2, 2022 14:56
To: user-zh@flink.apache.org 
Subject: flink 1.15

I watched the FFA (Flink Forward Asia) talk on unified stream and batch processing: Flink 1.15 introduces an MVP version of dynamic table storage for stream-batch unification.


Is the MVP version a paid edition?