Re: Flink operator HA job intermittently fails with "unable to update ConfigMapLock"

2024-03-20 Post by Yang Wang
This is usually caused by a transient APIServer problem that makes a single ConfigMap renew-lease-annotation operation fail; Flink retries it by default.

If you find that this SocketTimeoutException actually causes the job to fail over, you can increase the following two settings:
high-availability.kubernetes.leader-election.lease-duration: 60s
high-availability.kubernetes.leader-election.renew-deadline: 60s


Best,
Yang

On Tue, Mar 12, 2024 at 11:38 AM kellygeorg...@163.com <
kellygeorg...@163.com> wrote:

> Could any expert offer some guidance? Waiting online.
>
>
>
>  Original message forwarded 
> | From | kellygeorg...@163.com |
> | Date | 2024-03-11 20:29 |
> | To | user-zh |
> | Cc | |
> | Subject | Flink operator HA job intermittently fails with "unable to update ConfigMapLock" |
> The jobmanager error is shown below. What is the cause?
> Exception occurred while renewing lock: Unable to update ConfigMapLock
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Operation: [replace] for kind: [ConfigMap] with name: [flink task
> xx- configmap] in namespace: [default]
>
>
> Caused by: java.net.SocketTimeoutException: timeout


Re: How can a Flink real-time compute platform, running jobs on k8s in Application mode, sync job status back to the platform in real time?

2023-04-11 Post by Yang Wang
You can obtain the final status of a job through the JobResultStore[1]; flink-kubernetes-operator retrieves it the same way.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
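
As a minimal sketch of what that looks like in practice: with delete-on-commit
disabled, each finished job leaves a small JSON result entry under the
configured storage path, which an external platform can poll. The storage path
and the exact file naming below are illustrative assumptions, not the
guaranteed layout; check the files your cluster actually writes.

# flink-conf.yaml (sketch)
job-result-store.storage-path: hdfs:///flink/job-result-store/my-cluster
job-result-store.delete-on-commit: false

# After the job reaches a globally terminal state, read its result entry
# (roughly one JSON file per job id; naming is an assumption):
hdfs dfs -cat /flink/job-result-store/my-cluster/<job-id>.json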

Best,
Yang

Weihua Hu  wrote on Wed, Mar 22, 2023 at 10:27:

> Hi
>
> Our first internal version used the cluster-id as the unique identifier of
> an application, and assumed streaming jobs are long-running and should not
> exit on their own. If the cluster-id can no longer be found in Kubernetes,
> the job has exited abnormally, and we mark it as failed. Later we built a
> dedicated operator + CRD to manage the pod lifecycle and keep pods from
> exiting too quickly.
>
> In addition, job status can also be inspected by enabling the history server [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/historyserver/
>
> Best,
> Weihua
>
>
> On Wed, Mar 22, 2023 at 9:42 AM Shammon FY  wrote:
>
> > Hi
> >
> > You can run a background task in your submission platform that
> > periodically queries k8s for the job status. Flink is also designing job
> > status reporting [1], which is currently under discussion.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Status+Listener
> >
> > Best,
> > Shammon FY
> >
> >
> > On Wed, Mar 22, 2023 at 8:54 AM casel.chen  wrote:
> >
> > >
> >
> > > How can a Flink real-time compute platform running jobs on k8s in
> > > Application mode sync job status back to the platform in real time?
> > > Once a job crashes, k8s reclaims its pods, so the status can no longer
> > > be fetched via the web url. Is there another way, e.g. metrics? If so,
> > > which metric exactly?
> >
>


Re: The Prometheus instance monitoring Flink frequently OOMs

2023-04-11 Post by Yang Wang
You can configure metric_relabel_configs[1] on the Prometheus server to control which metrics are scraped.

[1].
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
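
For illustration, a keep-style relabel rule like the following drops every
series except an explicit whitelist. The target address and metric names in
the regex are placeholders for whatever your dashboards actually chart:

scrape_configs:
  - job_name: flink
    scrape_interval: 1m
    static_configs:
      - targets: ['jobmanager-host:9249']
    metric_relabel_configs:
      # Keep only whitelisted metric names; everything else is dropped at scrape time
      - source_labels: [__name__]
        regex: 'flink_jobmanager_numRunningJobs|flink_taskmanager_job_task_numRecordsInPerSecond'
        action: keep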

Best,
Yang

casel.chen  wrote on Wed, Mar 22, 2023 at 13:47:

> A correction: we switched from pushgateway to having Prometheus scrape
> directly, with the scrape interval set to 1 minute. The pushgateway setup
> kept crashing the pushgateway, and the pull setup now crashes Prometheus
> just the same. Is there anything to configure on the Flink job side? Can
> Prometheus be configured to scrape only the metrics that the grafana
> dashboards actually display?
>
> casel.chen  wrote on 2023-03-22 12:08:29:
>
> >We report metrics to Prometheus via pushgateway with a 30-second reporting
> interval. The platform runs 200+ jobs, and even a 50 GB Prometheus instance
> cannot keep up. Retention is 1 day, with metrics held in memory for 2 hours
> before being written to disk. The largest single metric already has 370,000
> entries. Is there any solution? Can we choose which metrics get reported?
>


Re: How to listen for job status changes after a Flink job is submitted and running?

2022-11-23 Post by Yang Wang
You can follow the approach used in the Flink Kubernetes
Operator: set the execution.shutdown-on-application-finish option to false,
poll the Flink REST API for the job status, and actively shut down the Application cluster once the job has finished.
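
A minimal polling sketch against the REST API (hostname and the jq filter are
illustrative):

# With execution.shutdown-on-application-finish: false, the cluster stays up
# after the job finishes, so this endpoint remains reachable:
curl -s http://<jobmanager-host>:8081/jobs/overview | jq '.jobs[] | {jid, name, state}'
# Once the state is FINISHED / FAILED / CANCELED, tear down the Application
# cluster yourself, e.g. by deleting its Kubernetes deployment.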

Best,
Yang

JasonLee <17610775...@163.com> wrote on Thu, Nov 24, 2022 at 09:59:

> Hi
>
>
> You can get the job status through Flink metrics and the Yarn API (if the job is submitted to yarn).
>
>
> Best
> JasonLee
>
>
>  Original message forwarded 
> | From | casel.chen |
> | Sent | 2022-11-23 08:32 |
> | To | user-zh@flink.apache.org |
> | Subject | How to listen for job status changes after a Flink job is submitted and running? |
>
> After a flink job is submitted and running, how can we listen for job
> status changes so the console can display them in real time? We currently
> poll, which is inefficient; is there a listener that can be registered?


Re: Optimize ApplicationDeployer API design

2022-11-23 Post by Yang Wang
Just a kind reminder: your attached images do not show properly.

Given that *ApplicationDeployer* is not only used for Yarn application
mode but also for native Kubernetes, I am not sure which way you are
referring to for returning the applicationId.
We already print the applicationId in the client logs. Right?

Best,
Yang

melin li  wrote on Wed, Nov 23, 2022 at 23:46:

> The task is submitted through the ApplicationDeployer API, and the run is
> synchronous, waiting for the submission to complete. If the task is
> submitted to yarn, it is probably only ACCEPTED and the yarn applicationId
> has not been obtained at that point, which makes it difficult to cancel the
> task. I recommend the org.apache.spark.launcher.SparkLauncher design:
> submit tasks asynchronously so the applicationId can be obtained as soon as
> possible; to delete the task ahead of time, run yarn application -kill XXX directly.
>
> [image: image.png]
> [image: image.png]
>


Re: When a batch job finishes, the flink-k8s-operator CRD status is unclear

2022-10-25 Post by Yang Wang
Since 1.15, the JobManager is no longer deleted proactively when the job finishes, so the Kubernetes Operator can query the job status normally and update it.

Best,
Yang

¥¥¥  wrote on Tue, Oct 25, 2022 at 15:58:

> Unsubscribe
>
>
>
>
> -- Original message --
> From:
>   "user-zh"
>
>  Sent: Tuesday, Oct 25, 2022, 15:33
> To: "user-zh"
> Subject: When a batch job finishes, the flink-k8s-operator CRD status is unclear
>
>
>
> hi,
> I am deploying batch jobs with flink-k8s-operator. I found that when a
> batch job finishes, the status of the flink-k8s-operator FlinkDeployment
> CRD changes: jobManagerDeploymentStatus becomes "missing" and "error"
> becomes "Missing JobManager deployment". I assume this happens because
> native-k8s automatically deletes the JobManager Deployment once the batch
> job completes. How should I judge from the CRD status that the batch job
> has finished running? If the state in jobStatus could be "finished", it
> would be very easy to tell.
>
> 
> status:
>   clusterInfo:
>     flink-revision: a921a4d @ 2022-09-09T10:18:38+02:00
>     flink-version: 1.14.6
>   error: Missing JobManager deployment
>   jobManagerDeploymentStatus: MISSING
>   jobStatus:
>     jobId: 3c5807b038300f46154d72c58f074715
>     jobName: batch-job-lab-o8yln9
>     savepointInfo:
>       lastPeriodicSavepointTimestamp: 0
>       savepointHistory: []
>       triggerId: ''
>       triggerTimestamp: 0
>       triggerType: UNKNOWN
>     startTime: '181370751'
>     state: RECONCILING
>     updateTime: '181379021'
>
> 


Re: Cannot connect to an HA HDFS cluster in native k8s deployment mode

2022-09-21 Post by Yang Wang
Set the HADOOP_CONF_DIR environment variable on the Flink client side before submitting the job,

then run the flink run-application command.
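
A sketch of what that looks like, assuming the directory holds the
core-site.xml/hdfs-site.xml that define the "datacluster" nameservice (the
client ships this configuration to the JM/TM pods via a ConfigMap); the path
is a placeholder:

# Must point at the Hadoop config directory on the client machine
export HADOOP_CONF_DIR=/path/to/hadoop-conf

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=test \
    -Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
    local:///opt/flink/usrlib/test.jar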

Best,
Yang

yanfei lei  wrote on Thu, Sep 22, 2022 at 11:04:

> Hi Tino,
> Judging from org.apache.flink.core.fs.FileSystem.java
> <
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L361-L371
> >
>
> Flink parses fs.default-scheme directly as a URI and does not resolve the related xml configuration, so it looks like Flink does not currently support an HA HDFS cluster here.
>
> Best,
> Yanfei
>
Xuyang  wrote on Wed, Sep 21, 2022 at 23:28:
>
> > Hi, I'm not very familiar with HA HDFS deployments, but the stack trace
> > shows the failure comes from an unresolvable hostname:
> > Caused by: java.lang.IllegalArgumentException:
> > java.net.UnknownHostException: datacluster
> > Maybe try changing it to one of the following:
> > 1. hdfs://datacluster:port (e.g. hdfs://datacluster:8080)
> >
> > 2.  hdfs:///datacluster (three slashes)
> >
> >
> >
> >
> > Hope this helps
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > Tino Hean  wrote on 2022-09-21 18:24:46:
> > >*Hi all, *
> > >*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submission command parameters*
> > >./bin/flink run-application \
> > >--detached \
> > >--target kubernetes-application \
> > >-Dkubernetes.cluster-id=test \
> > >-Dkubernetes.container.image=flink-java11 \
> > >-Dfs.default-scheme=hdfs://datacluster \
> > >-Dkubernetes.rest-service.exposed.type=LoadBalancer \
> > >
> >
> >
> >-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> > >\
> > >-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
> > >-Dkubernetes.namespace=flink \
> > >-Dkubernetes.service-account=flink-sa \
> > >-Denv.hadoop.conf.dir=/opt/flink/conf \
> > >-Dkubernetes.container.image.pull-policy=Always \
> > >local:///opt/flink/usrlib/test.jar
> > >
> > >*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory structure is as follows*
> > >flink@e3187a41a139:~$ ls conf
> > >core-site.xml hdfs-site.xml log4j-console.properties
> > >log4j-session.properties logback-session.xml masters zoo.cfg
> > >flink-conf.yaml log4j-cli.properties log4j.properties
> logback-console.xml
> > >logback.xml workers
> > >
> > >*But I ran into the following error:*
> > >
> > >2022-09-21 10:17:40,156 ERROR
> > >org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could
> > not
> > >start cluster entrypoint KubernetesApplicationClusterEntrypoint.
> > >org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
> to
> > >initialize the cluster entrypoint
> KubernetesApplicationClusterEntrypoint.
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711)
> > >[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
> > >[flink-dist-1.15.2.jar:1.15.2]
> > >Caused by: org.apache.flink.util.FlinkException: Could not create the ha
> > >services from the instantiated HighAvailabilityServicesFactory
> >
> >org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> > >at
> >
> >
> >org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at java.security.AccessController.doPrivileged(Native Method)
> > ~[?:?]
> > >at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
> > >at
> >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> >
> >
> >~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
> > >at
> >
> >
> >org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >at
> >
> >
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224)
> > >~[flink-dist-1.15.2.jar:1.15.2]
> > >... 2 more
> > >Caused by: java.io.IOException: Could not create FileSystem for highly
> > >available 

Re: [flink native k8s] With HA configured, taskmanager pods keep restarting

2022-08-31 Post by Yang Wang
My guess is that you did not set a service account for the TM, so the TM has no permission to read the leader from the K8s ConfigMaps and thus cannot register with the RM/JM:

-Dkubernetes.taskmanager.service-account=wuzhiheng \
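
For completeness, a sketch of the session start command with the service
accounts set explicitly (values follow the original command below; the single
kubernetes.service-account option can alternatively cover both JM and TM):

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=realtime-monitor \
    -Dkubernetes.namespace=monitor \
    -Dkubernetes.jobmanager.service-account=wuzhiheng \
    -Dkubernetes.taskmanager.service-account=wuzhiheng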


Best,
Yang

Xuyang  wrote on Tue, Aug 30, 2022 at 23:22:

> Hi, could you paste the TM logs? From the warnings it looks like the TMs never come up.
> Wu,Zhiheng  wrote on 2022-08-30 03:45:43:
> >[Problem]
> >After enabling HA, the taskmanager pods keep cycling through create-stop-create and the job cannot start.
> >
> >1. Job configuration and startup
> >
> >a)  Edit the conf/flink.yaml configuration file and add the HA settings
> >kubernetes.cluster-id: realtime-monitor
> >high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> >high-availability.storageDir:
> file:///opt/flink/checkpoint/recovery/monitor//
> This is an NFS path, mounted into the pod via a PVC
> >
> >b)  First create a stateless deployment with the following command to start a session cluster
> >
> >./bin/kubernetes-session.sh \
> >
> >-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
> \
> >
> >-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
> >
> >-Dkubernetes.cluster-id=realtime-monitor \
> >
> >-Dkubernetes.jobmanager.service-account=wuzhiheng \
> >
> >-Dkubernetes.namespace=monitor \
> >
> >-Dtaskmanager.numberOfTaskSlots=6 \
> >
> >-Dtaskmanager.memory.process.size=8192m \
> >
> >-Djobmanager.memory.process.size=2048m
> >
> >c)  Finally I submitted a jar job through the web UI, and the jobmanager printed the following logs
> >
> >2022-08-29 23:49:04,150 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-13 is created.
> >
> >2022-08-29 23:49:04,152 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-12 is created.
> >
> >2022-08-29 23:49:04,161 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-12
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-13
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:07,176 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 12 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Will not retry creating worker in 3000 ms.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered. Current pending count after removing: 1.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod
> terminated, container termination statuses:
> [flink-main-container(exitCode=1, reason=Error, message=null)], pod status:
> Failed(reason=null, message=null)
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0,
> taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes,
> numSlots=6}, current pending count: 2.
> >
> >2022-08-29 23:49:07,514 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 13 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,514 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered. 

Re: The role of the webhook in flink-k8s-operator

2022-07-27 Post by Yang Wang
The webhook's main job is CR validation, catching problems before they land on K8s.

For example: parallelism mistakenly set to a negative value, jarURI not set, and so on.

Best,
Yang

Kyle Zhang  wrote on Wed, Jul 27, 2022 at 18:59:

> Hi, all
> I've recently been looking at flink-k8s-operator[1]. The architecture includes a flink-webhook; what is this container for, and what
> impact does setting webhook.create=false have on the overall functionality?
>
> Best regards
>
> [1]
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.1.0 released

2022-07-24 Post by Yang Wang
Congrats! Thanks Gyula for driving this release, and thanks to all
contributors!


Best,
Yang

Gyula Fóra  wrote on Mon, Jul 25, 2022 at 10:44:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.1.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/news/2022/07/25/release-kubernetes-operator-1.1.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
>
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for the Flink Kubernetes Operator can be found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351723
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


Re: flink native k8s: job submitted per the docs cannot find the cluster

2022-07-19 Post by Yang Wang
Your understanding is correct.

FlinkSessionJob was split out as a separate CR mainly because it better fits K8s semantics: each job inside a session cluster can be managed as a K8s resource, and job status changes are promptly reflected in its Status.
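
For reference, a minimal FlinkSessionJob sketch along the lines of the
operator examples (names and jarURI are placeholders):

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: example-session-job
spec:
  # Must reference the FlinkDeployment that runs the session cluster
  deploymentName: example-session-cluster
  job:
    jarURI: https://example.com/artifacts/my-job.jar   # remote jars are supported for session jobs
    parallelism: 2
    upgradeMode: stateless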


Best,
Yang

yidan zhao  wrote on Thu, Jul 14, 2022 at 23:01:

> A follow-up question about flink-k8s-operator.
> I read the docs; it provides 2 CRDs, FlinkDeployment and FlinkSessionJob. Is the following understanding correct?
> (1) For jobs submitted in application mode, use FlinkDeployment with the job section configured. A
> flink cluster is created automatically and the job runs per the job config.
>  With this approach there are no separate cluster-creation and job-submission steps; it is all one unit.
> (2) To create a session cluster, also use FlinkDeployment, just without specifying a job config.
> (3) Against a session cluster created via (2), FlinkSessionJob can be used to submit jobs.
>
> Yang Wang  wrote on Tue, Jul 12, 2022 at 17:10:
> >
> > If the DNS server configured on the machines inside your K8s cluster is also coredns, the service behind the clusterIP resolves normally.
> >
> > ClusterIP was originally designed to be used by job-management pods, e.g. flink-kubernetes-operator[1]
> >
> > [1]. https://github.com/apache/flink-kubernetes-operator
> >
> > Best,
> > Yang
> >
> > yidan zhao  wrote on Tue, Jul 12, 2022 at 13:17:
> >
> > > With flink run -m and the clusterIp specified, I can submit jobs.
> > > So why can't --target kubernetes-session
> > > -Dkubernetes.cluster-id=my-first-flink-cluster be smart enough to fetch
> > > the cluster's svc clusterIp and submit through that?
> > >
> > > yidan zhao  wrote on Tue, Jul 12, 2022 at 12:50:
> > > >
> > > > If I am on the k8s master node, can I use the ClusterIp directly?
> > > >
> > > >
> > > > Also, I roughly understand NodePort, but I have never quite understood
> > > > how LoadBalancer works.
> > > >
> > > > yidan zhao  wrote on Tue, Jul 12, 2022 at 12:48:
> > > > >
> > > > > I understand "inside the k8s cluster" as the machines that make up
> > > > > k8s. Must the client be in a pod? Not even on a k8s node?
> > > > >
> > > > > Yang Wang  wrote on Tue, Jul 12, 2022 at 12:07:
> > > > > >
> > > > > > The log already explains it clearly: with ClusterIP, your Flink
> > > > > > client must be inside the k8s cluster to submit normally, e.g.
> > > > > > start a pod and run flink run inside it.
> > > > > > Otherwise you need NodePort or LoadBalancer.
> > > > > >
> > > > > > 2022-07-12 10:23:23,021 WARN
> > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> since
> > > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > > yidan zhao  wrote on Tue, Jul 12, 2022 at 10:40:
> > > > > >
> > > > > > > The steps below follow the documentation at
> > > > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > > > >
> > > > > > > Version: 1.15
> > > > > > >
> > > > > > > (1) Create the cluster:
> > > > > > >
> > > > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > > > > (2) Submit the job:
> > > > > > > ./bin/flink run \
> > > > > > > --target kubernetes-session \
> > > > > > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > > > > > ./examples/streaming/TopSpeedWindowing.jar
> > > > > > >
> > > > > > > The svc is of ClusterIp type
> > > > > > >
> > > > > > > Step 2, the job submission, prints the following:
> > > > > > > Executing example with default input data.
> > > > > > > Use --input to specify file input.
> > > > > > > Printing result to stdout. Use --output to specify output path.
> > > > > > > 2022-07-12 10:23:23,021 WARN
> > > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor
> [] -
> > > > > > > Please note that Flink client operations(e.g. cancel, list,
> stop,
> > > > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> > > since
> > > > > > > 'kubernetes.rest-service.exposed.type' has been set to
> ClusterIP.
> > > > > > > 2022-07-12 10:23:23,027 INFO
> > > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor
> [] -
> > > > > > > Retrieve flink cluster my-first-flink-cluster successfully,
> > > JobManager
> > > > > > > Web Interface: http://my-first-flink-cluster-rest.test:8081

Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030

2022-07-13 Post by Yang Wang
Check whether you have set the HADOOP_CONF_DIR environment variable correctly.

Best,
Yang

lishiyuan0506  wrote on Thu, Jul 14, 2022 at 09:41:

> Sorry to bother everyone: when submitting flink to yarn, has anyone hit the
> exception Retrying connect to server: 0.0.0.0/0.0.0.0:8030?
>
>
> The hadoop classpath is fine, and Spark and MR run on Yarn without issues; only flink has this problem.
>
>
> lishiyuan0506
> lishiyuan0...@163.com
>
>


Re: flink native k8s: job submitted per the docs cannot find the cluster

2022-07-12 Post by Yang Wang
If the DNS server configured on the machines inside your K8s cluster is also coredns, the service behind the clusterIP resolves normally.

ClusterIP was originally designed to be used by job-management pods, e.g. flink-kubernetes-operator[1]

[1]. https://github.com/apache/flink-kubernetes-operator

Best,
Yang

yidan zhao  wrote on Tue, Jul 12, 2022 at 13:17:

> With flink run -m and the clusterIp specified, I can submit jobs.
> So why can't --target kubernetes-session
> -Dkubernetes.cluster-id=my-first-flink-cluster be smart enough to fetch
> the cluster's svc clusterIp and submit through that?
>
> yidan zhao  wrote on Tue, Jul 12, 2022 at 12:50:
> >
> > If I am on the k8s master node, can I use the ClusterIp directly?
> >
> >
> > Also, I roughly understand NodePort, but I have never quite understood
> > how LoadBalancer works.
> >
> > yidan zhao  wrote on Tue, Jul 12, 2022 at 12:48:
> > >
> > > I understand "inside the k8s cluster" as the machines that make up
> > > k8s. Must the client be in a pod? Not even on a k8s node?
> > >
> > > Yang Wang  wrote on Tue, Jul 12, 2022 at 12:07:
> > > >
> > > > The log already explains it clearly: with ClusterIP, your Flink
> > > > client must be inside the k8s cluster to submit normally, e.g.
> > > > start a pod and run flink run inside it.
> > > > Otherwise you need NodePort or LoadBalancer.
> > > >
> > > > 2022-07-12 10:23:23,021 WARN
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > yidan zhao  wrote on Tue, Jul 12, 2022 at 10:40:
> > > >
> > > > > The steps below follow the documentation at
> > > > >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > >
> > > > > Version: 1.15
> > > > >
> > > > > (1) Create the cluster:
> > > > >
> > > > >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > > (2) Submit the job:
> > > > > ./bin/flink run \
> > > > > --target kubernetes-session \
> > > > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > > > ./examples/streaming/TopSpeedWindowing.jar
> > > > >
> > > > > The svc is of ClusterIp type
> > > > >
> > > > > Step 2, the job submission, prints the following:
> > > > > Executing example with default input data.
> > > > > Use --input to specify file input.
> > > > > Printing result to stdout. Use --output to specify output path.
> > > > > 2022-07-12 10:23:23,021 WARN
> > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> since
> > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > > 2022-07-12 10:23:23,027 INFO
> > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > Retrieve flink cluster my-first-flink-cluster successfully,
> JobManager
> > > > > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > > > > 2022-07-12 10:23:23,044 WARN
> > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> since
> > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > >
> > > > > 
> > > > >  The program finished with the following exception:
> > > > > org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > > method caused an error: Failed to execute job
> > > > > 'CarTopSpeedWindowingExample'.
> > > > > ...
> > > > > Caused by: org.apache.flink.util.FlinkException: Failed to execute
> job
> > > > > 'CarTopSpeedWindowingExample'.
> > > > > ...
> > > > > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> > > > > Failed to submit JobGraph.
> > > > > ...
> > > > > Caused by:
> org.apache.flink.util.concurrent.FutureUtils$RetryException:
> > > > > Could not complete the operation. Number of retries has been
> > > > > exhausted.
> > > > > ...
> > > > > Caused by: java.util.concurrent.CompletionException:
> > > > > java.net.UnknownHostException: my-first-flink-cluster-rest.test:
> Name
> > > > > or service not known
> > > > > ...
> > > > > Caused by: java.net.UnknownHostException:
> > > > > my-first-flink-cluster-rest.test: Name or service not known
> > > > >
> > > > >
> > > > > As above, the submission endpoint resolved from --target
> > > > > kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster is
> > > > > my-first-flink-cluster-rest.test, which should be the DNS name
> > > > > generated by k8s; test is the flink namespace.
> > > > >
> > > > > And indeed my local machine cannot resolve
> > > > > my-first-flink-cluster-rest.test.
> > > > >
>


Re: flink native k8s: job submitted per the docs cannot find the cluster

2022-07-11 Post by Yang Wang
The log already explains it clearly: with ClusterIP, your Flink
client must be inside the k8s cluster to submit normally, e.g. start a pod and run flink run inside it.
Otherwise you need NodePort or LoadBalancer.

2022-07-12 10:23:23,021 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
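
For illustration, one way to get a client inside the cluster is a throwaway
pod (the image tag is an example; the pod also needs a kube config or service
account that lets the Flink client look up the rest service):

kubectl run flink-client --rm -it --image=flink:1.15 -- bash
# Inside the pod, the ClusterIP service DNS name resolves normally:
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster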


Best,
Yang

yidan zhao  wrote on Tue, Jul 12, 2022 at 10:40:

> The steps below follow the documentation at
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
>
> Version: 1.15
>
> (1) Create the cluster:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> (2) Submit the job:
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=my-first-flink-cluster \
> ./examples/streaming/TopSpeedWindowing.jar
>
> The svc is of ClusterIp type
>
> Step 2, the job submission, prints the following:
> Executing example with default input data.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> 2022-07-12 10:23:23,021 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> 2022-07-12 10:23:23,027 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> Web Interface: http://my-first-flink-cluster-rest.test:8081
> 2022-07-12 10:23:23,044 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
>
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job
> 'CarTopSpeedWindowingExample'.
> ...
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'CarTopSpeedWindowingExample'.
> ...
> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed to submit JobGraph.
> ...
> Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been
> exhausted.
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> or service not known
> ...
> Caused by: java.net.UnknownHostException:
> my-first-flink-cluster-rest.test: Name or service not known
>
>
> As above, the submission endpoint resolved from --target kubernetes-session
> -Dkubernetes.cluster-id=my-first-flink-cluster is
> my-first-flink-cluster-rest.test, which should be the DNS name generated by k8s; test is the flink namespace.
>
> And indeed my local machine cannot resolve my-first-flink-cluster-rest.test.
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-28 Post by Yang Wang
Thanks Gyula for working on the first patch release for the Flink
Kubernetes Operator project.


Best,
Yang



Gyula Fóra  wrote on Tue, Jun 28, 2022 at 00:22:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.0.1.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
> <
> https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html
> >
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Official Docker image for Flink Kubernetes Operator applications can be
> found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351812
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


Re: Flink k8s HA: anomalies caused by manually deleting the job deployment

2022-06-12 Post by Yang Wang
Zhanghao's answer is already very thorough; I'll add one small point: deleting the Deployment while keeping the HA ConfigMaps is the expected behavior, and the docs describe it [1].
There are two reasons for this design:
(1.) The job may be restarted with the same cluster-id, expecting to recover from its previous checkpoint.
(2.) Simply deleting the ConfigMaps would leak the HA data stored on DFS (e.g. HDFS, S3, OSS).

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
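
For illustration, the cleanup paths sketched from the documentation linked
above (cluster-id and job-id are placeholders):

# Stopping the job with cancel/stop also cleans up the HA data once the job
# reaches a terminal state:
./bin/flink stop --target kubernetes-application \
    -Dkubernetes.cluster-id=<cluster-id> <job-id>
# If the deployment was already deleted manually, remove the leftover HA
# ConfigMaps by label selector:
kubectl delete configmap \
    --selector='app=<cluster-id>,configmap-type=high-availability'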


Best,
Yang

Zhanghao Chen  wrote on Mon, Jun 13, 2022 at 07:53:

> 1. For a Flink job using K8s-based HA to behave correctly, the job
> deployment must not be deleted manually; stop it via the cancel/stop
> commands. From this I guess Flink k8s HA is built on top of ConfigMaps,
> whose lifecycle, from the K8s point of view, cannot carry an ownerreference
> the way the job's svc does.
>
> Yes, Flink K8s HA is built on ConfigMaps, and the HA configmaps are created
> without an ownerreference. So if you want to restart the cluster while
> keeping the HA data, simply deleting the deployment is fine; after the
> restart it recovers from the latest checkpoint.
>
> 2. With k8s-based HA, the Flink job id is always all zeros.
>
> With HA enabled, the Flink job id in Application mode is always all zeros,
> regardless of whether K8s HA is used. The job id is the unique identifier
> of a job; the HA services use it to name and address the HA resources
> belonging to a single job (such as the persisted jobgraph and checkpoints).
> In Application mode the jobgraph is generated on the JM. Without HA, a
> random job id is generated each time the jobgraph is built; with HA a fixed
> job id is required (that is where the all-zero job id comes from),
> otherwise after a JM failover a new, different job id would be generated
> that could not be associated with the previous checkpoints, and the job
> would recover from a completely clean state.
>
> 3. How does Flink k8s HA work, and what information does it store? I would
> like to study the implementation; please share any design docs or related
> material.
>
> See the official blog post:
> https://flink.apache.org/2021/02/10/native-k8s-with-ha.html, and for more
> detail the design doc in JIRA:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>
>
> Best,
> Zhanghao Chen
> 
> From: m18814122325 
> Sent: Sunday, June 12, 2022 22:45
> To: user-zh@flink.apache.org 
> Subject: Flink k8s HA: anomalies caused by manually deleting the job deployment
>
> Flink version: 1.15.0
>
> deploy mode: Native k8s application
>
>
>
>
> Symptom:
>
> I deployed a Flink job in native k8s mode with K8s-based HA. After I
> manually deleted the job's deployment, I found that the job's HA ConfigMaps
> still existed. And when I started the job again without the -s parameter,
> the startup log showed that it recovered from the information recorded in those ConfigMaps.
>
>
>
>
> kubectl delete deployment flink-bdra-sql-application-job -n
> bdra-dev-flink-standalone
>
>
>
>
> kubectl get configMap -n bdra-dev-flink-standalone
>
>
>
>
> NAME
>DATA   AGE
>
> flink-bdra-sql-application-job--config-map
> 2  13m
>
> flink-bdra-sql-application-job-cluster-config-map
>   1  13m
>
>
>
>
>
>
>
> I have the following questions:
>
> 1. For a Flink job with K8s-based HA to behave correctly, the job
> deployment must not be deleted manually; it has to be stopped via the
> cancel/stop commands. From this I guess Flink k8s HA is built on top of
> ConfigMaps, whose lifecycle, from the K8s point of view, cannot carry an
> ownerreference the way the job's svc does.
>
> 2. With k8s-based HA, the Flink job id is always all zeros.
>
> 3. How does Flink k8s HA work, and what information does it store? I would
> like to study the implementation; if you have design docs or related
> material, please reply to this mail. Thanks.
>
>
>
>
> Restart command (without the -s parameter, i.e. the command itself does not restore from any checkpoint or savepoint)
>
> flink run-application --target kubernetes-application -c CalculateUv
> -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p
> -Dkubernetes.container.image=
> acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20
> -Dkubernetes.namespace=bdra-dev-flink-standalone
> -Dkubernetes.service-account=bdra-dev-flink-standalone-sa
> -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2
> -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8
> -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m
> -Dstate.backend=filesystem
> -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
> -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=file:///opt/flink/log/recovery
> -Ds3.access-key=* -Ds3.secret-key=*
> -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
> -Dmetrics.reporter.influxdb.scheme=http
> -Dmetrics.reporter.influxdb.host=influxdb
> -Dmetrics.reporter.influxdb.port=8086
> -Dmetrics.reporter.influxdb.db=flink_metrics
> -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80
> -Dkubernetes.rest-service.exposed.type=ClusterIP
> -Dkubernetes.config.file=kube_config
> -Dkubernetes.pod-template-file=pod-template.yaml
> local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
>
>
>
>
> After the restart it automatically recovers from the ConfigMap.
>
> 2022-06-10 20:20:52,592 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Successfully recovered 1 persisted job graphs.
>
> 2022-06-10 20:20:52,654 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
>
> 2022-06-10 20:20:53,552 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered
> 0 pods from previous attempts, current attempt id is 1.
>
> 2022-06-10 20:20:53,552 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
>
> 2022-06-10 20:20:55,352 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> 

[ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-05 Post by Yang Wang
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.0.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.
This is the first production ready release and brings numerous improvements
and new features to almost every aspect of the operator.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351500

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula & Yang


Re: Using Flink Application mode + HA with the HistoryServer

2022-05-11 Post by Yang Wang
You can customize the job id via -D "$internal.pipeline.job-id=", but note that it is an internal parameter.

See [1] for more of the discussion.

[1]. https://issues.apache.org/jira/browse/FLINK-19358
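
For illustration, a submission sketch with that internal option set (the
value must be a 32-character hex job id; single quotes matter in a shell
because of the leading $):

./bin/flink run-application --target kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-cluster \
    -D'$internal.pipeline.job-id=00000000000000000000000000000001' \
    local:///opt/flink/usrlib/my-job.jar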

Best,
Yang

谭家良  wrote on Wed, May 11, 2022 at 22:17:

>
>
> Application mode in use: Kubernetes
> HA mode in use: Kubernetes HA
>
>
> With Application + HA, all Job IDs become all zeros, but the HistoryServer
> uses the Job ID as the archive file name. How, then, should the
> HistoryServer be combined with Application + HA?
>
>


Re: Managing third-party jars for Flink jobs on k8s

2022-04-25 Post by Yang Wang
* When submitting to a running session cluster with the flink run command, only local jars are supported.

* You can also submit via the REST API: first upload the jar to the JobManager [1], then run the uploaded jar [2].

* Finally, you can try the flink-kubernetes-operator project; session jobs currently support remote jars [3], and the project keeps improving.

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-upload
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-jarid-run
[3].
https://github.com/apache/flink-kubernetes-operator/blob/main/e2e-tests/data/sessionjob-cr.yaml
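
A sketch of that REST flow (host, jar path, and entry class are placeholders):

# 1. Upload the jar to the JobManager; the response's "filename" field
#    contains the generated jar id
curl -X POST -H "Expect:" \
    -F "jarfile=@/path/to/my-job.jar" \
    http://<jobmanager-host>:8081/jars/upload

# 2. Run the uploaded jar using the returned jar id
curl -X POST \
    "http://<jobmanager-host>:8081/jars/<jar-id>/run?entry-class=com.example.Main&parallelism=2"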

Best,
Yang

天道酬勤 <1262420...@qq.com.invalid> wrote on Mon, Apr 25, 2022 at 16:51:

> My Flink is deployed in kubernetes session mode. When submitting a job I
> would like to dynamically specify third-party jars for running my own job,
> but I have not found a usable option in the official docs.
> Any suggestions would be appreciated!


Re: Submitting FlinkSQL jobs to k8s

2022-04-25 Post by Yang Wang
Application mode indeed cannot run an already-generated JobGraph. One workaround I can think of is to write a user
jar that submits the JobGraph directly to the local cluster,

like this:

import java.io.FileInputStream;
import java.io.ObjectInputStream;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobGraphRunner {

    private static final Logger LOG = LoggerFactory.getLogger(JobGraphRunner.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // This runner executes inside the JM pod, so the local REST endpoint is used
        final String restServerAddress = "http://localhost:8081";
        LOG.info("Creating RestClusterClient({})", restServerAddress);

        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(
                        flinkConfig,
                        flinkConfig.toMap().get("kubernetes.cluster-id"),
                        (c, e) -> new StandaloneClientHAServices(restServerAddress))) {
            final String jobGraphPath = params.get("jobgraph");
            Preconditions.checkNotNull(jobGraphPath, "--jobgraph should be configured.");

            // Deserialize the JobGraph that was serialized on the client side
            LOG.info("Loading jobgraph from {}", jobGraphPath);
            JobGraph jobGraph;
            try (ObjectInputStream objectInputStream =
                    new ObjectInputStream(new FileInputStream(jobGraphPath))) {
                jobGraph = (JobGraph) objectInputStream.readObject();
            }

            final JobID jobID = clusterClient.submitJob(jobGraph).get();
            LOG.info("Job {} is submitted successfully", jobID);
        }
    }
}


Best,
Yang

吕宴全 <1365976...@qq.com.invalid> wrote on Sun, Apr 24, 2022 at 14:45:

> I plan to put Kyuubi in front of FlinkSQL and run the jobs in a k8s
> environment; the workload may include batch jobs (Application mode) and
> ad-hoc queries (Session mode). In Application mode the jobgraph is built from a jar. A SQL example:
>
>
> CREATE TABLE T (
> id INT
> ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> I want this insert statement to run in Application mode, but I cannot pass
> just this single SQL statement as the parameter, because that may lose
> context (the catalog). So I would have to record the earlier SQL statements
> and pass them along with every job, which is cumbersome.
> On the other hand, the table Executor can actually build the jobgraph, so
> submitting the jobgraph as the parameter would avoid passing the context
> SQL, but k8s no longer has a per-job mode implementation.
>
>
> I am confused about submitting batch jobs in this scenario. I would
> appreciate any suggestions, or corrections if my understanding is wrong.


Re: JobManager doesn't bring up new TaskManager during failure recovery

2022-04-23 Post by Yang Wang
After more debugging, I think this issue is the same as FLINK-24315 [1],
which is fixed in 1.13.3.

[1]. https://issues.apache.org/jira/browse/FLINK-24315

Best,
Yang

Zheng, Chenyu  wrote on Fri, Apr 22, 2022 at 18:27:

> I created a JIRA ticket https://issues.apache.org/jira/browse/FLINK-27350
> to track this issue.
>
>
>
> BRs,
>
> Chenyu
>
>
>
> *From: *"Zheng, Chenyu" 
> *Date: *Friday, April 22, 2022 at 6:26 PM
> *To: *Yang Wang 
> *Cc: *"u...@flink.apache.org" , "
> user-zh@flink.apache.org" 
> *Subject: *Re: JobManager doesn't bring up new TaskManager during failure
> recovery
>
>
>
> Thank you, Yang!
>
>
>
> In fact I have a fine-grained dashboard for Kubernetes cluster health
> (like apiserver qps/latency etc.), and I didn't find anything unusual…
> Also, the JobManager container cpu/memory usage is low.
>
>
>
> Besides, I did a deep dive into these logs and the Flink resource manager
> code, and found something interesting. Let me use taskmanager-1-9 as an
> example:
>
>    1. I can see the log "Requesting new worker with resource spec
>    WorkerResourceSpec" at 2022-04-17 00:33:15,333. The code location is
>    https://github.com/apache/flink/blob/release-1.13.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L283
>    2. "Creating new TaskManager pod with name
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 and resource
>    <16384,4.0>" at 2022-04-17 00:33:15,376, code location
>    https://github.com/apache/flink/blob/release-1.13.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java#L167
>    3. "Pod stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 is
>    created." at 2022-04-17 00:33:15,412, code location
>    https://github.com/apache/flink/blob/release-1.13.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java#L190
>    *The request is sent and the pod is created here, so I think the
>    apiserver is healthy at that moment.*
>    4. But I cannot find any logs printed at
>    https://github.com/apache/flink/blob/release-1.13.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L301
>    or at
>    https://github.com/apache/flink/blob/release-1.13.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L314
>    5. "Discard registration from TaskExecutor
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9" at 2022-04-17
>    00:33:32,393. The root cause of this log is the workerNodeMap
>    <link truncated in the original message>

Re: JobManager doesn't bring up new TaskManager during failure recovery

2022-04-22 Post by Yang Wang
The root cause might be that your APIServer is overloaded or not running
normally, so the pod events of
taskmanager-1-9 and taskmanager-1-10 are not delivered to the watch in
FlinkResourceManager.
The two taskmanagers are therefore not recognized by the ResourceManager,
and their registrations are rejected.

The ResourceManager also did not receive the terminated pod events. That's
why it does not allocate new TaskManager pods.

All in all, I believe you need to check the K8s APIServer status.

Best,
Yang

Zheng, Chenyu  wrote on Fri, Apr 22, 2022 at 12:54:

> Hi developers!
>
>
>
> I got a strange bug during failure recovery of Flink. It seems the
> JobManager doesn't bring up new TaskManagers during failure recovery. Some
> logs and information of the Flink job are pasted below. Can you take a look
> and give me some guidance? Thank you so much!
>
>
>
> Flink version: 1.13.2
>
> Deploy mode: K8s native
>
> Timeline of the bug:
>
>    1. The Flink job started with 8 taskmanagers.
>    2. At *2022-04-17 00:28:15,286*, the job hit an error and the JobManager
>    decided to restart 2 tasks (pods
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-1,
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-7).
>    3. The two old pods were stopped and the JobManager created 2 pods
>    (stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9,
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-10) at *2022-04-17
>    00:33:15,376*.
>    4. The JobManager discarded the two new pods' registrations at
>    *2022-04-17 00:33:32,393*.
>    5. The new pods exited at *2022-04-17 00:33:32,396* due to the rejected
>    registration.
>    6. The JobManager did not bring up new pods and printed the error "Slot
>    request bulk is not fulfillable! Could not allocate the required slot
>    within slot request timeout" over and over.
>
> Flink logs:
>
> 1.  JobManager:
> https://drive.google.com/file/d/1HuRQUFQrq9JIfrOzH9qBPCK1hMsyqFpJ/view?usp=sharing
>
> 2.  TaskManager:
> https://drive.google.com/file/d/1ReWR27VlXCkGCFN62__j0UpQlXV7Ensn/view?usp=sharing
>
>
>
>
>
> BRs,
>
> Chenyu
>


Re: Flink Kubernetes Operator

2022-04-14 Post by Yang Wang
Your errors are mainly caused by failing to reach some external image registries; you can use a mirror/proxy provided by a cloud vendor to work around the image pull failures,

or disable the webhook with --set webhook.create=false.
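
For illustration, the quick-start install command with the webhook disabled
(repo and image values follow the original steps quoted below):

helm install flink-kubernetes-operator \
    flink-operator-repo/flink-kubernetes-operator \
    --set image.repository=apache/flink-kubernetes-operator \
    --set webhook.create=false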

Best,
Yang

casel.chen  wrote on Thu, Apr 14, 2022 at 15:46:

> The deployment 'cert-manager-webhook' shows
> Failed to pull image "quay.io/jetstack/cert-manager-webhook:v1.7.1": rpc
> error: code = Unknown desc = Error response from daemon: Get "
> https://quay.io/v2/": net/http: TLS handshake timeout
>
>
> casel.chen  wrote on 2022-04-14 15:40:51:
> >Following its official docs [1], I tried running the Flink Kubernetes Operator on a local minikube on my Mac, and it threw the connection error below:
> >
> >
> >
> >$ helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator --set
> image.repository=apache/flink-kubernetes-operator
> >
> >WARNING: Kubernetes configuration file is group-readable. This is
> insecure. Location: /Users/admin/.kube/config
> >
> >WARNING: Kubernetes configuration file is world-readable. This is
> insecure. Location: /Users/admin/.kube/config
> >
> >Error: INSTALLATION FAILED: failed to create resource: Internal error
> occurred: failed calling webhook "webhook.cert-manager.io": Post "
> https://cert-manager-webhook.cert-manager.svc:443/mutate?timeout=10s":
> dial tcp 10.96.143.23:443: connect: connection refused
> >
> >
> >
> >
> >Steps:
> >
> >a) minikube start --kubernetes-version=v1.21.5 --memory 8192 --cpus 4
> >
> >b) kubectl create -f
> https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
> >
> >c) helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
> >
> >d) helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator--set
> image.repository=apache/flink-kubernetes-operator
> >
> >
> >
> >
> >[1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/try-flink-kubernetes-operator/quick-start/
>


Re: Re: Questions about k8s native session mode

2022-03-08 Post by Yang Wang
Try a newer version; it looks like this has already been fixed:

https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

崔深圳  wrote on Wed, Mar 9, 2022 at 10:31:

>
>
>
> The web UI fails intermittently when requesting the /jobs/overview endpoint: Exception on server
> side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
> Failed to serialize the result for RPC call :
> requestMultipleJobDetails.\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)\n\tat
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat
> akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat
> akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat
> akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)\n\tat
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)\n\tat
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)\n\tat
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)\nCaused
> by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)\n\tat
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)\n\tat
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)\n\tat
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)\n\tat
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)\n\tat
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)\n\tat
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
> 29 more\n\nEnd of exception on server side"
>
>
> yu'an huang  wrote on 2022-03-09 09:56:21:
> >Hi, what problem does routing to a non-master node cause? A non-master
> node handling the request should locate the active JobManager through the
> HA service, fetch the result from it, and return it to the client.
> >
> >> On 7 Mar 2022, at 7:46 PM, 崔深圳  wrote:
> >>
> >> In k8s native session mode, with HA configured and 3 jobmanagers, web UI
> requests via the rest service keep being routed to non-master nodes. Is
> there a way to make this stable?
> >
>


Re: Flink jobs deployed on K8s: TaskManager STDOUT logs cannot be viewed in the Web UI

2022-03-02 Post by Yang Wang
Both standalone Flink on K8s and native K8s have the problem you describe.

The main reason is that standard output goes to the pod console, so kubectl logs can show the stdout log, but the webUI cannot.

You can build your own Flink binary based on this commit [1] to get this behavior.

[1].
https://github.com/wangyang0918/flink/commit/2454b6daa2978f9ea9669435f92a9f2e78de357a

Best,
Yang


xinzhuxiansheng  wrote on Wed, Mar 2, 2022 at 15:00:

>  After deploying Flink on native k8s, the logs printed by the program
> cannot be viewed on the Flink UI TaskManager page. I would rather not
> collect TaskManager logs via ELK, and having users run kubectl logs to read
> the pods' standard output is too complicated. How do you all handle this?
>
>
> xinzhuxiansheng
> xinzhuxiansh...@126.com
>
>


Re: Configuring hosts for the TM and JM containers in Flink on native K8s deployments

2022-01-11 Post by Yang Wang
You can specify the kube config either through an environment variable or a flink config option:

export KUBECONFIG=/path/of/kube.config

or

-Dkubernetes.config.file=/path/of/kube.config

The relevant code is here [1]

[1].
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java#L58


Best,
Yang

JianWen Huang  wrote on Mon, Jan 10, 2022 at 22:04:

> First of all, thank you for the reply. I had also thought of the second option: a dynamic Job plus a ConfigMap mounted into the Flink Client pod, then submitting via the command line.
> Also, you and the official docs both mention the kube config. Where in the flink client source code is the kube config parsed and read?
>
> Yang Wang  wrote on Mon, Jan 10, 2022 at 15:17:
> >
> > Sorry for the late reply.
> >
> > "In practice, deploying Flink on native K8s requires a machine with both
> > a k8s client and a flink client."
> >
> > The Flink client does not depend on a K8s client; having the
> > corresponding kube config is enough.
> >
> >
> > Both methods you describe work, and there is no essential difference:
> > both run the Flink client inside the cluster to do the submission; the
> > first is long-lived, the second is started on demand.
> > If all jobs use the same pod template, you can keep it in a ConfigMap and
> > mount it into the Flink client pod.
> > If every job uses a different one, you can only do it the way you said.
> >
> >
> > Another workable idea is to develop your own K8s operator and pass things
> > through a CR. See this simple demo [1]
> >
> > [1]. https://github.com/wangyang0918/flink-native-k8s-operator
> >
> >
> > Best,
> > Yang
> >
> >
> >
> > JianWen Huang  wrote on Thu, Dec 30, 2021 at 00:01:
> > >
> > > Understood, thanks.
> > > In practice, deploying Flink on native K8s needs a machine that has
> > > both a k8s client and a flink client. What is a good CI-style way to
> > > submit jobs? I can think of two:
> > > 1. Start a container with the flink client on k8s and submit via the
> > > command line inside it.
> > > 2. Start a k8s Job-type workload from an image with the Flink client,
> > > and submit the command while it runs.
> > >
> > > For option 1, submitting with kubernetes.pod-template-file requires
> > > copying the template file into the container.
> > > For option 2, the kubernetes.pod-template-file has to be baked into the
> > > Flink client image in advance.
> > > Do you have a better approach?
> > >
> > > Yang Wang  wrote on Sun, Dec 26, 2021 at 16:39:
> > > >
> > > > Take the following submission command as an example:
> > > > pod-template.yaml lives on the same machine that runs the
> > > > run-application command. The Flink client automatically stores the
> > > > file in a ConfigMap and mounts it into the JM.
> > > > The user jar (StateMachineExample.jar) must be inside the image.
> > > >
> > > > Note: things that must be inside the image generally use the
> > > > local:// scheme; local files do not.
> > > >
> > > > bin/flink run-application -t kubernetes-application \
> > > > -Dkubernetes.cluster-id=my-flink-cluster \
> > > > -Dkubernetes.pod-template-file=/path/of/pod-template.yaml \
> > > > local:///opt/flink/examples/streaming/StateMachineExample.jar
> > > >
> > > >
> > > >
> > > > If it is still unclear, this test implementation makes it obvious [1]
> > > >
> > > > [1].
> > > >
> > >
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > 黄剑文  wrote on Fri, Dec 24, 2021 at 17:57:
> > > >
> > > > > "A client-local file, not one inside the image." How should I
> > > > > understand this? In run-application mode the user jar is built,
> > > > > together with the standard flink image, into a custom image that
> > > > > is then submitted. So where should this file live? When I specify
> > > > > a path, it reads the path inside the image, e.g.
> > > > > /opt/my-pod-template reads the /opt/my-pod-template file in the image.
> > > > >
> > > > > Thank you for the reply.
> > > > > 谢谢您的回复。
> > > > >
> > > > > Yang Wang  于2021年12月24日周五 11:18写道:
> > > > > >
> > > > > > 使用flink
> > > > > >
> > >
> run-application来提交任务时,kubernetes.pod-template-file需要指定的是一个client-local的文件
> > > > > > 不是镜像里面的
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > > hjw <1010445...@qq.com.invalid> 于2021年12月23日周四 22:21写道:
> > > > > >
> > > > > > > Flink版本:1.13Flink基于Native K8s
> > > > > > >
> > > > >
> > >
> 部署模式下,因为有场景需要,jobmanager和taskmanager需要配置一些特定的hosts,查阅官方文档后发现可以支持自己指定一些pod-Template来指定jm和tm的一些K8s部署行为,但这些pod-Template需要打在提交客户端镜像里。
> > > > > > >
> > > > > > >
> > > > >
> > >
> 问题是jm和tm在不同环境下需要配置的Hosts并不相同。如开发环境,测试环境,生产环境。这意味着不同环境需维护不同的镜像。请问各位在使用上有什么好方法去解决呢。谢谢。
> > > > >
> > >
>


Re: Configuring hosts for the TM and JM containers in Flink on native K8s deployments

2022-01-09 Post by Yang Wang
Sorry for the late reply.

"In practice, deploying Flink on native K8s requires a machine with both a k8s client and a flink client."

The Flink client does not depend on a K8s client; having the corresponding kube config is enough.


Both methods you describe work, and there is no essential difference: both run the Flink client inside the cluster to do the submission; the first is long-lived, the second is started on demand.
If all jobs use the same pod template, you can keep it in a ConfigMap and mount it into the Flink client pod.
If every job uses a different one, you can only do it the way you said.


Another workable idea is to develop your own K8s operator and pass things through a CR. See this simple demo [1]

[1]. https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang



JianWen Huang  wrote on Thu, Dec 30, 2021 at 00:01:

> Understood, thanks.
> In practice, deploying Flink on native K8s needs a machine that has both a
> k8s client and a flink client. What is a good CI-style way to submit jobs?
> I can think of two:
> 1. Start a container with the flink client on k8s and submit via the
> command line inside it.
> 2. Start a k8s Job-type workload from an image with the Flink client, and
> submit the command while it runs.
>
> For option 1, submitting with kubernetes.pod-template-file requires copying
> the template file into the container.
> For option 2, the kubernetes.pod-template-file has to be baked into the
> Flink client image in advance.
> Do you have a better approach?
>


Re: -C option has no effect in K8s mode

2022-01-09, Yang Wang
file:///home/service/var/chubaofs/flink/user/usrlib/demo/httpclient-4.5.2.jar

Is this file inside the image? If not, it has to be baked into the image, or you can use a pod template to inject an init container that downloads it into the pod.
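
A minimal pod-template sketch of that init-container approach (the download URL and paths are assumptions for illustration; the main container must keep the name flink-main-container):

apiVersion: v1
kind: Pod
metadata:
  name: pod-template-with-deps
spec:
  initContainers:
    - name: fetch-deps
      image: busybox:1.35
      # pull the shared jar into a volume also visible to the Flink container
      command: ["wget", "-O", "/opt/flink/artifacts/httpclient-4.5.2.jar", "https://example.com/httpclient-4.5.2.jar"]
      volumeMounts:
        - name: artifacts
          mountPath: /opt/flink/artifacts
  containers:
    - name: flink-main-container
      volumeMounts:
        - name: artifacts
          mountPath: /opt/flink/usrlib
  volumes:
    - name: artifacts
      emptyDir: {}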


Best,
Yang

johnjlong wrote on 2022-01-05:

>
> Hi all,
> I deploy jobs to a K8s cluster, and several jobs share one external jar. The jar sits on shared storage that every JM and TM can access.
> I use -C to add it to the dependency classpath, but the corresponding class still cannot be found. Where could the problem be?
> The submission command:
>
> bin/flink run-application -t kubernetes-application
> -Dkubernetes.cluster-id=demo
> -Dkubernetes.container.image=devhub.baymax.oppo.local/flink-browser-cluster/flink:1.13.2-scala_2.11-java8-centos-v1.0.0
> -Dkubernetes.container.image.pull-policy=Always
> -Dkubernetes.rest-service.exposed.type=NodePort -C
> file:///home/service/var/chubaofs/flink/user/usrlib/demo/httpclient-4.5.2.jar
> -c com.martin.RandomDemo -p 2
> local:///opt/flink/user/jars/flink-demo-1.0-without-http-SNAPSHOT.jar
>
>
> (deployment screenshot omitted)
>
> (error screenshot omitted)
>
> johnjlong
> johnjl...@163.com
>


Re: Configuring hosts for TM and JM containers when deploying Flink on native K8s

2021-12-26, Yang Wang
Take the submission command below as an example: pod-template.yaml lives on the same machine where run-application is executed. The Flink
client automatically stores that file in a ConfigMap and mounts it into the JM.
The user jar (StateMachineExample.jar) does need to be inside the image.

Note: anything that must be inside the image uses the local:// scheme; local files do not.

bin/flink run-application -t kubernetes-application \
-Dkubernetes.cluster-id=my-flink-cluster \
-Dkubernetes.pod-template-file=/path/of/pod-template.yaml \
local:///opt/flink/examples/streaming/StateMachineExample.jar



If this is still unclear, the implementation of this test makes it obvious [1].

[1].
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh
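
Since the original question here was about injecting custom hosts entries, a pod-template.yaml for that can be as small as this sketch (the IPs and hostnames are placeholders); keeping one such client-local file per environment avoids maintaining a separate image per environment:

apiVersion: v1
kind: Pod
metadata:
  name: pod-template-with-hosts
spec:
  hostAliases:
    - ip: "10.1.2.3"
      hostnames:
        - "kafka-broker-1.internal"
    - ip: "10.1.2.4"
      hostnames:
        - "kafka-broker-2.internal"
  containers:
    - name: flink-main-container   # required name; Flink merges in the rest of the spec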


Best,
Yang

黄剑文 wrote on 2021-12-24:

> "A client-local file, not one inside the image" - how should I read that? In run-application
> mode the user jar is built into a custom image on top of the standard Flink image and then submitted. So where should this file live? Right now, whatever path I give, it reads the path inside the image: e.g. /opt/my-pod-template reads the image's /opt/my-pod-template file.
>
> Thanks for your reply.


Re: Configuring hosts for TM and JM containers when deploying Flink on native K8s

2021-12-23, Yang Wang
When submitting with flink
run-application, kubernetes.pod-template-file must point to a client-local file,
not a file inside the image.

Best,
Yang

hjw <1010445...@qq.com.invalid> wrote on 2021-12-23:

> Flink version: 1.13. With Flink deployed on native K8s,
> our scenario requires the jobmanager and taskmanager to carry some specific hosts entries. The official docs say you can supply pod templates to control the K8s deployment behavior of the JM and TM, but those pod templates have to be packed into the submitting client image.
>
> The problem: the hosts the JM and TM need differ per environment (dev, test, prod), which would mean maintaining a different image per environment. Is there a good way to handle this? Thanks.


Re: Re: Low CPU utilization with flink on native k8s

2021-12-23, Yang Wang
CPU never triggers eviction; only a memory request/limit mismatch can cause that.

Best,
Yang

casel.chen wrote on 2021-12-23:

> Does a CPU request different from the limit have any side effects? Could the pod lose out in contention and get killed?
>
> On 2021-12-20, "Jeff" wrote:
> > Upgrading won't help; I hit the same problem with flink 1.13.2. The cause is that Flink sets the request equal to the limit, so I changed the source code. You can take a look: https://github.com/jeff-zou/flink.git
> > I mainly modified the KubernetesUtils.java class, using an external resource parameter to override the request.
> >
> > On 2021-12-18, "casel.chen" wrote:
> > > We run flink 1.12.5. When deploying jobs to native k8s, whatever we set for kubernetes.taskmanager.cpu or
> > > kubernetes.jobmanager.cpu, the generated k8s yaml shows the same value for the cpu
> > > request and limit. This leads to very low CPU utilization, because a job needs far more CPU at startup than in steady state, roughly 5x more. With a low cpu setting, startup takes very long.
> > > How can we raise CPU utilization? Can we edit the k8s
> > > yaml directly to set the request higher and the limit lower? Is there a better way? Would upgrading Flink help?


Re: Accessing multiple HDFS clusters from flink on yarn

2021-12-06, Yang Wang
You could try shipping a local hadoop conf directory and pointing the HADOOP_CONF_DIR environment variable at it:

-yt /path/of/my-hadoop-conf
-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
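
Put together, a submission might look like the following sketch (the conf directory and job jar are placeholders); the shipped directory would carry the core-site.xml/hdfs-site.xml that declare both nameservices:

./bin/flink run -d -m yarn-cluster \
-yt /path/of/my-hadoop-conf \
-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
./examples/streaming/TopSpeedWindowing.jar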


Best,
Yang

chenqizhu wrote on 2021-11-30:

> Hello all,
>
> Flink 1.13 supports configuring hadoop properties in flink-conf.yaml via the flink.hadoop.*
> prefix. We want to write checkpoints to an SSD-backed HDFS (call it cluster B) to speed up checkpoint writes, but that cluster is not the Flink client's local default HDFS (the default is cluster A). So, similar to HDFS federation, we configured the nameservices of both clusters in flink-conf.yaml to reach both of them:
>
>
> flink.hadoop.dfs.nameservices: ACluster,BCluster
>
> flink.hadoop.fs.defaultFS: hdfs://BCluster
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> But the job fails at startup. It seems the HA nameservice configuration of cluster B is not recognized and BCluster is treated as a hostname instead. The error is below.
>
> (If we switch the setting back to the client's local default cluster A, i.e. flink.hadoop.fs.defaultFS:
> hdfs://ACluster, the job starts fine.)
>
>
>
>
> Caused by: BCluster
>
> java.net.UnknownHostException: BCluster
>
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>
> at
> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
>
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> Is there a solution to this? The pain point is that flink must reach two hdfs clusters, ideally configured purely through flink-conf.yaml.
>
> My component versions:
>
> flink : 1.13.3
>
> hadoop : 3.3.0
>
> Looking forward to a reply, thanks!


Re: Job cancellation issues with Flink 1.13.3 on Native K8s

2021-11-30, Yang Wang
With native mode, by design, cancelling the job releases all K8s-related resources, and the HA data is cleaned up automatically as well [1].

A restart after cancellation is not expected behavior; please share the JM logs so we can take a closer look.

[1].
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up

Best,
Yang

研发-江志洋 wrote on 2021-11-30:

> Hi, I've been deploying jobs with Flink 1.13.3 in Application mode on Native K8s and have a couple of questions:
> 1. Clicking cancel in the UI kills the job, but it is immediately pulled up again, so cancellation never takes effect. Why?
> 2. After deleting the job with kubectl delete deployment/cluster_id and starting it again via bin/flink
> run-application, it automatically recovers from the last successful checkpoint. Is there a way to not recover from the checkpoint?
>
>
> Looking forward to a reply!


Re: How can the io.tmp.dirs configuration be made highly available?

2021-11-17, Yang Wang
When deploying on Yarn, all of Yarn's LOCAL_DIRS are set as io.tmp.dirs, which can give better performance in some cases.

Since these directories only hold temporary files, there is no high-availability concern here.
If a disk fails, YARN currently only guarantees that newly launched containers stop using that disk; containers already using it are not handled,
so you can only wait for those containers to fail and exit on their own.

Best,
Yang

鲁成祥 <979775...@qq.com.invalid> wrote on 2021-11-15:

> On our Flink Yarn cluster, io.tmp.dirs is uniformly configured with two disk paths, /mnt/ssd/0/xxx,/mnt/ssd/1/xxx.
> Recently one of the disks died; some jobs threw exceptions while others did not.
> The official docs describe this option as:
>
> io.tmp.dirs: The directories where Flink puts local data, defaults to the
> system temp directory (java.io.tmpdir property). If a list of
> directories is configured, Flink will rotate files across the directories.
>
>
>
> Can someone explain the blast radius of a bad disk here, and what high availability exists for this setting?


Re: Controlling scheduling for flink on native k8s

2021-11-16, Yang Wang
Taints and tolerations [1] solve exactly this need, and native K8s mode supports them [2]:

kubernetes.jobmanager.tolerations
kubernetes.taskmanager.tolerations

[1].
https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
[2].
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-jobmanager-tolerations
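
A sketch of the whole flow under assumed names (the team label, taint key, and node name are made up): taint and label the reserved nodes, then submit with a matching toleration plus node selector (kubernetes.jobmanager.node-selector / kubernetes.taskmanager.node-selector):

kubectl taint nodes node-1 dedicated=team-a:NoSchedule
kubectl label nodes node-1 team=team-a

./bin/flink run-application -t kubernetes-application \
-Dkubernetes.jobmanager.tolerations="key:dedicated,operator:Equal,value:team-a,effect:NoSchedule" \
-Dkubernetes.taskmanager.tolerations="key:dedicated,operator:Equal,value:team-a,effect:NoSchedule" \
-Dkubernetes.jobmanager.node-selector="team:team-a" \
-Dkubernetes.taskmanager.node-selector="team:team-a" \
local:///opt/flink/examples/streaming/StateMachineExample.jar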

Best,
Yang

casel.chen wrote on 2021-11-15:

> Our flink jobs run on one k8s cluster with a number of nodes, and we want to dedicate some nodes to department A and the rest to department B. Does flink have configuration to control k8s scheduling like this?
> 1. Label the nodes
> 2. Set the scheduling label value when submitting the job


Re: Slow job startup with flink on native k8s application mode

2021-11-14, Yang Wang
Startup should not be that slow; first confirm whether slow image pulls are the cause.

If so, pre-pull (warm) the image onto all K8s nodes ahead of time, which cuts startup time drastically.

Best,
Yang

casel.chen wrote on 2021-11-13:

> For the same job with the same resource requests, startup time differs by more than 10x between flink on native k8s session mode and application
> mode.
> Submitting to session mode takes at most 2 minutes, while application mode takes anywhere from 10 minutes to half an hour or even an hour.
> 1. What does job startup speed depend on?
> 2. How can we speed up job startup?


Re: Elastic resource scaling for flink on native k8s

2021-11-14, Yang Wang
As I understand it, your need can be met by letting the cpu limit be larger than the request. There is a ticket tracking this, but it has not been implemented yet:

https://issues.apache.org/jira/browse/FLINK-15648
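
For readers coming to this later: FLINK-15648 has since been implemented, and Flink 1.15+ exposes it as limit-factor options, roughly as in this sketch (the factor value is just an example):

kubernetes.taskmanager.cpu: 2.0
# limit = request * factor, i.e. the pod may burst up to 3 vCores while requesting 2
kubernetes.taskmanager.cpu.limit-factor: 1.5
kubernetes.jobmanager.cpu.limit-factor: 1.5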


Best,
Yang

casel.chen wrote on 2021-11-13:

> I noticed that a job consumes the most resources at startup, and consumption drops once it is running.
> This is most visible in CPU: with 2 vCores requested (Request == Limit), startup uses about 1.2
> vCores, but the running job actually uses less than 0.8 vCore, wasting CPU.
> Is there a way to scale elastically, i.e. treat my requested vCores as the ceiling, keep a 20%
> buffer for traffic peaks, and otherwise release the surplus to other jobs while the job runs?
> If so, how do I configure it? Do I need to upgrade to flink 1.14? Thanks!


Re: Request for help

2021-11-14, Yang Wang
Check the JM logs to see whether the submitted job, or the job recovered after a failover, really has the
id 397a081a0313f462818575fc725b3582.


Best,
Yang

RS wrote on 2021-11-15:

> Check the client logs, usually under flink's logs directory.
>
> On 2021-11-12, "sky" wrote:
> > I'm using flink on yarn. When running:  flink run -m yarn-cluster
> > ./examples/batch/WordCount.jar  it failed with this error:
> >
> >The program finished with the following exception:
> >
> >org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job (397a081a0313f462818575fc725b3582)
> > at
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> > at
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> > at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >
> > ...
> >Could you tell me what the cause is? My config file looks like this:
>
> >#===
> >high-availability: zookeeper
> >high-availability.storageDir: hdfs://mycluster/flink/ha/
> >high-availability.zookeeper.quorum:
> hadoop201:2181,hadoop202:2181,hadoop203:2181
> >high-availability.zookeeper.path.root: /flink
> >high-availability.cluster-id: /default_one # important: customize per
> cluster
> >#checkpoint state backend
> >state.backend: filesystem
> >state.checkpoints.dir: hdfs://mycluster/flink/checkpoints
> >#default savepoint location
> >state.savepoints.dir: hdfs://mycluster/flink/savepoints
> ># the cluster name must not be misspelled
> >jobmanager.archive.fs.dir: hdfs://mycluster/flink/completed-jobs/
> >historyserver.archive.fs.dir: hdfs://mycluster/flink/completed-jobs/
>
> >#===
> >
> >Thanks!
>


Re: -ynm option has no effect in flink yarn-per-job mode

2021-10-25, Yang Wang
The -t option must be combined with -D options, not with the -y shorthand options.

For example: -t yarn-per-job -Dyarn.application.name=flink-test

Best,
Yang

Lawulu wrote on 2021-10-25:

> For example:
> bin/flink run -ynm flink-test -t yarn-per-job --detached
> ./examples/streaming/TopSpeedWindowing.jar
>
>
> The name shown in the yarn UI is still "Flink per-job cluster".


Re: How to mount custom volumes with Flink on native k8s?

2021-09-18, Yang Wang
Dongdong is right.

For this case the only option is a pod template that mounts a PV, or a hostPath volume, to persist the heap dump files.

Best,
Yang

东东 wrote on 2021-09-17:

> Upgrade to 1.13 and use a pod template; versions before that have no officially supported way.
>
>
> On 2021-09-17, "casel.chen" wrote:
> > To catch TM OOMs, we start jobs with the options
> > -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
> > -XX:HeapDumpPath=/var/log/oom.bin"
> > so a heap dump file is produced on OOM for later analysis.
> > But the TM's pod is destroyed on OOM, so we want to mount a network volume to persist the heap dump file.
> > How do we mount a custom volume with Flink on native k8s? We're on Flink 1.12.5.
>


Re: flink run -d -m yarn-cluster fails to submit to the yarn cluster

2021-08-30, Yang Wang
export HADOOP_CLASSPATH=`hadoop classpath`

The approach above should be fine. Verify that the jars actually exist under those directories, especially /Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/
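
As a quick sanity-check sketch (the user directory is a placeholder):

ls /Users/<user>/local/hadoop/hadoop-3.2.2/share/hadoop/yarn/*.jar | head

export HADOOP_CLASSPATH=`hadoop classpath`   # export in the same shell that runs the submission
./bin/flink run -d -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar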

Best,
Yang

龙逸尘 wrote on 2021-08-31:

> Hi Wayne,
>
> You can try setting HADOOP_CONF_DIR explicitly:
> export HADOOP_CONF_DIR=/opt/flink/hadoop-conf/
>
> Wayne <1...@163.com> wrote on 2021-08-28:
>
> > My submission command:
> >
> >
> > ./bin/flink run -d -m yarn-cluster
> >
> >
> > The error:
> >  The program finished with the following exception:
> >
> >
> > java.lang.IllegalStateException: No Executor found. Please make sure to
> > export the HADOOP_CLASSPATH environment variable or have hadoop in your
> > classpath. For more information refer to the "Deployment" section of the
> > official Apache Flink documentation.
> > at
> >
> org.apache.flink.yarn.cli.FallbackYarnSessionCli.isActive(FallbackYarnSessionCli.java:41)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine(CliFrontend.java:1236)
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:234)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> > at
> >
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> >
> >
> > Output of running hadoop classpath:
> > @192 flink-1.12.2 % hadoop classpath
> >
> >
> /Users//local/hadoop/hadoop-3.2.2/etc/hadoop:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/*:/Users//local/hadoop/hadoop-3.2.2
> > I configured HADOOP_CLASSPATH repeatedly but it never takes effect. For the official
> > export HADOOP_CLASSPATH=`hadoop classpath`
> > to what directory level exactly should this hadoop classpath point?
> >
> >
> >
> >
> >
> >
> >
> >
>


Re: Persisting job logs for flink on k8s

2021-08-22, Yang Wang
There are generally two ways to collect logs:
1. Node-side collection on K8s, e.g. Alibaba Cloud's ilogtail [1]; Flink usually only needs to write to stdout or to an emptyDir mount.
2. A log4j2 custom appender that pushes logs directly to a storage service (OSS, Alibaba Cloud SLS, etc.); you write a plugin yourself or use the ones Alibaba Cloud already provides.

[1]. https://help.aliyun.com/document_detail/87540.html

Best,
Yang

东东 wrote on 2021-08-23:

>
> Just collect the container logs. ELK-based collection of container logs on K8s is mature; a default install of the official Helm Charts picks everything up.
>
>
> On 2021-08-23, "casel.chen" wrote:
> > Flink 1.12.1, jobs submitted via the flink run command, running on native
> > k8s. Question: how do we persist job logs? After a job restarts, the pod is destroyed and the logs stored locally in the pod are gone, so we can't find the root
> > cause of failures. Is there a way to persist job logs, e.g. into ELK or Alibaba Cloud OSS? We also run on Alibaba Cloud
> > managed K8s; can jobs be configured to use the Alibaba Cloud logging service? I know a k8s app can mount an ECS host path, but that requires editing the job's yaml, and I don't see such an option in the flink
> > on k8s launch command. Any guidance appreciated, thanks!
>


Re: Flink program fails to start under Flink On Yarn HA deployment

2021-08-17, Yang Wang
Judging by the error this is a known issue [1], already fixed in 1.11.2.

[1]. https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 wrote on 2021-08-17:

> Hello: our Flink program is deployed on Yarn in Application Mode. Without HA
> it starts fine; after configuring HA, startup fails. Could you help us find the cause?
>
>
> HA configuration:
> high-availability: zookeeper
> high-availability.storageDir: hdfs://mycluster/flink/ha
> high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_cluster
>
>
> The exception:
> 2021-08-17 10:24:18,938 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-08-17 10:25:09,706 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerDetailsInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_292]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> 

Re: Development plans for flink on kubernetes

2021-07-25, Yang Wang
The big features of Flink on Kubernetes are already done, including native Kubernetes [1] (session mode
and application mode),
Kubernetes HA [2], and pod templates [3].

The 1.12 release also brought many stability improvements, such as cleaning up HA data when a job finishes and ClusterEntrypoint changes to avoid leftover ConfigMaps;
it has reached production quality.

Still, I'm sure plenty of details need polish. For example, when a LoadBalancer is used, the returned address may be incorrect, and the logic of
Fabric8FlinkKubeClient#getRestEndpoint needs a cleanup. You can use the JIRA filter [4] to find K8s-component tickets and pick whatever interests you.

Any questions during testing and usage are welcome on the community mailing lists; open a JIRA to track any bug you find.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
[3].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
[4].
https://issues.apache.org/jira/browse/FLINK-23478?jql=project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20%22Deployment%20%2F%20Kubernetes%22

laohu <2372554...@qq.com.invalid> wrote on 2021-07-23:

> Hi everyone!
>
> What are the development plans for flink on kubernetes? Is it possible to participate?


Re: Re: Re: How to change logging configuration dynamically for flink on native k8s?

2021-07-13, Yang Wang
Logs of jobs running inside one session cannot be configured independently,
unless each job's user code lives in a different package, in which case log4j can route different packages to different files. Flink framework logs still cannot be told apart.

Best,
Yang

东东 wrote on 2021-07-13:

> Right, the logging configuration applies to the JM and TM.
>
> On 2021-07-13, "casel.chen" wrote:
> > In session
> > mode, does the logging configuration file apply to every job running on the session? Can each job on the session configure its logging independently? Thanks!
> >
> > On 2021-07-12, "Yang Wang" wrote:
> > > Just edit the log4j-console.properties stored in the ConfigMap and it takes effect immediately; see here [1].
> > >
> > > [1].
> > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#changing-the-log-level-dynamically
> > >
> > > Best,
> > > Yang
> > >
> > > casel.chen wrote on 2021-07-09:
> > >
> > > > Flink runs on native k8s. We want to change the Root Logger Level, dynamically add Logger Name -> Logger
> > > > Level mappings, and let users pass in custom logging templates. Is any of that possible today?
>


Re: How to change logging configuration dynamically for flink on native k8s?

2021-07-11, Yang Wang
Just edit the log4j-console.properties stored in the ConfigMap and it takes effect immediately; see here [1].

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#changing-the-log-level-dynamically
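
Concretely, a sketch of the edit (the cluster id and namespace are placeholders; native K8s names the ConfigMap flink-config-<cluster-id>):

kubectl edit configmap flink-config-my-flink-cluster -n <namespace>
# then change e.g. the root level inside the embedded log4j-console.properties:
# rootLogger.level = DEBUG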

Best,
Yang

casel.chen wrote on 2021-07-09:

> Flink runs on native k8s. We want to change the Root Logger Level, dynamically add Logger Name -> Logger
> Level mappings, and let users pass in custom logging templates. Is any of that possible today?


Re: question about flink on k8s per-job mode

2021-06-21, Yang Wang
If we are not strict about the names, application mode and per-job mode are no different in a K8s environment: in both, job submission happens on the JM.
Note that the current K8s application mode only supports submitting a single job when high availability is enabled.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode

Best,
Yang



at003 wrote on 2021-06-17:

> Hello experts,
>
> Why do the official flink docs say that neither flink on k8s nor native k8s supports per-job mode, while a web search turns up plenty of tutorials for it...
>
> Thanks
>
>
>
>


Re: How to view the TaskManager gc.log on standalone K8S?

2021-06-21, Yang Wang
You can add the following options to enable GC logging and write it under the /opt/flink/log directory:

env.java.opts: -verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:ParallelGCThreads=4 -Xss512k
env.java.opts.jobmanager: -Xloggc:/opt/flink/log/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M
env.java.opts.taskmanager: -Xloggc:/opt/flink/log/taskmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M


Best,
Yang

WeiXubin <18925434...@163.com> wrote on 2021-06-18:

> With *standalone K8S* deployed as *Deploy Application Cluster*, where can I find/view and how do I configure
> the TaskManager *gc.log*?


Re: With ZooKeeper-based JobManager HA, how do I get the address of the currently active JobManager?

2021-06-15, Yang Wang
Take a look at how RestClusterClient uses ClientHighAvailabilityServices to retrieve the leader address.


Best,
Yang

yidan zhao wrote on 2021-06-15:

> You don't need to create a remoteEnv in code; that remoteEnv style is for submitting jobs from a local IDE. That setup is rare, since production networks are usually isolated from office networks.
>
> yidan zhao wrote on 2021-06-15:
> >
> > Isn't that approach just for testing? In production you should use the flink run command, which already resolves the JM address automatically via zk when submitting.
> >
> > cuicle wrote on 2021-06-15:
> > >
> > > Don't guess... In theory there is only one active
> > > jobManager and the rest are standby. Even if your way worked, having the client maintain all the JobManager addresses would still be ugly.
> > >
> > > What I want is to fetch the current active jobManager directly from zk.
>
>


Re: Does flink on native kubernetes support batch processing?

2021-06-15, Yang Wang
Native Flink on K8s can run batch jobs; the JobManager is still managed by a K8s Deployment. When the job finishes,
the whole Flink Application deregisters automatically, which cleans up all K8s resources.

If you want to inspect job information after it finishes, deploy a history server and configure it for the batch jobs.
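
A minimal configuration sketch for that (the HDFS path is a placeholder): point the jobs and the history server at the same archive directory, then start the server with bin/historyserver.sh start.

# in the batch job's flink-conf.yaml
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/

# in the history server's flink-conf.yaml
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
historyserver.web.port: 8082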


Best,
Yang

casel.chen wrote on 2021-06-13:

> We know flink on native kubernetes currently runs a streaming job via a k8s deployment. Will it use a k8s job to run a batch job?


Re: How to modify configuration files for a Flink application on native k8s?

2021-06-07, Yang Wang
In native
k8s deployment mode, the flink-conf.yaml and log4j-console.properties under the local $FLINK_CONF_DIR are automatically put into a ConfigMap
and mounted into the JM and TM. You only need to modify the corresponding local files.

Best,
Yang

eriendeng wrote on 2021-06-07:

> Hi all,
> We're migrating flink jobs to native
> k8s and found no good way to modify the flink-conf family of files, e.g. the log4j file and common options written in flink-conf (e.g.
> the Prometheus setup).
> flink-conf options can still be passed at flink run time, but there seems to be no good way to pass the log4j configuration.
> What do people do here in practice? thx.


Re: Re: On flink on yarn, a yarn resource-manager failover restarts the flink application, which does not resume from the latest checkpoint

2021-05-31, Yang Wang
HA records the counter and path of the last successful checkpoint in ZK; without HA enabled, recovery starts from the explicitly specified savepoint.


Best,
Yang

刘建刚 wrote on 2021-05-28:

> Then the snapshot info was probably lost after the master failover; HA should solve this.
>
> 董建 <62...@163.com> wrote on 2021-05-28:
>
> > Reliably reproducible.
> > Checkpoints are generated normally; I can confirm them in the web ui and in the hdfs directory.
> > Our jobmanager has no HA; could that be the cause?
> > The logs show recovery from the checkpoint given via -s; without -s, a restart does not use the latest checkpoint file either.
> > This has bothered me for a long time with no good lead; next step is to set up HA and retry.
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2021-05-28 18:15:38, "刘建刚" wrote:
> > > This behavior is not expected. Can you reproduce it reliably with these steps?
> > > 1. Recover from a savepoint;
> > > 2. The job starts taking periodic savepoints;
> > > 3. The job fails over.
> > > If so, check whether the checkpoint files exist and whether zookeeper was updated.
> > > If the problem persists, the logs are the next place to look.
> > >
> > > 董建 <62...@163.com> wrote on 2021-05-28:
> > >
> > >> What I observe is the following.
> > >>
> > >> 1. Flink version flink-1.12.2. The launch command is below; -s is given because the job had been cancelled and is being restarted here.
> > >>
> > >>
> > >>
> > >>
> > >> flink run -d -s
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >>
> > >>
> > >>
> > >> 2. flink-conf.yaml
> > >>
> > >>
> > >>
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >>
> > >>
> > >>
> > >> 3. Checkpoint setup in the code
> > >>
> > >>
> > >>
> > >>
> > >>StreamExecutionEnvironment env =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>
> > >>
> > >>
> > >>
> > >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> > >> 10));
> > >>
> > >>
> > >>
> > >>
> > >>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >>
> > >>
> > >>
> > >>
> > >>env.enableCheckpointing(1 * 60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >>
> > >>
> > >>
> > >> 4. Symptoms
> > >>
> > >> a) When the ops team fails over the yarn
> > >> resource-manager, my flink job restarts too (the application-id and job-id stay the same), but the jobmanager and taskmanager move to different machines.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> b) My flink job checkpoints every minute; say it has been running for a while since the last restart and checkpoints have reached chk-200.
> > >>
> > >> c) The cluster team fails over the resource-manager again. The expectation is that the flink job automatically
> > >> restores from chk-200, but the logs show it still restores from chk-100.
> > >>
> > >>
> > >>
> > >>
> > >> d) The job essentially consumes binlog via flink-mysql-cdc: DebeziumSourceFunction
> > >> sourceMilApplysLogStream = MySQLSource.builder()
> > >>
> > >>   The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice.
> > >>
> > >>
> > >>
> > >>
> > >> e) The log prints the following. Could it be that because I started the job with -s last time, it does not look for the latest checkpoint automatically on restart?
> > >>
> > >>
> > >>
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >>
> > >>
> > >> The expectation is that after a restart the job automatically resumes consuming from the latest checkpoint, i.e. from chk-200.
> > >>
> > >> The problem now is that after a restart the job always recovers from the checkpoint specified via flink -s.
> >
>


Re: flink 1.13 k8s native startup cannot find KubernetesSessionClusterEntrypoint

2021-05-31, Yang Wang
Please kubectl describe the failing JM pod and share it, so we can see whether the generated start command is correct.


Best,
Yang

fz wrote on 2021-05-28:

> Image: flink:1.13.0-scala_2.11
>
> sed: cannot rename /opt/flink/conf/sed1yRdDY: Device or resource busy
> sed: cannot rename /opt/flink/conf/sed03zP3W: Device or resource busy
> /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only
> file system
> sed: cannot rename /opt/flink/conf/sedFtORA0: Device or resource busy
> mv: cannot move '/opt/flink/conf/flink-conf.yaml.tmp' to
> '/opt/flink/conf/flink-conf.yaml': Device or resource busy
> + /usr/local/openjdk-8/bin/java -classpath '/opt/flink/lib/*' -Xms30720m
> -Xmx30720m -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> Error: Could not find or load main class
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
>
>
>
>


Re: flink on k8s native test failure

2021-05-12, Yang Wang
Your option is misconfigured; it should be kubernetes.rest-service.exposed.type=NodePort

Best,
Yang

仙剑……情动人间 <1510603...@qq.com> wrote on 2021-05-12:

> Hi All,
>
> I'm testing flink on k8s native in a self-built k8s test environment,
> following the official guide, and it keeps failing. Details below; any help is much appreciated.
>
> 1. Environment
>
> k8s  v1.18.16
> flink  flink-1.12.2
> java  openjdk version "11" 2018-09-25
>
> 2. Prerequisites required by the official docs
>
> 2.1 Kubernetes >= 1.9: satisfied
>
> 2.2 Permission check: satisfied
>
> 2.3 Kubernetes DNS enabled: satisfied
>
> 2.4 Default service account with RBAC permissions
> to create, delete pods: I created a new namespace and account for this, also satisfied:
>
> kubectl create namespace flink-session-cluster
>
> kubectl create serviceaccount flink -n flink-session-cluster
>
> kubectl create clusterrolebinding flink-role-binding-flink \ --clusterrole
> =edit \ --serviceaccount=flink-session-cluster:flink
>
>
> 3. Starting the session and submitting a flink job
>
> 3.1 Session start command:
>
> ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
> \
> > -Dkubernetes.namespace=flink-session-cluster \
> > -Dkubernetes.jobmanager.service-account=flink \
> > -Dkubernetes.service.exposed.type=NodePort
>
> (startup log screenshot omitted)
>
> 3.2 Job submission command:
>   ./bin/flink run \
> > --target kubernetes-session \
> > -Dkubernetes.namespace=flink-session-cluster \
> > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > -Dkubernetes.jobmanager.service-account=flink \
> > ./examples/streaming/TopSpeedWindowing.jar
>
> The error log:
>
> Executing TopSpeedWindowing example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/home/dsi/soft/flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar) to
> field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 2021-05-12 15:51:30,453 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve
> flink cluster my-first-flink-cluster successfully, JobManager Web
> Interface: http://10.199.252.101:8081
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job 'CarTopSpeedWindowingExample'.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'CarTopSpeedWindowingExample'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at
> org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.main(TopSpeedWindowing.java:99)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
> at
> 

Re: Fw: How to modify the Logging configuration for flink on native kubernetes?

2021-05-10, Yang Wang
You only need to modify the log4j-console.properties in the local Flink conf directory.

That configuration file is automatically shipped to the JM and TM via a ConfigMap and used there.

Best,
Yang

casel.chen wrote on 2021-05-08:

>
> Could someone please answer this? Thanks!
>
>
>  Forwarded message 
> From: "casel.chen"
> Date: 2021-05-08 11:50:03
> To: "user-zh@flink.apache.org"
> Subject: How to modify the Logging configuration for flink on native kubernetes?
> I deploy a flink session
> cluster on native kubernetes and want to change the log level for a certain package. Editing log4j-console.properties in the configmap and redeploying works, but starting the flink session
> cluster from the command line (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) wipes out the earlier change. Is there a way to keep the change? Is there a command-line startup option to point to a custom logging configuration? Thanks!
>
>
>
>
>
>
>


Re: Can the History Server show aggregated TaskManager logs?

2021-05-07, Yang Wang
Flink's history server is not integrated with Yarn NM log
aggregation, so after a job finishes you can only view the webui and exceptions;
the logs themselves cannot be viewed.

Best,
Yang

lhuiseu wrote on 2021-05-07:

> Hi:
> flink 1.12.0,
> on yarn mode.
> Finished jobs show up in the history server, but viewing the TaskManager Log through the WebUI returns a 404. Does the Flink
> History Server not support viewing aggregated TaskManager logs? Anyone familiar with the history server internals, please help.
> Many thanks.
>


Re: flink kubernetes application keeps restarting TaskManagers

2021-04-05, Yang Wang
With cpu set this low, K8s enforces the limit strictly.

I suspect the TM starts very slowly and keeps failing to register before the timeout; check the TM log to confirm.

Also, judging from the log you posted, the rest endpoint has started successfully and should be reachable.

Best,
Yang

casel.chen wrote on 2021-04-05:

> While trying flink kubernetes
> application I found TMs being requested and terminated over and over, and the LoadBalancer-type Rest service never becomes ready, so the flink web
> ui is unreachable. The k8s log is below. What causes this? Are my requested resources too small?
>
>
> = Launch parameters
> "kubernetes.jobmanager.cpu": "0.1",
> "kubernetes.taskmanager.cpu": "0.1",
> "taskmanager.numberOfTaskSlots": "1",
> "jobmanager.memory.process.size": "1024m",
> "taskmanager.memory.process.size": "1024m",
>
>
> = k8s log
>
>
>
> 2021-04-05 09:55:14,777 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] -
> JobManager successfully registered at ResourceManager, leader id:
> 9903e058fb5ca6f418c78dafcad048f1.
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,870 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Requesting new slot [SlotRequestId{3bcf44c03f742d211b5abcc9d0d35068}] and
> profile ResourceProfile{UNKNOWN} with allocation id
> 17bcd11a1d493155e3ed45cfd200be79 from resource manager.
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Request slot with profile ResourceProfile{UNKNOWN} for job
>  with allocation id
> 17bcd11a1d493155e3ed45cfd200be79.
> 2021-04-05 09:55:14,974 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:55:15,272 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:55:18,570 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> new TaskManager pod with name
> flink-k8s-native-application-cluster-taskmanager-1-1 and resource
> <1024,0.1>.
> 2021-04-05 09:55:22,669 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> flink-k8s-native-application-cluster-taskmanager-1-1 is created.
> 2021-04-05 09:55:22,670 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: flink-k8s-native-application-cluster-taskmanager-1-1
> 2021-04-05 09:55:22,770 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker flink-k8s-native-application-cluster-taskmanager-1-1 with
> resource spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb
> (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb
> (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 with resource
> spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb (26843542
> bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes),
> managedMemSize=230.400mb (241591914 bytes)} was requested in current
> attempt and has not registered. Current pending count after removing: 0.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 is terminated.
> Diagnostics: Pod terminated, container termination statuses:
> [flink-task-manager(exitCode=1, reason=Error, message=null)]
> 2021-04-05 09:56:35,495 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:56:35,496 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:56:35,498 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> 

Re: flink-1.12.2 TM cannot use the custom serviceAccount to access configmaps

2021-03-31, Yang Wang
You only configured the JM's service account:
-Dkubernetes.jobmanager.service-account=flink-service-account

Try changing it to -Dkubernetes.service-account=flink-service-account

Best,
Yang

1120344670 <1120344...@qq.com> wrote on 2021-03-31:

> Hello, here is the TM error:
> <http://apache-flink.147419.n8.nabble.com/file/t1260/1617171749995.jpg>
>
> The start command:
> ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=tuiwen-flink
> -Dtaskmanager.memory.process.size=2200m -Dkubernetes.taskmanager.cpu=0.3
> -Dkubernetes.jobmanager.cpu=0.3 -Dtaskmanager.numberOfTaskSlots=2
> -Dkubernetes.rest-service.exposed.type=ClusterIP
> -Dkubernetes.jobmanager.service-account=flink-service-account
> -Dresourcemanager.taskmanager-timeout=345600   -Dkubernetes.namespace=flink
>
> The image is one we built ourselves based on apache/flink:1.12.2-scala_2.12.


Re: flink Container exited with a non-zero exit code 2. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err

2021-03-29, Yang Wang
An error like this usually means the Flink JM/TM process was never launched properly by Yarn. Check the Yarn NodeManager log
and search for the corresponding container; there should be some clues.

Best,
Yang

flink2021 wrote on 2021-03-30:

> A streaming job fails after running for a while with:
> Container exited with a non-zero exit code 2. Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err
> What is the actual cause? Could someone take a look? This is the only error in the log, nothing else.


Re: flink-1.12.2 TM cannot use the custom serviceAccount to access configmaps

2021-03-29, Yang Wang
I can confirm this is fixed in 1.12.1 and 1.12.2. If it still doesn't work, please share the start command and the corresponding TM error log.

Best,
Yang

1120344670 <1120344...@qq.com> wrote on 2021-03-29:

> Hello:
> I previously filed an issue about this, linked here:
> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
> It still looks unfixed to me.
>
> The error is as follows: (screenshot omitted)
>
> The jira issue appears closed now; please confirm whether it is fixed.
>


Re: How to configure standby JobManagers for HA in Flink on Native K8S mode?

2021-03-25, Yang Wang
There is already a ticket tracking this: https://issues.apache.org/jira/browse/FLINK-17707

It should be supported in 1.13.
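
For later readers: this landed in 1.13. A configuration sketch (the storage path is a placeholder) combines standby replicas with Kubernetes HA:

kubernetes.jobmanager.replicas: 2
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery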

Best,
Yang

casel.chen wrote on 2021-03-26:

> In Flink on K8S Standalone mode you can start multiple JMs via yaml, but how do you do it in Native K8S mode? Is there documentation for this? Thanks!


Re: How to enable Queryable State in Flink Native Kubernetes deployment mode?

2021-03-25, Yang Wang
Just like with Standalone: create your own taskmanager-query-state-service and simply adjust the selector.
Native mode automatically adds the following labels, which let you filter the TaskManagers belonging to one Flink cluster:

app: <cluster-id>
component: taskmanager
type: flink-native-kubernetes
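
A sketch of such a Service built on those labels (the cluster id is a placeholder; 6125 is the default queryable-state proxy port, adjust if you changed it):

apiVersion: v1
kind: Service
metadata:
  name: taskmanager-query-state-service
spec:
  type: NodePort
  ports:
    - name: query-state
      port: 6125
      targetPort: 6125
  selector:
    app: my-flink-cluster
    component: taskmanager
    type: flink-native-kubernetes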


Best,
Yang

tian lin wrote on 2021-03-25:

> Hi all:
> With Flink 1.12.1 in Flink Native Kubernetes deployment mode, how do we enable Queryable
> State? The official docs explain how to enable it on standalone K8S (
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html#enabling-queryable-state), but in Native
> K8S deployment mode, whether Session or Application, the Flink-related k8s
> specs are mostly generated by flink code, and there is no convenient way to generate and deploy the Queryable State port and the related K8S Service.
>


Re: flink 1.12.0 k8s session deployment error

2021-03-24, Yang Wang
The root cause of this is that the cloud LoadBalancer keeps sending RST packets to the service Flink created.
This JIRA [1] has more background.

A temporary workaround is to raise the log level of the org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint class to ERROR in the log4j2 configuration.

[1]. https://issues.apache.org/jira/browse/FLINK-18129
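
In log4j2 properties form, the workaround might look like this (the logger key "dispatcherrest" is arbitrary):

logger.dispatcherrest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
logger.dispatcherrest.level = ERROR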

Best,
Yang

18756225...@163.com <18756225...@163.com> wrote on 2021-03-24:

> I hit this problem too. The cluster submits jobs normally, but the jobmanager log keeps showing this. Is there a way to fix it?
>
>
> From: casel.chen
> Sent: 2021-02-07 16:33
> To: user-zh@flink.apache.org
> Subject: flink 1.12.0 k8s session deployment error
> Deploying a session-mode flink cluster on k8s, the jobmanager reports the error below. What is the cause and how can it be fixed?
>
>
> 2021-02-07 08:21:41,873 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
> 2021-02-07 08:21:43,506 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-02-07 08:21:43,940 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> 

Re: Question about HA for Flink-1.12 in OnYarn mode

2021-03-19, Yang Wang
The ZooKeeper HA implementation is independent of the deployment mode; it works not only on Yarn but also on Standalone and K8s.

See the community docs for details [1].
For design details, look at the K8s HA implementation; the principles are largely the same [2].

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/overview/
[2].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

Yapor <17379152...@163.com> wrote on 2021-03-19:

> Hi community experts, I'd like to understand the HA architecture of flink 1.12 in yarn per
> job mode. Are there docs or diagrams describing it? Please share so we can all learn!


Re: How to set the flink run client log file path dynamically?

2021-03-04, Yang Wang
Exporting FLINK_LOG_DIR before running the flink run command should do it.
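
For example, a per-job log directory could be chosen like this sketch (the directory layout is up to you):

export FLINK_LOG_DIR=/var/log/flink/job-a
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar   # the client log now lands under /var/log/flink/job-a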

Best,
Yang

小旋锋 wrote on 2021-03-03:

> Hi all.
> When submitting Flink jobs via flink run, the flink client's log files go under $FLINK_HOME/log by default.
> We need the logs of each submission in a different directory. How can the log file path be specified dynamically for each flink run?
>
>
> Notes:
> 1. Setting the env.log.dir option works in the flink-conf.yaml file, but when passed dynamically via -yD or -D,
> it doesn't seem to work.
> 2. flink version: 1.10
>
>
> Thanks.


Re: Re: flink 1.12.2-rc2 hijacked for crypto mining

2021-03-02, Yang Wang
The root cause is that you exposed port 8081 to the public internet, and Flink's rest endpoint has no authentication by default,
so anyone can submit a jar and run it.

You are presumably using session mode; application mode disables web job submission by default.


Best,
Yang

Michael Ran wrote on 2021-03-03:

> At the network level it should not reach the public internet directly; did you open something?
> On 2021-03-02, "macdoor" wrote:
> > I'm not a security expert and don't know how to confirm this is a flink problem, but the symptoms look very similar to what we hit with flink 1.10
> > before. I'd suggest having test cases for this and publishing the test results.
>


Re: Flink 1.12 ApplicationMode errors on Alibaba Cloud managed Kubernetes

2021-02-28, Yang Wang
The cause is the Alibaba Cloud LoadBalancer health check constantly sending RST to Flink's rest endpoint.
There is a ticket tracking this [1], but it is not fixed yet.

In the short term you can work around it via the log4j configuration, setting the log level of the
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint class to WARN.

[1]. https://issues.apache.org/jira/browse/FLINK-18129


Best,
Yang

王 羽凡 wrote on 2021-03-01:

> Starting Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes
> (ACK) produces some errors; the same errors do not appear on our self-built Kubernetes cluster.
> The taskmanager containers do start normally, though, and jobs run fine afterwards. How should this error be handled? Is Flink incompatible with ACK clusters?
>
> Start command:
> ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=demo \
> -Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
> local:///opt/flink/usrlib/my-flink-job.jar
>
> Log:
> 2021-03-01 04:52:06,518 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
> 2021-03-01 04:52:06,518 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
> 2021-03-01 04:52:08,303 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered
> 0 pods from previous attempts, current attempt id is 1.
> 2021-03-01 04:52:08,303 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
> 2021-03-01 04:52:08,306 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0
> was granted leadership with fencing token 
> 2021-03-01 04:52:08,310 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] -
> Starting the SlotManager.
> 2021-03-01 04:52:08,596 WARN
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Unhandled exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-03-01 04:52:08,596 WARN
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Unhandled exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> 

Re: How to separate per-job logs in Flink standalone mode?

2021-02-22 Post by Yang Wang
Flink's standalone application mode [1] can produce separate logs for each application.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster

Best,
Yang

xingoo <23603...@qq.com> wrote on Mon, Feb 22, 2021 at 12:01 PM:

> Hi,
>
> That is still not a great experience; it would be nice if each job logged to its own file, like Spark does.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: timezone of log timestamps for flink on k8s

2021-02-20 Post by Yang Wang
You can also set it when building the image.
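
A minimal Dockerfile sketch (the base tag and the Asia/Shanghai zone are examples; adjust to your setup):

FROM flink:1.12.1-scala_2.12
# point the container clock at UTC+8 so log timestamps use the local zone
RUN ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo "Asia/Shanghai" > /etc/timezone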


Best,
Yang

Michael Ran wrote on Fri, Feb 19, 2021 at 7:35 PM:

> It is configured on the k8s side
> On 2021-02-19 09:37:28, "casel.chen" wrote:
> >It is currently in UTC; how can I set it to the local UTC+8 timezone? Thanks!
> >
> >
> >2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger [] -
> Slf4jLogger started
> >2021-02-19 01:34:22,155 INFO akka.remote.Remoting [] - Starting remoting
>


Re: How to separate per-job logs in Flink standalone mode?

2021-02-20 Post by Yang Wang
I believe you mean standalone session mode, where a single TM runs tasks from different jobs.

The framework logs inside a TM are all mixed together. If your job classes live in different packages,
you can use log4j2 to configure a separate logger and appender per package, writing to different paths.
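
A rough log4j2 sketch for one job package (the package name and file suffix are made-up examples; repeat the pattern per package):

# log4j.properties
appender.joba.type = File
appender.joba.name = JobAFile
appender.joba.fileName = ${sys:log.file}.job-a   # hypothetical per-job file
appender.joba.layout.type = PatternLayout
appender.joba.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

logger.joba.name = com.example.joba   # hypothetical job package
logger.joba.level = INFO
logger.joba.additivity = false        # keep these lines out of the shared TM log
logger.joba.appenderRef.file.ref = JobAFile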


Best,
Yang

xingoo <23603...@qq.com> wrote on Sat, Feb 20, 2021 at 5:31 PM:

> Dear All:
> We currently deploy Flink mainly in standalone mode; how can the logs of individual jobs be separated within the same taskmanager?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: How can flink k8s HA use OSS as the high-availability.storageDir?

2021-02-17 Post by Yang Wang
With the community's official image flink:1.12.1 you need the following configuration;
the last two options enable the OSS plugin via environment variables.

high-availability.storageDir: oss://flink/flink-ha
fs.oss.endpoint: <your-oss-endpoint>
fs.oss.accessKeyId: <your-access-key-id>
fs.oss.accessKeySecret: <your-access-key-secret>
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar

Best,
Yang

casel.chen wrote on Wed, Feb 17, 2021 at 5:42 PM:

>
> As the subject says: in a k8s environment I don't want to use HDFS as the high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints can already use OSS.


Re: Flink standalone on k8s HA exception

2021-02-08 Post by Yang Wang
After enabling HA you need to create a service account that has permission to create/watch ConfigMaps
and mount it into the JobManager and TaskManager.
From your error it looks like no service account was configured.
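
A minimal sketch of the setup (namespace "default" and the account name "flink" are examples):

kubectl create serviceaccount flink -n default
kubectl create clusterrolebinding flink-role-binding \
  --clusterrole=edit --serviceaccount=default:flink
# in your standalone JobManager/TaskManager deployment yaml:
#   spec.template.spec.serviceAccountName: flink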

Best,
Yang


casel.chen wrote on Tue, Feb 9, 2021 at 12:10 AM:

> I tried deploying a Flink standalone cluster on k8s. Before enabling HA the cluster worked
> fine, but after adding the following two HA options to the configmap the JM throws an exception. Why is that?
>
>
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery
>
>
> 2021-02-09 00:03:04,421 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
> start cluster entrypoint StandaloneSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> Caused by: org.apache.flink.util.FlinkException: Could not create the ha
> services from the instantiated HighAvailabilityServicesFactory
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> ... 2 more
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.(Fabric8FlinkKubeClient.java:84)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory.fromConfiguration(DefaultKubeClientFactory.java:88)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:38)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 

Re: connection refused with multiple TaskManagers for flink on yarn

2021-02-07 Post by Yang Wang
Then you probably need to share your JobManager and TaskManager logs so this can be analyzed further.

The main thing to confirm is that the port being connected to is correct; if the network layer is fine, it may be that some configuration option pins a specific port.

Best,
Yang

Junpb wrote on Mon, Feb 8, 2021 at 9:30 AM:

> Hi,
> My test Yarn environment has three nodes. When only one TM starts, the JM and TM come up fine on any random node; the error only appears once there are two TMs.
> The JM and TM ports are random on every start; the configuration above just ensures that 2 TMs start. I now suspect some other configuration of mine causes the error. Thanks
>
> Best,
> Bi
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: connection refused with multiple TaskManagers for flink on yarn

2021-02-06 Post by Yang Wang
I suggest using telnet to check the network connectivity between the JM and the problematic TM; Flink has no known bug in this area.
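
For example, run from the JM host against the address and port shown in the "Connection refused" error (both are placeholders here):

telnet <taskmanager-host> <taskmanager-port>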


Best,
Yang

Junpb wrote on Fri, Feb 5, 2021 at 8:09 PM:

> nohup bin/flink run -m yarn-cluster \
> -c main \
> -ynm ${FLINK_NAME} \
> -ys 3 \
> -p 4 \
> -yjm 2048m \
> -ytm 2048m \
>
> With flink on yarn, the flink run options above ensure the TaskManager count is 2
>
> Strangely the JobManager reports the error below; 2 TaskManagers did start, but the one with the error cannot work properly
>
> Thanks for any help
>
> Error:
> Caused by:
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: ip:port
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: job submission commands ./bin/flink run-application -t yarn-application ... vs ./bin/flink run -m yarn-cluster ...

2021-01-29 Post by Yang Wang
-m yarn-cluster and -t yarn-per-job can both be used to submit per-job applications to a Yarn cluster;
only the CLI implementation behind them differs. The former, FlinkYarnSessionCLI, is the older way;
the latter is a more generic mechanism introduced in 1.10 that stays consistent with K8s, Standalone, etc.

Another difference is that -m yarn-cluster supports CLI options such as -yq and -ynm,
while -t yarn-per-job can only set these via -D options.
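
For example, these two submissions should be roughly equivalent (the jar and the values are illustrative):

./bin/flink run -m yarn-cluster -ynm MyJob -yjm 1024m -ytm 2048m \
  ./examples/streaming/TopSpeedWindowing.jar

./bin/flink run -t yarn-per-job \
  -Dyarn.application.name=MyJob \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=2048m \
  ./examples/streaming/TopSpeedWindowing.jar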


Best,
Yang

lp <973182...@qq.com> wrote on Fri, Jan 29, 2021 at 3:00 PM:

> To put it another way: are these two submission modes in 1.11 and 1.12 the same thing, just with a changed command?
>
> Excerpts from the official docs:
>
> flink-1.11:
> Run a single Flink job on YARN
>
> Example:
> ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
>
> --
> flink-1.12:
> Per-Job Cluster Mode
>
> Example:
> ./bin/flink run -t yarn-per-job --detached
> ./examples/streaming/TopSpeedWindowing.jar
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-29 Post by Yang Wang
Thanks Xintong for driving this release.

Best,
Yang

Yu Li wrote on Fri, Jan 29, 2021 at 3:52 PM:

> Thanks Xintong for being our release manager and everyone else who made
> the release possible!
>
> Best Regards,
> Yu
>
>
> On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
>> series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data
>> streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Xintong Song
>>
>


Re: error when submitting a flink yarn application job

2021-01-20 Post by Yang Wang
This error is most likely a misconfiguration in your HDFS core-site.xml.

fs.defaultFS should be hdfs://localhost:9000/; please check it.
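
For example, core-site.xml should contain something like this (the host/port must match your NameNode):

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://localhost:9000</value>
</property>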

Best,
Yang

casel.chen wrote on Tue, Jan 19, 2021 at 6:19 PM:

> Today I tried yarn application mode (with the yarn.provided.lib.dirs option), uploading the
> jars under $FLINK_LIB to hdfs, and got the error below. Am I missing some jar or config file?
>
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:443)
>
> at
> com.huifu.streamsql.launcher.yarn.YarnApplicationExecutor.doStart(YarnApplicationExecutor.java:70)
>
> at
> com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:76)
>
> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:12)
>
> at com.huifu.streamsql.launcher.SubmitJobMain.main(SubmitJobMain.java:39)
>
> Caused by: java.lang.IllegalArgumentException: Wrong FS:
> hdfs://localhost:9000/flinkLib, expected: file:///
>
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:630)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
>
> at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
>
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1437)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.lambda$getAllFilesInProvidedLibDirs$2(YarnApplicationFileUploader.java:429)
>
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:94)
>
> at java.util.ArrayList.forEach(ArrayList.java:1259)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:426)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.(YarnApplicationFileUploader.java:109)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:354)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:710)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:436)
>
> ... 4 more
>
>
>
>


Re: flink yarn application job submission question

2021-01-20 Post by Yang Wang
Remote user jars are currently supported, but only with Hadoop-compatible schemes,
because the remote user jar is not downloaded to the Flink client side; it is registered directly as a Yarn local resource.

So the error you got is expected; this is not supported yet.

Best,
Yang

casel.chen wrote on Wed, Jan 20, 2021 at 10:23 AM:

> ./bin/flink run-application -t yarn-application \
>
>   -Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \
>
>   hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar
>
>
> This form of the command does work.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-20 10:21:32, "casel.chen" wrote:
> >I used the following command to submit a flink job to yarn and it failed. With a local job jar path there is no problem. I have already put
> flink-oss-fs-hadoop-1.12.0.jar into the flink lib directory and configured the OSS
> options in flink-conf.yaml. Does flink really not support job jars on a remote distributed filesystem?
> >
> >
> >./bin/flink run-application -t yarn-application \
> >
> >  -Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \
> >
> >  oss://odps-prd/rtdp/flinkJobs/TopSpeedWindowing.jar
> >
> >
> >
> >
> >
> > The program finished with the following exception:
> >
> >
> >
> >
> >org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:443)
> >
> >at
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:207)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:974)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
> >
> >at java.security.AccessController.doPrivileged(Native Method)
> >
> >at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> >at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> >at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> >at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> >
> >Caused by: java.io.IOException: No FileSystem for scheme: oss
> >
> >at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799)
> >
> >at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
> >
> >at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
> >
> >at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
> >
> >at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
> >
> >at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
> >
> >at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
> >
> >at
> org.apache.flink.yarn.Utils.lambda$getQualifiedRemoteSharedPaths$1(Utils.java:577)
> >
> >at
> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:127)
> >
> >at org.apache.flink.yarn.Utils.getRemoteSharedPaths(Utils.java:585)
> >
> >at
> org.apache.flink.yarn.Utils.getQualifiedRemoteSharedPaths(Utils.java:573)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:708)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:436)
> >
> >... 9 more
>


Re: periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-19 Post by Yang Wang
Send it as an attachment, or upload it to third-party storage and share the link here.

macdoor wrote on Tue, Jan 19, 2021 at 12:44 PM:

> Sure, how should I send it to you?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Post by Yang Wang
Thanks Xintong for the great work as our release manager!


Best,
Yang

Xintong Song wrote on Tue, Jan 19, 2021 at 11:53 AM:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.12.1, which is the first bugfix release for the Apache Flink 1.12
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/01/19/release-1.12.1.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349459
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Xintong
>


Re: periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Post by Yang Wang
There are indeed many 'Connecting websocket' and 'Scheduling reconnect task' log lines;
I still think the network between your Pod and the APIServer is not very stable.

Also, if possible, please share the complete JobManager log at DEBUG level.
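
For example, in conf/log4j-console.properties (log4j2 syntax, as shipped with 1.12) you can raise the root level:

rootLogger.level = DEBUG   # the shipped default is INFO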

Best,
Yang

macdoor wrote on Tue, Jan 19, 2021 at 9:31 AM:

> Thanks! With DEBUG logging enabled there is still only the final ERROR, but before it there are
> quite a few log lines containing kubernetes.client.dsl.internal.WatchConnectionManager. I grepped
> a part of them; can you tell anything from these?
>
> job-debug-0118.log:2021-01-19 02:12:25,551 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:12:25,646 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Connecting websocket ...
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
> job-debug-0118.log:2021-01-19 02:12:25,647 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:12:30,128 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Connecting websocket ...
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5a9fa83e
> job-debug-0118.log:2021-01-19 02:12:30,176 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
> closing the watch
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
> job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Closing websocket
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket@15b15029
> job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket close received. code: 1000, reason:
> job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Ignoring onClose for already closed/closing websocket
> job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
> closing the watch
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2cdbe5a0
> job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Closing websocket
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket@1e3f5396
> job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket close received. code: 1000, reason:
> job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Ignoring onClose for already closed/closing websocket
> job-debug-0118.log:2021-01-19 02:12:42,677 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Connecting websocket ...
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
> job-debug-0118.log:2021-01-19 02:12:42,678 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:12:42,920 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Connecting websocket ...
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
> job-debug-0118.log:2021-01-19 02:12:42,921 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:12:45,130 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Connecting websocket ...
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@4b318628
> job-debug-0118.log:2021-01-19 02:12:45,132 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket successfully opened
> job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
> closing the watch
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
> job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Closing websocket
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket@69d1ebd2
> job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> WebSocket close received. code: 1000, reason:
> job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
> Ignoring onClose for already closed/closing websocket
> job-debug-0118.log:2021-01-19 02:13:05,940 DEBUG
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
> closing the watch
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
> 

Re: How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18 Post by Yang Wang
For details you can read the code of the YarnClusterDescriptor and YarnLogConfigUtil classes;
they show how the log4j configuration file is discovered and how it is registered as a LocalResource so that Yarn distributes the configuration.

Best,
Yang

Bobby <1010445...@qq.com> wrote on Mon, Jan 18, 2021 at 11:17 PM:

> First, thanks for the solution. I will go try it shortly.
>
>
> Regarding the statement that "Yarn deployment relies on the file name log4j.properties to ship the resource, so you cannot manually specify another file": how should I understand that? Could you share related material so I can study the actual flink
> on yarn deployment logic?
>
> thx.
>
>
> Yang Wang wrote
> > Yarn deployment relies on the file name log4j.properties to ship the resource, so you cannot manually specify another file
> >
> > But you can export a FLINK_CONF_DIR=/path/of/your/flink-conf environment variable
> > and put your own flink-conf.yaml and log4j.properties in that directory
> >
> > Best,
> > Yang
> >
> Bobby <1010445...@qq.com> wrote on Mon, Jan 18, 2021 at 7:18 PM:
> >
> >> For Flink on Yarn logging, the log4j.properties file is read from flink/conf by default.
> >> Is there a way to specify my own log4j.properties when submitting a flink job?
> >> thx.
> >>
> >>
> >> Flink version: 1.9.1
> >> Deployment mode: Flink on Yarn
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18 Post by Yang Wang
Yarn deployment relies on the file name log4j.properties to ship the resource, so you cannot manually specify another file.

But you can export a FLINK_CONF_DIR=/path/of/your/flink-conf environment variable
and put your own flink-conf.yaml and log4j.properties in that directory.
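
A quick sketch (paths are examples):

export FLINK_CONF_DIR=/path/of/your/flink-conf
cp $FLINK_HOME/conf/flink-conf.yaml $FLINK_CONF_DIR/
cp /path/to/my-log4j.properties $FLINK_CONF_DIR/log4j.properties   # must keep this file name
./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar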

Best,
Yang

Bobby <1010445...@qq.com> wrote on Mon, Jan 18, 2021 at 7:18 PM:

> For Flink on Yarn logging, the log4j.properties file is read from flink/conf by default.
> Is there a way to specify my own log4j.properties when submitting a flink job?
> thx.
>
>
> Flink version: 1.9.1
> Deployment mode: Flink on Yarn
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Post by Yang Wang
You can use iperf to test the network; you need to install it in the image beforehand.
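
For example (assuming iperf3 is baked into the image; pod IPs are placeholders):

# in one pod
iperf3 -s
# in another pod, measure throughput and stability against the first pod
iperf3 -c <server-pod-ip> -t 30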

Also, you can enable debug logging to see whether the failure only happened after the Watch could not reconnect across many retries.

Best,
Yang

macdoor wrote on Mon, Jan 18, 2021 at 7:08 PM:

> I checked the earlier logs and did not find 'too old resource
> version'; several consecutive logs contain no other error either, just this error, a restart, and then a fresh log.
>
> The k8s cluster I use does indeed seem to have an unstable network. How can I test the network between the Pod and the APIServer in a way that clearly demonstrates the problem? ping? Or some other tool?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Post by Yang Wang
Search your logs to see whether there are 'too old resource version' errors.
Also, test the network between the Pod and the APIServer to see whether the connection drops frequently.
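
For example, from inside the JM pod (kubernetes.default.svc is the standard in-cluster APIServer name; /healthz may require a token on clusters that disable anonymous access):

kubectl exec -it <jobmanager-pod> -- \
  curl -sk https://kubernetes.default.svc/healthz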

Best,
Yang

macdoor wrote on Mon, Jan 18, 2021 at 9:45 AM:

> It restarts roughly every few tens of minutes. Any ideas on how to investigate? The error thrown is identical every time, and many ConfigMaps also accumulate over time. Below is a concrete error
>
> Error content
>
> 2021-01-17 04:16:46,116 ERROR
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_275]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_275]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-01-17 04:16:46,117 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> 

Re: flink 1.12.0 HA on native k8s produces a large number of jobmanager-leader ConfigMaps after running for a while

2021-01-18 Post by Yang Wang
It looks like the error occurs while Watching. What does your K8s environment look like? An unstable network between the Pod and the K8s APIServer can cause this problem.

I have tested on both minikube and an Alibaba Cloud ACK cluster; in long runs (more than a week) there were no JM restarts caused by 'too old resource version' or similar.

Since several people have reported this problem, it will be fixed in the next 1.12 bug-fix release (1.12.2).

Best,
Yang


macdoor wrote on Mon, Jan 18, 2021 at 9:45 AM:

> Hello, I just started using flink 1.12.1 HA on
> k8s and found that the jobmanager restarts roughly every half hour, always with this error. Have you seen it? Thanks!
>
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Suspending
> SlotPool.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> [] - Close ResourceManager connection 28ed7c84e7f395c5a34880df91b251c6:
> Stopping JobMaster for job p_port_traffic_5m@hive->mysql @2021-01-17
> 11:40:00(67fb9b15d0deff998e287aa7e2cf1c7b)..
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
> SlotPool.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager
> 8c450d0051eff8c045adb76cb9ec4...@akka.tcp://flink@flink-jobmanager
> :6123/user/rpc/jobmanager_32
> for job 67fb9b15d0deff998e287aa7e2cf1c7b from the resource manager.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
> [] - Closing
>
> KubernetesLeaderElectionDriver{configMapName='test-flink-etl-67fb9b15d0deff998e287aa7e2cf1c7b-jobmanager-leader'}.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
> [] - The watcher is closing.
> 2021-01-17 04:52:12,416 INFO
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
> job graph 67fb9b15d0deff998e287aa7e2cf1c7b from
>
> KubernetesStateHandleStore{configMapName='test-flink-etl-dispatcher-leader'}.
> 2021-01-17 04:52:30,686 ERROR
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_275]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_275]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-01-17 04:52:30,691 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
> at

Re: is there a plan to support hostAliases configuration in flink native k8s?

2021-01-18 Post by Yang Wang
For features like this that are not used very often, the community plans to support them uniformly via pod templates,
which I believe should cover your requirement.
That is more flexible and more extensible; see this JIRA [1] for details.

There is already a draft PR; a formal PR will be submitted for review soon after it is finished.
You can also try it out early and report problems promptly.

[1]. https://issues.apache.org/jira/browse/FLINK-15656
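
Once pod templates land, a hostAliases entry would just be the standard K8s pod-spec snippet, roughly like this (IP and hostname are placeholders):

apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  hostAliases:
  - ip: "192.168.1.10"          # placeholder address
    hostnames:
    - "my.internal.host"        # placeholder hostname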

Best,
Yang

高函 wrote on Mon, Jan 18, 2021 at 11:13 AM:

>
>
> Does the community plan to support configuring hostAliases in native k8s mode?
> If I extend it myself, do I need to add the corresponding hostAliases option in the module and package a custom docker image?
> Thanks~
>


Re: TM fails to start with flink 1.12 k8s session deployment

2021-01-13 Post by Yang Wang
This problem is fixed in 1.12.1; 1.12.0 cannot yet set a ServiceAccount for the TM.
For details see this ticket: https://issues.apache.org/jira/browse/FLINK-20664

Also, 1.12.1 is currently under vote and will be released soon.

Best,
Yang

1120344670 <1120344...@qq.com> wrote on Wed, Jan 13, 2021 at 5:17 PM:

> *flink version: 1.12*
> *kubernetes: 1.17*
> The TM fails to start; the error is as follows:
>
>
> From the error, the TM hits a permission problem when accessing the configmap in the same namespace, using the role system:serviceaccount:
> flink-test:default. When starting flink I had already set the three options "taskmanager.service-account",
> "jobmanager.service-account", and "kubernetes.service-account"
> to our custom service account; it looks like they did not take effect or are not usable.
>
> Fix: create a role binding that gives the default account read access:
>
> kubectl create clusterrolebinding flink-role-binding-flink-default \
>   --clusterrole=edit --serviceaccount=<namespace>:<service-account>
>
>
>
>


Re: flink 1.12.0 native k8s fails to start

2021-01-10 Post by Yang Wang
The root cause is that your client side is version 1.12 while the base image you built on is 1.11, because the 1.12 image
has not been published to Docker Hub yet.
Rebuild the image yourself from the correct Dockerfile [1] and try running again.

[1].
https://github.com/apache/flink-docker/tree/master/1.12/scala_2.12-java8-debian
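
A minimal sketch of rebuilding the base image and layering your job on top (tags are examples):

git clone https://github.com/apache/flink-docker.git
cd flink-docker/1.12/scala_2.12-java8-debian
docker build -t flink:1.12.0-scala_2.12-java8 .
# then in your own Dockerfile:
#   FROM flink:1.12.0-scala_2.12-java8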

Best,
Yang

yzxs wrote on Fri, Jan 8, 2021 at 12:16 PM:

> 1. The job is submitted with the following command:
> ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=my-first-application-cluster \
>
> -Dkubernetes.container.image=
> registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
> \
> -Dkubernetes.container.image.pull-policy=Always \
> -Dkubernetes.container-start-command-template="%java% %classpath%
> %jvmmem% %jvmopts% %logging% %class% %args%" \
> local:///opt/flink/usrlib/WordCount.jar
>
> 2. After submission the pod keeps failing to restart; kubectl logs shows the following error:
> /docker-entrypoint.sh: 125: exec: native-k8s: not found
>
>
> 3. I checked the image's docker-entrypoint.sh script and there is no native-k8s command. The image is built on the latest flink image; the Dockerfile is as follows:
> FROM flink:latest
> RUN mkdir -p /opt/flink/usrlib
> COPY ./WordCount.jar /opt/flink/usrlib/WordCount.jar
>
> 4. The pod's describe output:
> Name: my-first-application-cluster-59c4445df4-4ss2m
> Namespace:default
> Priority: 0
> Node: minikube/192.168.64.2
> Start Time:   Wed, 23 Dec 2020 17:06:02 +0800
> Labels:   app=my-first-application-cluster
>   component=jobmanager
>   pod-template-hash=59c4445df4
>   type=flink-native-kubernetes
> Annotations:  
> Status:   Running
> IP:   172.17.0.3
> IPs:
>   IP:   172.17.0.3
> Controlled By:  ReplicaSet/my-first-application-cluster-59c4445df4
> Containers:
>   flink-job-manager:
> Container ID:
> docker://b8e5759488af5fd3e3273f69d42890d9750d430cbd6e18b1d024ab83293d0124
> Image: registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
> Image ID:
> docker-pullable://
> registry.cn-shenzhen.aliyuncs.com/syni_test/flink@sha256:53a2cec0d0a532aa5d79c241acfdd13accb9df78eb951eb4e878485174186aa8
> Ports: 8081/TCP, 6123/TCP, 6124/TCP
> Host Ports:0/TCP, 0/TCP, 0/TCP
> Command:
>   /docker-entrypoint.sh
> Args:
>   native-k8s
>   $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
>
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> -D jobmanager.memory.off-heap.size=134217728b -D
> jobmanager.memory.jvm-overhead.min=201326592b -D
> jobmanager.memory.jvm-metaspace.size=268435456b -D
> jobmanager.memory.heap.size=1073741824b -D
> jobmanager.memory.jvm-overhead.max=201326592b
> State:  Waiting
>   Reason:   CrashLoopBackOff
> Last State: Terminated
>   Reason:   Error
>   Exit Code:127
>   Started:  Wed, 23 Dec 2020 17:37:28 +0800
>   Finished: Wed, 23 Dec 2020 17:37:28 +0800
> Ready:  False
> Restart Count:  11
> Limits:
>   cpu: 1
>   memory:  1600Mi
> Requests:
>   cpu: 1
>   memory:  1600Mi
> Environment:
>   _POD_IP_ADDRESS:   (v1:status.podIP)
> Mounts:
>   /opt/flink/conf from flink-config-volume (rw)
>   /var/run/secrets/kubernetes.io/serviceaccount from
> default-token-9hdqt
> (ro)
> Conditions:
>   Type  Status
>   Initialized   True
>   Ready False
>   ContainersReady   False
>   PodScheduled  True
> Volumes:
>   flink-config-volume:
> Type:  ConfigMap (a volume populated by a ConfigMap)
> Name:  flink-config-my-first-application-cluster
> Optional:  false
>   default-token-9hdqt:
> Type:Secret (a volume populated by a Secret)
> SecretName:  default-token-9hdqt
> Optional:false
> QoS Class:   Guaranteed
> Node-Selectors:  
> Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
>  node.kubernetes.io/unreachable:NoExecute op=Exists for
> 300s
> Events:
>   Type Reason Age  From   Message
>    --     ---
>   Normal   Scheduled  15d  default-scheduler  Successfully
> assigned default/my-first-application-cluster-59c4445df4-4ss2m to minikube
>   Normal   Pulled 15d  kubeletSuccessfully
> pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
> 513.7913ms
>   Normal   Pulled 15d  kubeletSuccessfully
> pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
> 374.1125ms
>   Normal   Pulled 15d  kubeletSuccessfully
> pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
> 360.6719ms
>   Normal   Created15d (x4 over 15d)

Re: taskmanager fails to start for jobs using hdfs with Flink on k8s 1.11.3

2021-01-04 Post by Yang Wang
Since 1.11 you can simply export HADOOP_CONF_DIR on the Flink client machine
and then start the Flink job with flink run-application or kubernetes-session.sh; the Flink
client will automatically ship the Hadoop configuration to the JobManager and TaskManager pods via a ConfigMap
and add it to the classpath.
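
A quick sketch (the conf path, cluster-id, and image are examples):

export HADOOP_CONF_DIR=/etc/hadoop/conf
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=my-cluster \
  -Dkubernetes.container.image=flink:demo7-4 \
  local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar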

Best,
Yang

龙逸尘 wrote on Mon, Jan 4, 2021 at 4:39 PM:

> Good afternoon everyone. I am currently building a demo with Flink on k8s application mode that consumes kafka and writes to
> hive, using hdfs as the state backend, and I ran into some problems; hoping for some help. Let me describe the problem and the debugging process.
>
> The Dockerfile:
>
> FROM flink:1.11.3-scala_2.11
> RUN mkdir -p $FLINK_HOME/usrlib
> RUN mkdir -p /opt/hadoop/conf
> COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> COPY flink-on-k8s-1.0-SNAPSHOT.jar
> $FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
> COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
> ENV HADOOP_CONF_DIR /opt/hadoop/conf
> ENV YARN_CONF_DIR /opt/hadoop/conf
> COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
> COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
> COPY core-site.xml /opt/hadoop/conf/core-site.xml
>
> The launch command:
>
> flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
> -Dkubernetes.jobmanager.service-account=flink
> -Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
> -Dtaskmanager.numberOfTaskSlots=1
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
> -Dkubernetes.container.image=flink:demo7-4
> -Dkubernetes.rest-service.exposed.type=NodePort
> local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
>
> The jar flink-on-k8s-1.0-SNAPSHOT.jar simply consumes kafka, using hdfs as the state backend
> to record state.
>
> At first I tried only putting the three files such as yarn-site.xml under the usrlib directory; the JobManager failed to start with an
> UnknownHost error. Following advice from the mailing list, after setting HADOOP_CONF_DIR the JobManager started successfully with no error logs, but the
> TaskManager stays in ContainerCreating state and the deployment exits on its own after 7-8 minutes. 'kubectl describe
> pod' for the TM gives the error below:
>
> Events:
>   Type Reason   Age   From
>  Message
>    --     
>  ---
>   Normal   Scheduled default-scheduler
> Successfully assigned
> default/my-first-application-cluster-demo7-4-taskmanager-1-1 to
> k8s-node0002
>   Warning  FailedMount  37s (x10 over 4m46s)  kubelet,
> k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume
> "hadoop-config-volume" : configmap
> "hadoop-config-my-first-application-cluster-demo7-4" not found
>   Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002  Unable
> to attach or mount volumes: unmounted volumes=[hadoop-config-volume],
> unattached volumes=[hadoop-config-volume flink-config-volume
> default-token-fhkhf]: timed out waiting for the condition
>
> Have I misconfigured something, or is some other configuration needed to enable hdfs?
> Looking forward to your reply~
>
> ---
> Best Regards!
>
> Yichen
>


Re: specifying the job jar path in flink on k8s application mode

2021-01-04 Post by Yang Wang
The native mode currently only supports local://, i.e. the user jar has to be built into the image; hdfs or oss are not supported for now.

Downloading via an init container is indeed possible, but the pod template feature [1] is not supported yet; you can follow its progress.

[1]. https://issues.apache.org/jira/browse/FLINK-15656
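
For reference, the init-container idea in plain K8s terms would look roughly like this (the image, URL, and volume name are all hypothetical, and native mode cannot consume such a template until FLINK-15656 lands):

initContainers:
- name: fetch-user-jar
  image: busybox                       # hypothetical downloader image
  command: ["wget", "-O", "/opt/flink/usrlib/job.jar", "http://example.com/job.jar"]
  volumeMounts:
  - name: usrlib
    mountPath: /opt/flink/usrlib       # shared with the main flink container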

Best,
Yang

陈帅 wrote on Sat, Jan 2, 2021 at 8:08 PM:

> The example command from the official docs is:
> ./bin/flink run-application -p 8 -t kubernetes-application \
>
>   -Dkubernetes.cluster-id=flink-k8s-application-cluster \
>
>   -Dtaskmanager.memory.process.size=4096m \
>
>   -Dkubernetes.taskmanager.cpu=2 \
>
>   -Dtaskmanager.numberOfTaskSlots=4 \
>
>   -Dkubernetes.container.image=flink:latest \
>
>   local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
>
>
>
> The last line specifies the job jar path. Is only the local:// scheme supported? In that case the job can only be packaged into the image, right? Is it possible to access an external filesystem, e.g. an hdfs or oss address?
> If that is not supported for now, is there another workaround? I heard about init containers; how exactly would that work?


Re: monitoring flink jobs on k8s

2021-01-04 Post by Yang Wang
You can configure the PrometheusPushGateway reporter [1] to get metrics into Prometheus, and then hook up Grafana for viewing.
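
For example, in flink-conf.yaml (the host is a placeholder; option names follow the metric reporters docs):

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: <pushgateway-host>
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false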

For logs, besides a sidecar you can also use a custom log4j2 appender to write logs directly to distributed storage (Alibaba Cloud OSS, ElasticSearch, etc.).

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/metric_reporters.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

Best,
Yang

陈帅 wrote on Sat, Jan 2, 2021 at 7:54 PM:

> How should flink jobs running per-job on
> k8s be monitored properly? Logs are collected via sidecars, but how should the flink job metrics be collected?
> Is there any reference material on this?


Re: Re: flink 1.12.0 kubernetes-session deployment problem

2021-01-04 Post by Yang Wang
The native mode exposes the rest service as a LoadBalancer by default, so it prints an address you cannot reach.
You can add -Dkubernetes.rest-service.exposed.type=NodePort to expose it via NodePort instead;
then the address printed on the Flink client side is the correct one.

Alternatively, use minikube ip to get the IP address and kubectl get svc to get the NodePort of the rest
svc of the Flink cluster you created, and join the two together.
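
For example (cluster-id and namespace taken from your commands below; the rest service follows the <cluster-id>-rest naming convention):

minikube ip
kubectl get svc session001-rest -n flink-session-cluster
# web UI = http://<minikube-ip>:<nodeport mapped to 8081>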


As for the NoResourceAvailableException you mentioned: check whether the TaskManager pod has been created but is stuck in pending state.
If so, your minikube is short on resources; either give minikube more, or shrink the JobManager/TaskManager pod resources.
If not, please share the complete JobManager log so the problem is easier to track down.


Best,
Yang

陈帅 wrote on Sat, Jan 2, 2021 at 10:43 AM:

> Environment: a single MacBook Pro with minikube v1.15.1 and kubernetes v1.19.4 installed
> Under the flink v1.11.3 distribution I ran the following commands
> kubectl create namespace flink-session-cluster
>
>
> kubectl create serviceaccount flink -n flink-session-cluster
>
>
> kubectl create clusterrolebinding flink-role-binding-flink \
>   --clusterrole=edit \
>   --serviceaccount=flink-session-cluster:flink
>
>
> ./bin/kubernetes-session.sh \
>   -Dkubernetes.namespace=flink-session-cluster \
>   -Dkubernetes.jobmanager.service-account=flink \
>   -Dkubernetes.cluster-id=session001 \
>   -Dtaskmanager.memory.process.size=8192m \
>   -Dkubernetes.taskmanager.cpu=1 \
>   -Dtaskmanager.numberOfTaskSlots=4 \
>   -Dresourcemanager.taskmanager-timeout=360
>
>
> The screen output shows the flink web UI started at http://192.168.64.2:8081, not something like
> http://192.168.50.135:31753 with a 5-digit port. What went wrong? Should the host IP here be the minikube
> IP? My local browser cannot reach http://192.168.64.2:8081
>
>
>
> 2021-01-02 10:28:04,177 INFO
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
>
> 2021-01-02 10:28:04,907 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.64.2:8081
>
>
>
>
> I checked pods, service, and deployment; everything started normally and shows all green
>
>
> Next I submitted a job:
> ./bin/flink run -d \
>   -e kubernetes-session \
>   -Dkubernetes.namespace=flink-session-cluster \
>   -Dkubernetes.cluster-id=session001 \
>   examples/streaming/WindowJoin.jar
>
>
>
> Using windowSize=2000, data rate=3
>
> To customize example, use: WindowJoin [--windowSize
> ] [--rate ]
>
> 2021-01-02 10:21:48,658 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve
> flink cluster session001 successfully, JobManager Web Interface:
> http://10.106.136.236:8081
>
>
>
>
> The http://10.106.136.236:8081 shown here I can open in the browser; it shows the job running, but 'available
> slots' shows 0, and the JM log contains the following error
>
>
>
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.12-1.11.3.jar:1.11.3]
> ... 47 more
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
> ~[?:1.8.0_275]
>     at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> ~[?:1.8.0_275]
> ... 27 more
> Caused by: java.util.concurrent.TimeoutException
> ... 25 more
>
>
> Why does it report this insufficient-resources error? Thanks for any help!
>
>
>
>
>
>
>
>
> On 2020-12-29 09:53:48, "Yang Wang" wrote:
> >The ConfigMaps do not need to be created in advance; that Warning can be ignored and is normal. The main reason is that the deployment is created first and the ConfigMap afterwards
> >You can follow the community docs [1] to print the JM log to the console and take a look
> >
> >I suspect the cause is that you did not create a service account [2]
> >
> >[1].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
> >[2].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac
> >
> >Best,
> >Yang
> >
> >陈帅 wrote on Mon, Dec 28, 2020 at 5:54 PM:
> >
> >> Today I switched to the latest officially released flink image 1.11.3 and it still fails to start
> >> Here is my command
> >> ./bin/kubernetes-session.sh \
> >>   -Dkubernetes.cluster-id=rtdp \
> >>   -Dtaskmanager.memory.process.size=4096m \
> >>   -Dkubernetes.taskmanager.cpu=2 \
> >>   -Dtaskmanager.numberOfTaskSlots=4 \
> >>   -Dresourcemanager.taskmanager-timeout=360 \
> >>   -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
> >>   -Dkubernetes.namespace=rtdp
> >>
>

Re: flink 1.12.0 kubernetes-session deployment problem

2020-12-28 Post by Yang Wang
The ConfigMaps do not need to be created in advance; that Warning can be ignored and is normal. The main reason is that the deployment is created first and the ConfigMap afterwards.
You can follow the community docs [1] to print the JM log to the console and take a look.

I suspect the cause is that you did not create a service account [2].

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac

Best,
Yang

陈帅 wrote on Mon, Dec 28, 2020 at 5:54 PM:

> Today I switched to the latest officially released flink image 1.11.3 and it still fails to start
> Here is my command
> ./bin/kubernetes-session.sh \
>   -Dkubernetes.cluster-id=rtdp \
>   -Dtaskmanager.memory.process.size=4096m \
>   -Dkubernetes.taskmanager.cpu=2 \
>   -Dtaskmanager.numberOfTaskSlots=4 \
>   -Dresourcemanager.taskmanager-timeout=360 \
>   -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
>   -Dkubernetes.namespace=rtdp
>
>
>
> Events:
>
>   Type Reason  AgeFrom   Message
>
>    --        ---
>
>   Normal   Scheduled   88sdefault-scheduler
> Successfully assigned rtdp/rtdp-6d7794d65d-g6mb5 to
> cn-shanghai.192.168.16.130
>
>   Warning  FailedMount 88skubelet
> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-rtdp" not found
>
>   Warning  FailedMount 88skubelet
> MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap
> "hadoop-config-rtdp" not found
>
>   Normal   AllocIPSucceed  87sterway-daemon  Alloc IP
> 192.168.32.25/22 for Pod
>
>   Normal   Pulling 87skubeletPulling
> image "flink:1.11.3-scala_2.12-java8"
>
>   Normal   Pulled  31skubelet
> Successfully pulled image "flink:1.11.3-scala_2.12-java8"
>
>   Normal   Created 18s (x2 over 26s)  kubeletCreated
> container flink-job-manager
>
>   Normal   Started 18s (x2 over 26s)  kubeletStarted
> container flink-job-manager
>
>   Normal   Pulled  18skubeletContainer
> image "flink:1.11.3-scala_2.12-java8" already present on machine
>
>   Warning  BackOff 10skubeletBack-off
> restarting failed container
>
>
>
>
>
>
>
> Two ConfigMaps are reported as not found here. Do they need to be created in advance? The official docs do not mention this, or did I miss it?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-session
>
>
>
>
>
>
>
>
>
> On 2020-12-27 22:50:32, "陈帅" wrote:
>
> >This is my first attempt at deploying flink on k8s: version 1.12.0, jdk 1.8.0_275, scala 2.12.12, with a single-node minikube environment installed on my mac. The steps:
> >
> >
> >git clone https://github.com/apache/flink-docker
> >cd flink-docker/1.12/scala_2.12-java8-debian
> >docker build --tag flink:1.12.0-scala_2.12-java8 .
> >
> >
> >cd flink-1.12.0
> >./bin/kubernetes-session.sh \
> -Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dtaskmanager.numberOfTaskSlots=2 \
> -Dkubernetes.cluster-id=flink-session-cluster
> >
> >
> >It shows the JM started, but the web UI cannot be reached
> >
> >2020-12-27 22:08:12,387 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.99.100:8081
> >
> >
> >
> >
> >`kubectl get pods` shows the pod stuck in ContainerCreating state
> >
> >NAME   READY   STATUS
>   RESTARTS   AGE
> >
> >flink-session-cluster-858bd55dff-bzjk2 0/1
>  ContainerCreating   0  5m59s
> >
> >kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running
>  0  6d14h
> >
> >
> >
> >
> >So I ran `kubectl describe pod
> flink-session-cluster-858bd55dff-bzjk2` for details; the result is as follows:
> >
> >
> >
> >
> >Name: flink-session-cluster-858bd55dff-bzjk2
> >
> >Namespace:default
> >
> >Priority: 0
> >
> >Node: minikube/192.168.99.100
> >
> >Start Time:   Sun, 27 Dec 2020 22:21:56 +0800
> >
> >Labels:   app=flink-session-cluster
> >
> >  component=jobmanager
> >
> >  pod-template-hash=858bd55dff
> >
> >  type=flink-native-kubernetes
> >
> >Annotations:  
> >
> >Status:   Pending
> >
> >IP:   172.17.0.4
> >
> >IPs:
> >
> >  IP:   172.17.0.4
> >
> >Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff
> >
> >Containers:
> >
> >  flink-job-manager:
> >
> >Container ID:
> >
> >Image: flink:1.12.0-scala_2.12-java8
> >
> >Image ID:
> >
> >Ports: 8081/TCP, 6123/TCP, 6124/TCP
> >
> >Host Ports:0/TCP, 0/TCP, 0/TCP
> >
> >Command:
> >
> >  /docker-entrypoint.sh
> >
> >Args:
> >
> >  native-k8s
> >
> >  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 

Re: flink 1.12.0 HA on native k8s produces a large number of jobmanager-leader ConfigMaps after running for a while

2020-12-27 Post by Yang Wang
Thanks for using the K8s HA mode. Are you using session mode or application mode?

* In application mode, all HA-related ConfigMaps are cleaned up automatically once the flink job reaches a terminal state (FAILED, CANCELED,
SUCCEEDED). You can cancel the job in the web UI or with flink cancel and then observe; there should be no leftovers.
* In session mode, if you have submitted many jobs, each job gets its own ConfigMap. Its contents are cleaned up when the job
finishes, but the ConfigMap itself remains. There is already a ticket [1] tracking improved cleanup for session mode. For now, once you have confirmed the session is no longer in use, you can clean up with:
kubectl delete cm --selector='app=<cluster-id>,configmap-type=high-availability'

[1]. https://issues.apache.org/jira/browse/FLINK-20219

Best,
Yang

tao7 <99727...@qq.com> wrote on Mon, Dec 28, 2020 at 10:26 AM:

> Hi all, I deployed flink 1.12
> HA on native k8s, and after a while a large number of jobmanager-leader ConfigMaps were produced. Are all these ConfigMaps normally needed? How does everyone clean them up and maintain them?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.12.0 kubernetes-session deployment problem

2020-12-27 文章 Yang Wang
你整个流程理由有两个问题:

1. 镜像找不到
原因应该是和minikube的driver设置有关,如果是hyperkit或者其他vm的方式,你需要minikube
ssh到虚拟机内部查看镜像是否正常存在

2. JM链接无法访问
2020-12-27 22:08:12,387 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
flink session cluster session001 successfully, JobManager Web Interface:
http://192.168.99.100:8081

I suspect the log line above was not printed by the command you pasted: your command uses the NodePort exposed type, so the printed JM address should not be on port 8081.
As long as the job you submit on minikube adds kubernetes.rest-service.exposed.type=NodePort and the JM comes up, the printed JM address will be reachable.

Of course you can also assemble the link by hand: get the APIServer address with minikube ip, then use kubectl get svc to look up the NodePort of the rest
svc for the Flink Session Cluster you created, and join the two together.
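
For example, a minimal sketch; the rest service is typically named
<cluster-id>-rest, and the NodePort 31234 below is made up:

  minikube ip                                  # node address, e.g. 192.168.99.100
  kubectl get svc flink-session-cluster-rest   # note the NodePort, e.g. 8081:31234/TCP
  # then open http://192.168.99.100:31234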


Best,
Yang

陈帅 wrote on Sun, Dec 27, 2020, 22:51:

>
> This is my first attempt at deploying Flink on k8s. Versions: Flink 1.12.0, JDK 1.8.0_275, Scala 2.12.12. I have a single-node minikube environment on my Mac. The steps were:
>
>
> git clone https://github.com/apache/flink-docker
> cd flink-docker/1.12/scala_2.12-java8-debian
> docker build --tag flink:1.12.0-scala_2.12-java8 .
>
>
> cd flink-1.12.0
> ./bin/kubernetes-session.sh \
> -Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dtaskmanager.numberOfTaskSlots=2 \
> -Dkubernetes.cluster-id=flink-session-cluster
>
>
> The log shows the JM came up, but the web UI cannot be reached:
>
> 2020-12-27 22:08:12,387 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.99.100:8081
>
>
>
>
> `kubectl get pods` shows the pod stuck in the ContainerCreating state:
>
> NAME   READY   STATUS
> RESTARTS   AGE
>
> flink-session-cluster-858bd55dff-bzjk2 0/1
>  ContainerCreating   0  5m59s
>
> kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running
>  0  6d14h
>
>
>
>
> So I ran `kubectl describe pod
> flink-session-cluster-858bd55dff-bzjk2` for details; the result is as follows:
>
>
>
>
> Name: flink-session-cluster-858bd55dff-bzjk2
>
> Namespace:default
>
> Priority: 0
>
> Node: minikube/192.168.99.100
>
> Start Time:   Sun, 27 Dec 2020 22:21:56 +0800
>
> Labels:   app=flink-session-cluster
>
>   component=jobmanager
>
>   pod-template-hash=858bd55dff
>
>   type=flink-native-kubernetes
>
> Annotations:  <none>
>
> Status:   Pending
>
> IP:   172.17.0.4
>
> IPs:
>
>   IP:   172.17.0.4
>
> Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff
>
> Containers:
>
>   flink-job-manager:
>
> Container ID:
>
> Image: flink:1.12.0-scala_2.12-java8
>
> Image ID:
>
> Ports: 8081/TCP, 6123/TCP, 6124/TCP
>
> Host Ports:0/TCP, 0/TCP, 0/TCP
>
> Command:
>
>   /docker-entrypoint.sh
>
> Args:
>
>   native-k8s
>
>   $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> -D jobmanager.memory.off-heap.size=134217728b -D
> jobmanager.memory.jvm-overhead.min=201326592b -D
> jobmanager.memory.jvm-metaspace.size=268435456b -D
> jobmanager.memory.heap.size=1073741824b -D
> jobmanager.memory.jvm-overhead.max=201326592b
>
> State:  Waiting
>
>   Reason:   ImagePullBackOff
>
> Ready:  False
>
> Restart Count:  0
>
> Limits:
>
>   cpu: 1
>
>   memory:  1600Mi
>
> Requests:
>
>   cpu: 1
>
>   memory:  1600Mi
>
> Environment:
>
>   _POD_IP_ADDRESS:   (v1:status.podIP)
>
>   HADOOP_CONF_DIR:  /opt/hadoop/conf
>
> Mounts:
>
>   /opt/flink/conf from flink-config-volume (rw)
>
>   /opt/hadoop/conf from hadoop-config-volume (rw)
>
>   /var/run/secrets/kubernetes.io/serviceaccount from
> default-token-s47ht (ro)
>
> Conditions:
>
>   Type  Status
>
>   Initialized   True
>
>   Ready False
>
>   ContainersReady   False
>
>   PodScheduled  True
>
> Volumes:
>
>   hadoop-config-volume:
>
> Type:  ConfigMap (a volume populated by a ConfigMap)
>
> Name:  hadoop-config-flink-session-cluster
>
> Optional:  false
>
>   flink-config-volume:
>
> Type:  ConfigMap (a volume populated by a ConfigMap)
>
> Name:  flink-config-flink-session-cluster
>
> Optional:  false
>
>   default-token-s47ht:
>
> Type:Secret (a volume populated by a Secret)
>
> SecretName:  default-token-s47ht
>
> Optional:false
>
> QoS Class:   Guaranteed
>
> Node-Selectors:  <none>
>
> Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
>
>  node.kubernetes.io/unreachable:NoExecute op=Exists for
> 300s
>
> Events:
>
>   Type Reason   Age  From   Message
>
>    --   

Re: yarn.provided.lib.dirs not taking effect for yarn submission in flink 1.11

2020-12-24 Posted by Yang Wang
I strongly advise against putting jars that are not part of the Flink binary
into yarn.provided.lib.dirs, because the jars under it are distributed via the
Yarn public distributed cache, cached on the NodeManagers, and shared by all
applications.

The root cause of your error is that when main runs locally, the udf jar is still on hdfs, so the failure is raised on the client side.

There are two ways to fix it:
1. Do not put the udf into the provided lib dirs on hdfs, unless you really
want to share it with many applications.
2. Use application mode [1]; in that case the user's main runs on the
JobManager side, where the jars under the provided dirs have already been
downloaded and added to the classpath (see the sketch after the link below).

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode
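
For example, a minimal application-mode sketch for the job in this thread (the
paths and class name are taken from the quoted command below and are only
illustrative):

  /usr/hdp/flink1.11/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://ip:8020/flink-yarn/jars" \
    -c com.ly.common.udf.demo.FlinkUDFDemo \
    /data/bigdata/jars/udf-test.jar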

Best,
Yang

zhisheng wrote on Fri, Dec 25, 2020, 11:26:

> hi
>
> Try using -Dyarn.provided.lib.dirs
>
> Best
> zhisheng
>
> datayangl wrote on Tue, Dec 22, 2020, 16:56:
>
> >
> >
> > Flink 1.11 on yarn mode: I uploaded the dependencies under flink
> > lib plus my custom-function jar to hdfs in advance, and used yarn.provided.lib.dirs
> > at submission to point at the hdfs dependency path. The plan was to locate and instantiate the custom-function class via reflection in the program, but the submission fails and the program cannot find the custom-function class.
> >
> > Submission command: /usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test
> -yD
> > yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> > com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
> >
> > The relevant information is as follows:
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > -- class path:
> /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> >
> > 
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: object com.ly.third.udf.flink.SortKey not found.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: scala.ScalaReflectionException: object
> > com.ly.third.udf.flink.SortKey not found.
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> > at
> >
> >
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> > at
> com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> > at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: flink on yarn fails to start

2020-12-23 Posted by Yang Wang
There is a mistake in your command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
traffic.py

It should be -ys, not -s.
-s restores from a savepoint, which is why the error complains that the savepoint directory cannot be found.
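
The corrected command for this job would therefore be:

  flink run -m yarn-cluster -ynm traffic -ys 2 -p 2 -ytm 1024 -py traffic.py

(-ys sets the number of slots per TaskManager, while -s <savepointPath> resumes
from a savepoint, which is why the stack trace below complains about the
savepoint path '2'.)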


Best,
Yang

magichuang wrote on Wed, Dec 23, 2020, 20:29:

> Machine specs: three nodes, 32C 64G, CentOS 7.8, with a CDH cluster already deployed on them
> Flink version: 1.11.2, with the cluster set up across the three machines
>
> The hadoop cluster was built with CDH
>
>
> Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> The program is developed with pyflink: it reads data from kafka, aggregates each minute's data with a tumbling window, and then writes back to kafka
>
>
>
>
> The program runs fine in local mode, but submitting it in per-job mode always fails
>
> Testing the official example flink run -m yarn-cluster examples/batch/WordCount.jar
> does produce output, so may I ask whether this is a yarn problem or a problem with my program?
>
>
>
>
> The main error messages are below
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: java.io.FileNotFoundException: Cannot find checkpoint or
> savepoint file/directory '2' on file system 'file'.
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> 2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] -
> Stopped BLOB server at 0.0.0.0:16109
>
>
>
>
>
>
>
> The full log can be opened via the link below:
> https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note
> 

Re: configuring taskmanager.out to roll

2020-12-22 Posted by Yang Wang
I proposed a redirect-based approach in the community once before, but others had some concerns; you can take a look:

https://github.com/apache/flink/pull/11839#pullrequestreview-399769862

zilong xiao wrote on Tue, Dec 22, 2020, 16:13:

> OK, this scenario does exist; for now it looks achievable via redirection. Will follow the issue~
>
> 李杰 wrote on Tue, Dec 22, 2020, 15:58:
>
> > Hi,
> > We built this feature before; you can take a look here.
> > https://issues.apache.org/jira/browse/FLINK-20713
> >
> > zilong xiao wrote on Thu, Dec 3, 2020, 19:50:
> >
> > > A question for the community experts: can the stdout file taskmanager.out be configured to roll?
> > >
> >
>


Re: Flink on yarn: how to pin Flink jobs to a fixed set of yarn nodes

2020-12-22 Posted by Yang Wang
On Yarn, carve the machines reserved for Flink out into a dedicated partition; this can be done with node labels.

Then just specify it with yarn.application.node-label when submitting the Flink job. A partition can guarantee exclusivity: anything not carrying that label cannot be scheduled onto it.
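
For example, a minimal sketch; the label name "flink" and the node addresses
are hypothetical, and the target queue must also be configured to access the
label:

  # create an exclusive partition and attach the dedicated nodes to it
  yarn rmadmin -addToClusterNodeLabels "flink(exclusive=true)"
  yarn rmadmin -replaceLabelsOnNode "node1:8041=flink node2:8041=flink"

  # submit the Flink job onto the labeled partition
  flink run -m yarn-cluster -yD yarn.application.node-label=flink ...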

Best,
Yang

r pp wrote on Wed, Dec 23, 2020, 11:18:

> If flink is submitted to specific nodes, can it be guaranteed that other jobs cannot be submitted onto those flink-specific nodes?
>
> xiao cai wrote on Tue, Dec 22, 2020, 10:28:
>
> > Hi
> > You could consider using yarn's node label feature to submit flink jobs onto specific nodes
> >
> >
> >  Original Message
> > Sender: r pp
> > Recipient: user-zh
> > Date: Monday, Dec 21, 2020 21:25
> > Subject: Re: Flink on yarn: how to pin Flink jobs to a fixed set of yarn nodes
> >
> >
> > Hmm, if a few machines are designated for flink to run on, why not set up a
> > dedicated queue for flink in yarn instead? Is network isolation needed...
> > how fast is the internal network?
> >
> > <afweij...@163.com> wrote on Mon, Dec 21, 2020, 17:48:
> > > This can be achieved with yarn labels.
> > >
> > > -----Original Message-----
> > > From: user-zh-return-10095-afweijian=163@flink.apache.org on behalf of yujianbo
> > > Sent: 2020-12-21 16:44
> > > To: user-zh@flink.apache.org
> > > Subject: Flink on yarn: how to pin Flink jobs to a fixed set of yarn nodes
> > >
> > > Hi all: how can Flink on yarn be pinned to a fixed set of yarn nodes?
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
>

