Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job.
Hi, Young

Your analysis is correct. The Flink Kubernetes operator talks to the Flink cluster through the REST service, and Kubernetes routes requests sent to the service to the multiple JM pods behind it at random. Job submission consists of three API calls: uploadJar, runJob, and deleteJar, which is why you see these errors in the operator's log. Perhaps you could create a JIRA issue to track this problem.

Best,
Weihua

On Thu, Oct 27, 2022 at 6:51 PM Young Chen wrote:

> [Problem description]
>
> The Flink k8s operator (v1.1.0) deployed a Flink Session Cluster in HA mode (two JobManagers), and a SessionJob was then used to deploy an example job. Sometimes the job deploys successfully, sometimes it does not.
>
> The following error logs can be seen in the container.
>
> [Steps to reproduce]
>
> Deploy the cluster:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-cluster-1jm-checkpoint
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: file:///flink-data/savepoints
>     state.checkpoints.dir: file:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///flink-data/ha
>     state.checkpoints.num-retained: "10"
>   serviceAccount: flink
>   ingress:
>     template: "{{name}}.{{namespace}}.k8s.rf.io"
>   jobManager:
>     replicas: 2
>   podTemplate:
>     spec:
>       nodeSelector:
>         kubernetes.io/hostname: k8s17
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>             - mountPath: /flink-data
>               name: flink-volume
>       volumes:
>         - name: flink-volume
>           hostPath:
>             # directory location on host
>             path: /tmp/flink
>             # this field is optional
>             type: Directory
>
> Deploy the job:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: flink-job-1jm-checkpoint
> spec:
>   deploymentName: flink-cluster-1jm-checkpoint
>   job:
>     jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar  # our custom operator image includes the examples jars
>     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
>     parallelism: 1
>     upgradeMode: savepoint
>
> [Relevant logs]
>
> 1. Operator log from a run where the job deployed successfully:
>
> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
> java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: File 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>     at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
> ]
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source
>
> One JobManager pod does not have the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar; it is in the other JM pod. But that JM should not be the leader, because the checkpoint-related log output appears in the first JM.
>
> 2. Operator log from a failed job deployment:
>
> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', namespace='flink'}, version: 120505701} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
> java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar does not exist
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
>     at
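Weihua's explanation above can be illustrated with a small simulation (not operator code; the pod names and the three-step sequence are modeled after the thread): the jar lands on whichever JM pod received the upload, so any later runJob or deleteJar request that the Service routes to the other pod fails with "does not exist".

```python
import random

class JobManagerPod:
    """A JM pod whose web-upload directory is local to the pod."""
    def __init__(self, name):
        self.name = name
        self.uploaded_jars = set()

def submit_session_job(pods, jar_name, rng):
    """Simulate the operator's uploadJar -> runJob -> deleteJar sequence,
    where Kubernetes routes each REST request to a random pod."""
    steps = []
    for step in ("uploadJar", "runJob", "deleteJar"):
        pod = rng.choice(pods)  # Service picks a backend per request
        if step == "uploadJar":
            pod.uploaded_jars.add(jar_name)  # jar stored on this pod only
            steps.append((step, pod.name, "ok"))
        else:
            # runJob/deleteJar succeed only if routed to the upload pod
            ok = jar_name in pod.uploaded_jars
            steps.append((step, pod.name, "ok" if ok else "jar does not exist"))
    return steps

rng = random.Random(0)
pods = [JobManagerPod("jm-0"), JobManagerPod("jm-1")]
trace = submit_session_job(pods, "StateMachineExample.jar", rng)
```

With two replicas each follow-up call has a 50% chance of landing on the wrong pod, which matches the "sometimes deploys, sometimes not" symptom.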
Re: Can cancelling a job from the Flink web UI optionally take a savepoint?
hi casel,

Currently the web UI does not support triggering a savepoint on cancel. If you want the stop-with-savepoint behavior, you can use bin/flink [1] or the REST API [2].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop

李晋忠

casel.chen wrote on Fri, Oct 28, 2022 at 09:41:

> Can cancelling a job from the Flink web UI optionally take a savepoint? Currently it simply cancels the job without a savepoint.
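The REST route in [2] is a POST to /jobs/:jobid/stop with a JSON body naming the savepoint target directory. A minimal sketch of building that request (the base URL, job id, and target directory are placeholders; only the endpoint shape follows the linked docs):

```python
import json

def stop_with_savepoint_request(base_url, job_id, target_dir, drain=False):
    """Build the stop-with-savepoint REST call from the Flink REST API docs:
    POST /jobs/:jobid/stop with a JSON body.  All argument values here are
    placeholders for your own cluster."""
    url = f"{base_url}/jobs/{job_id}/stop"
    body = json.dumps({"targetDirectory": target_dir, "drain": drain})
    return url, body

url, body = stop_with_savepoint_request(
    "http://localhost:8081", "<job-id>", "file:///flink-data/savepoints")
```

The returned url/body pair can then be sent with any HTTP client; `drain=True` additionally emits MAX_WATERMARK before stopping.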
Can cancelling a job from the Flink web UI optionally take a savepoint?
Can cancelling a job from the Flink web UI optionally take a savepoint? Currently it simply cancels the job without a savepoint.
Re: FlinkSQL dimension-table (lookup) join
Hi,

Flink's lookup join currently does not support preprocessing the dimension table, and it requires an equality join condition on the dimension table's original fields (because the lookup needs concrete field values to search with). Your example should work if the t4 field is left uncomputed, e.g. udf(t1.telephone_no) = t4.de_mobile.

Best,
Lincoln Lee

Fei Han wrote on Thu, Oct 27, 2022 at 12:12:

> Hi all! A few questions:
>
> 1. When doing a dimension-table join in FlinkSQL, can the dimension table first be defined as a temporary table and then used in the lookup join? For example:
>
> WITH employee_tmp AS (
>   SELECT
>     userid AS userid,
>     name AS name,
>     mobile AS de_mobile
>   FROM ygp_dwd_catalog.flink_dwd.employee
> )
> SELECT *
> FROM (SELECT * FROM ygp_dwd_catalog.flink_dwd.xxx) t1
> LEFT JOIN employee_tmp FOR SYSTEM_TIME AS OF .
>
> Is the above correct? In testing the result was null.
>
> 2. Can the join condition after FOR SYSTEM_TIME AS OF be written like this:
>
> FOR SYSTEM_TIME AS OF t1.proc_time AS t4 ON udf(t1.telephone_no) = udf(t4.de_mobile)
>
> The above fails with:
> Temporal table join requires an equality condition on fields of table.
> The key point is that the UDF runs fine on both Hive and Presto, yet FlinkSQL still demands the equality condition.
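Written out, the suggested fix looks roughly like the following (a sketch using the table and field names from this thread; `udf` and the `mobile`/`proc_time` columns are assumed to exist as in the original question):

```sql
SELECT *
FROM ygp_dwd_catalog.flink_dwd.xxx AS t1
LEFT JOIN ygp_dwd_catalog.flink_dwd.employee
  FOR SYSTEM_TIME AS OF t1.proc_time AS t4
  -- apply the UDF only on the probe side (t1); the dimension-table
  -- field is referenced raw, so the planner still sees an equality
  -- condition on an original field of the dimension table
  ON udf(t1.telephone_no) = t4.mobile
```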
Re: Unexpected taskmanager.numOfSlots value when using the flink-operator config?
Hi, LiTing

This is because the flink-operator's default configuration contains the following two defaults:

> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 2

You can override both of them in the flinkConfiguration section of the job's yaml.

Best,
Weihua

On Thu, Oct 27, 2022 at 9:12 AM Jason_H wrote:

> hi, Liting Liu
>
> From your settings, parallelism is set to 2, so the job uses 2 slots when running. You can try changing the parallelism to verify whether this parameter determines your number of task slots.
> For reference:
> https://blog.csdn.net/sinat_38079265/article/details/108535909
>
> Jason_H
> hyb_he...@163.com
>
> ---- Original message ----
> From: Liting Liu (litiliu)
> Date: 2022-10-26 13:19
> To: user-zh
> Subject: Unexpected taskmanager.numOfSlots value when using the flink-operator config?
>
> hi:
> I tried to create a job with flink-operator 1.2.0 using the following yaml (which does not set taskmanager.numberOfTaskSlots) and ran into a problem. It reproduces reliably.
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 2
>     upgradeMode: stateless
>
> But in the generated configMap (flink-config-basic-example) I found "taskmanager.numberOfTaskSlots: 2".
> I don't understand how taskmanager.numberOfTaskSlots=2 got set (why 2?). It feels like the configMap shouldn't contain this entry at all, or its value should be 1.
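Concretely, the override goes into the flinkConfiguration block of the FlinkDeployment yaml. A sketch based on the yaml from this thread (the values "1" are chosen for illustration; configuration values are strings in this map):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    # override the two operator defaults (both default to 2)
    taskmanager.numberOfTaskSlots: "1"
    parallelism.default: "1"
  serviceAccount: flink
```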
Re: Flink Forward Asia 2022 call for proposals
Received, thanks!

On Oct 8, 2022, at 11:41, 王翔 <wangxiang921...@163.com> wrote:

This year's Flink Forward Asia 2022 has officially kicked off! It is tentatively scheduled for November as an online event, to discuss and exchange the latest developments in Flink. Following FFA tradition, all conference topics come from an open call for proposals and are scored and selected by a professional program committee, ensuring the content represents the industry's leading edge. Flink Forward Asia 2022 will present all conference content in the form of topic tags, covering Flink across multiple industries and new scenarios. Each topic may choose 1-2 tags. The main tags are as follows:
Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job.
[Problem description]

The Flink k8s operator (v1.1.0) deployed a Flink Session Cluster in HA mode (two JobManagers), and a SessionJob was then used to deploy an example job. Sometimes the job deploys successfully, sometimes it does not. The following error logs can be seen in the container.

[Steps to reproduce]

Deploy the cluster:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cluster-1jm-checkpoint
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    state.checkpoints.num-retained: "10"
  serviceAccount: flink
  ingress:
    template: "{{name}}.{{namespace}}.k8s.rf.io"
  jobManager:
    replicas: 2
  podTemplate:
    spec:
      nodeSelector:
        kubernetes.io/hostname: k8s17
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /tmp/flink
            # this field is optional
            type: Directory

Deploy the job:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-job-1jm-checkpoint
spec:
  deploymentName: flink-cluster-1jm-checkpoint
  job:
    jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar  # our custom operator image includes the examples jars
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 1
    upgradeMode: savepoint

[Relevant logs]

1. Operator log from a run where the job deployed successfully:

2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: File 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
    at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
]
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source

One JobManager pod does not have the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar; it is in the other JM pod. But that JM should not be the leader, because the checkpoint-related log output appears in the first JM.

2. Operator log from a failed job deployment:

2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', namespace='flink'}, version: 120505701} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
flinksql-redis-connector
hi,

Is there a mature official connector for reading and writing Redis from Flink SQL? If not, which plugins do people use to read/write Redis for dimension-table (lookup) joins? Thanks.

Jason_H
hyb_he...@163.com