Re: Exception when submitting a job to an HA Flink Session Cluster deployed with the Flink k8s operator

2022-10-27 Posted by Weihua Hu
Hi, Young

Your analysis is correct. The Flink Kubernetes operator talks to the Flink cluster
through the rest service, and Kubernetes routes requests sent to that service to any of the
backend JM Pods at random. The job submission flow consists of three API calls, uploadJar,
runJob, and deleteJar, which is why you see these errors in the operator log.
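
For reference, a rough sketch of those three REST calls (host and jar id below are placeholders);
if Kubernetes routes them to different JM Pods, the run or delete call can land on a Pod whose
local upload directory never received the jar:

# 1. upload the jar; it is stored under the upload dir of whichever JM handles the request
curl -X POST -F "jarfile=@StateMachineExample.jar" http://<jm-service>:8081/jars/upload
# 2. run the uploaded jar
curl -X POST http://<jm-service>:8081/jars/<jarId>/run
# 3. delete the uploaded jar
curl -X DELETE http://<jm-service>:8081/jars/<jarId>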

You might want to open a JIRA issue to follow up on this problem.

Best,
Weihua


On Thu, Oct 27, 2022 at 6:51 PM Young Chen  wrote:

> [Problem description]
>
> A Flink Session Cluster (two JobManagers) was deployed in HA mode with the Flink k8s operator (v1.1.0),
> and an example job was then deployed as a SessionJob. Sometimes the job deploys, sometimes it does not.
>
> The following error logs can be seen in the container.
>
>
>
> [Steps to reproduce]
>
> Deploy the cluster:
>
>
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-cluster-1jm-checkpoint
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: file:///flink-data/savepoints
>     state.checkpoints.dir: file:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///flink-data/ha
>     state.checkpoints.num-retained: "10"
>   serviceAccount: flink
>   ingress:
>     template: "{{name}}.{{namespace}}.k8s.rf.io"
>   jobManager:
>     replicas: 2
>   podTemplate:
>     spec:
>       nodeSelector:
>         kubernetes.io/hostname: k8s17
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>             - mountPath: /flink-data
>               name: flink-volume
>       volumes:
>         - name: flink-volume
>           hostPath:
>             # directory location on host
>             path: /tmp/flink
>             # this field is optional
>             type: Directory
>
>
>
> Deploy the job:
>
>
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: flink-job-1jm-checkpoint
> spec:
>   deploymentName: flink-cluster-1jm-checkpoint
>   job:
>     # the custom-built operator image includes the examples jar
>     jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
>     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
>     parallelism: 1
>     upgradeMode: savepoint
>
>
>
>
>
> [Relevant logs]
>
>   1.  Operator log from a run where the job deployed and ran successfully:
>
> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService
> [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar:
> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
>
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.handler.RestHandlerException: File
> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist
> in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>
> at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source)
>
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source)
>
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>
> at java.base/java.lang.Thread.run(Unknown Source)
>
> ]
>
> at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown
> Source
>
> One JobManager Pod does not contain the file
> /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar,
> while the other JM Pod does. But that JM should not be the leader, because the checkpoint-related logs are printed by the first JM.
>
>
>
>
>
>   2.  Operator log from a run where the job deployment failed:
>
> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing
> ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint',
> namespace='flink'}, version: 120505701} failed.
>
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.util.FlinkRuntimeException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., 
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file
> /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar
> does not exist
>
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
>
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
>
> at
> 

Re: Can the Flink web UI let you choose whether to take a savepoint when cancelling a job?

2022-10-27 Posted by Jinzhong Li
Hi casel,
The web UI currently does not appear to support triggering a savepoint. If you want
stop-with-savepoint, you can do it through the bin/flink CLI [1] or the REST
API [2]; a short sketch of both follows the links below.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop
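
A minimal sketch of both approaches (job id, host, and target directory below are placeholders):

# stop with a savepoint via the CLI
./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>

# stop with a savepoint via the REST API
curl -X POST http://<jobmanager>:8081/jobs/<jobId>/stop \
  -H 'Content-Type: application/json' \
  -d '{"targetDirectory": "/tmp/flink-savepoints", "drain": false}'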


李晋忠


casel.chen  wrote on Fri, Oct 28, 2022 at 09:41:

> When cancelling a job from the Flink web UI, is it possible to choose whether a savepoint is taken?
> Currently it simply cancels the job without a savepoint.


Re: FlinkSQL dimension table join

2022-10-27 Posted by Lincoln Lee
Hi,
   Flink's lookup join currently does not support pre-processing the dimension table, and it requires
an equality join condition on the dimension table's original fields (because the lookup has to be done with a concrete field value).
Leaving the t4 field uncomputed in your example should work, e.g. udf(t1.telephone_no) = t4.de_mobile
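
A minimal sketch of such a query (table and column names are taken from your mail; proc_time
is assumed to be a processing-time attribute on the fact table):

SELECT t1.*, t4.userid, t4.name
FROM ygp_dwd_catalog.flink_dwd.xxx AS t1
LEFT JOIN ygp_dwd_catalog.flink_dwd.employee
  FOR SYSTEM_TIME AS OF t1.proc_time AS t4
  -- the UDF is applied only on the probe side; the lookup key stays a raw column
  ON udf(t1.telephone_no) = t4.mobile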

Best,
Lincoln Lee


Fei Han  wrote on Thu, Oct 27, 2022 at 12:12:

> Hi all! A few questions:
>  1. When doing a dimension table join in FlinkSQL, can the dimension table first be defined as a
> temporary table with FlinkSQL, and then the temporary table be used for the dimension table join?
> For example, the temporary table:
> WITH employee_tmp AS(
>  select
>  userid as userid,
>  name as name,
>  mobile as de_mobile
>  from ygp_dwd_catalog.flink_dwd.employee
> )
>  select
>  *
>  from ( select
>  *
>  from ygp_dwd_catalog.flink_dwd.xxx ) t1
>  left join employee_tmp FOR SYSTEM_TIME AS OF .
> Is the above correct? During testing the result turned out to be null.
> 2. For the dimension table, can the join condition after FOR SYSTEM_TIME AS OF be written like this:
> FOR SYSTEM_TIME AS OF t1.proc_time as t4 on udf(t1.telephone_no) =
> udf(t4.de_mobile)
> The above syntax fails with the following error:
>  Temporal table join requires an equality condition on fields of table.
> The key point, though, is that the same UDF runs fine in both Hive and Presto; it is FlinkSQL that
> raises this equality-condition requirement.
>


Re: Problem with the value taken by taskmanager.numOfSlots in the flink-operator configuration?

2022-10-27 Posted by Weihua Hu
Hi, LiTing

This is because the flink-operator's default configuration contains the following two defaults:

> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 2

You can override both defaults in the flinkConfiguration section of your job's yaml.
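
For example, a minimal override in the FlinkDeployment's flinkConfiguration (the values below are only illustrative):

flinkConfiguration:
  taskmanager.numberOfTaskSlots: "1"
  parallelism.default: "1"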


Best,
Weihua


On Thu, Oct 27, 2022 at 9:12 AM Jason_H  wrote:

> hi, Liting Liu
>
> From your settings, the parallelism is set to 2, so the job uses 2 slots when it runs. You can try
> changing the parallelism to verify whether this parameter determines the number of task slots.
> For reference:
> https://blog.csdn.net/sinat_38079265/article/details/108535909
>
>
>
> Jason_H
> hyb_he...@163.com
>
>  Original message 
> From: Liting Liu (litiliu)
> Date: Oct 26, 2022, 13:19
> To: user-zh
> Subject: Problem with the value taken by taskmanager.numOfSlots in the flink-operator configuration?
> hi:
> I tried to create a job with flink-operator 1.2.0 using the following yaml (the yaml
> does not set taskmanager.numberOfTaskSlots) and ran into a problem. It reproduces reliably.
> 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 2
>     upgradeMode: stateless
> 
> However, in the generated configMap (flink-config-basic-example) I found
> "taskmanager.numberOfTaskSlots: 2".
> I don't understand how taskmanager.numberOfTaskSlots=2 got set there (and why 2?). I would expect the
> configMap not to contain this entry at all, or for its value to be 1.
>
>
>
>
>


Re: Flink Forward Asia 2022 call for proposals

2022-10-27 Posted by Steven 王昱翔
Received, thanks!

On Oct 8, 2022, at 11:41, 王翔  wrote:

This year's Flink Forward Asia 2022 has officially kicked off! It is tentatively scheduled for November as an online event, to discuss and exchange the latest developments in Flink.

Following FFA tradition, all conference topics come from an open call for proposals and are scored and selected by a dedicated program committee, to ensure the content represents the leading edge of the industry.

Flink Forward Asia 2022 will present all of the conference content under topic tags, covering Flink across many industries and new scenarios. Each topic may choose 1-2 tags. The main tag categories are as follows:

[inline image: main tag categories]



flinksql-redis-connector

2022-10-27 Posted by Jason_H
hi,
Is there an official, mature connector for reading and writing Redis from FlinkSQL? If not, which
connector do people use to read and write Redis for dimension table joins? Thanks.


Jason_H
hyb_he...@163.com