Hello,
Attached is the error I get when using OSS for high availability. Here is the script I use to start Flink:
../bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-session-1 \
-Dkubernetes.container.image=test/flink:1.13.2-scala_2.12-oss \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=flink-session \
-Dkubernetes.service-account=flink-session-sa \
-Dkubernetes.rest-service.exposed.type=ClusterIP \
-Dtaskmanager.numberOfTaskSlots=6 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=2 \
-Dfs.oss.endpoint="http://oss-xxxx.local" \
-Dfs.oss.accessKeyId="j0BAJxxxx" \
-Dfs.oss.accessKeySecret="7mzTPiC4wxxxx" \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=oss://bucket-logcenter/flink-state/flink-session-recovery \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar
-----Original Message-----
From: Yun Tang <[email protected]>
Sent: August 30, 2021 11:36
To: [email protected]
Subject: Re: flink oss ha
Hi,
The image cannot be loaded. Could you paste the error as plain text instead?
Best regards,
唐云
________________________________
From: dker eandei <[email protected]>
Sent: Friday, August 27, 2021 14:58
To: [email protected] <[email protected]>
Subject: flink oss ha
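Hello,
According to the documentation, OSS can be used with the FsStateBackend. When running Flink on k8s with high availability, can high-availability.storageDir also be set to an OSS path? I tried it and got the following error: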
[Embedded image: error screenshot]
2021-08-30 12:21:19,298 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@ip:6123]
2021-08-30 12:21:19,547 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@ip:6123
2021-08-30 12:21:21,816 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at oss://bucket-logcenter/flink-state/flink-session-recovery/flink-session-1/blob
2021-08-30 12:21:22,136 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 612C5CC21078CF8B58AB7521
[HostId]: null
2021-08-30 12:21:22,145 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 612C5CC27BF4BC5C747B6452
[HostId]: null
2021-08-30 12:21:22,467 WARN org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: There are invalid characters in parameters.
[ErrorCode]: InvalidArgument
[RequestId]: 612C5CC2EF7A8F7D9E7B4301
[HostId]: oss-xxxx.local
[ResponseError]:
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InvalidArgument</Code>
<Message>There are invalid characters in parameters.</Message>
<RequestId>612C5CC2EF7A8F7D9E7B4301</RequestId>
<HostId>oss-xxxx.local</HostId>
<ArgumentName>prefix</ArgumentName>
<ArgumentValue>flink-state/flink-session-recovery/flink-session-1/blob/</ArgumentValue>
</Error>
2021-08-30 12:21:22,471 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesSessionClusterEntrypoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
	at org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint.main(KubernetesSessionClusterEntrypoint.java:61)
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSException: There are invalid characters in parameters.
[ErrorCode]: InvalidArgument
[RequestId]: 612C5CC2EF7A8F7D9E7B4301
[HostId]: oss-xxxx.local
[ResponseError]:
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InvalidArgument</Code>
<Message>There are invalid characters in parameters.</Message>
<RequestId>612C5CC2EF7A8F7D9E7B4301</RequestId>
<HostId>oss-xxxx.local</HostId>
<ArgumentName>prefix</ArgumentName>
<ArgumentValue>flink-state/flink-session-recovery/flink-session-1/blob/</ArgumentValue>
</Error>
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createOSSException(ExceptionFactory.java:100)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSErrorResponseHandler.handle(OSSErrorResponseHandler.java:70)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.handleResponse(ServiceClient.java:257)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:140)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSBucketOperation.listObjects(OSSBucketOperation.java:411)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.listObjects(OSSClient.java:443)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.listObjects(AliyunOSSFileSystemStore.java:506)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:264)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.mkdirs(AliyunOSSFileSystem.java:524)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.mkdirs(HadoopFileSystem.java:183)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.mkdirs(PluginFileSystemFactory.java:162)
	at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:64)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:98)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76)
	at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:40)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
	... 9 more
.
2021-08-30 12:21:22,506 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka RPC service.
2021-08-30 12:21:22,541 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2021-08-30 12:21:22,598 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-08-30 12:21:22,708 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2021-08-30 12:21:22,821 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopped Akka RPC service.
2021-08-30 12:21:22,822 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint KubernetesSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesSessionClusterEntrypoint.
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint.main(KubernetesSessionClusterEntrypoint.java:61) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	... 2 more
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSException: There are invalid characters in parameters.
[ErrorCode]: InvalidArgument
[RequestId]: 612C5CC2EF7A8F7D9E7B4301
[HostId]: oss-xxxx.local
[ResponseError]:
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InvalidArgument</Code>
<Message>There are invalid characters in parameters.</Message>
<RequestId>612C5CC2EF7A8F7D9E7B4301</RequestId>
<HostId>oss-cn-xxxx.local</HostId>
<ArgumentName>prefix</ArgumentName>
<ArgumentValue>flink-state/flink-session-recovery/flink-session-1/blob/</ArgumentValue>
</Error>
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createOSSException(ExceptionFactory.java:100) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSErrorResponseHandler.handle(OSSErrorResponseHandler.java:70) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.handleResponse(ServiceClient.java:257) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:140) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSBucketOperation.listObjects(OSSBucketOperation.java:411) ~[?:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.listObjects(OSSClient.java:443) ~[?:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.listObjects(AliyunOSSFileSystemStore.java:506) ~[?:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:264) ~[?:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.mkdirs(AliyunOSSFileSystem.java:524) ~[?:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) ~[?:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.mkdirs(HadoopFileSystem.java:183) ~[?:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.mkdirs(PluginFileSystemFactory.java:162) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:64) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:98) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:40) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	... 2 more
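For reference, the failing OSS request in the log above can be traced back to the startup options. The FileSystemBlobStore line shows that the HA BLOB directory is <high-availability.storageDir>/<kubernetes.cluster-id>/blob, and the `prefix` that OSS rejects in the listObjects call is that directory's object key (bucket name removed) with a trailing slash. The minimal POSIX shell sketch below rebuilds that prefix from the two option values in the script and checks which characters it contains; the variable names are illustrative only and are not Flink settings.

```shell
#!/bin/sh
# Sketch: rebuild the OSS listObjects prefix that the HA BLOB store uses at startup.
# Values are copied from the kubernetes-session.sh options in this thread;
# the variable names are illustrative only.
set -eu

storage_dir="oss://bucket-logcenter/flink-state/flink-session-recovery"  # high-availability.storageDir
cluster_id="flink-session-1"                                             # kubernetes.cluster-id

# Directory reported by FileSystemBlobStore in the log: <storageDir>/<cluster-id>/blob
blob_dir="${storage_dir}/${cluster_id}/blob"

# Drop the "oss://" scheme, split off the bucket name, and append "/" so the
# result matches the ArgumentValue in the OSS InvalidArgument response.
key_path="${blob_dir#oss://}"
bucket="${key_path%%/*}"
prefix="${key_path#*/}/"

# Flag any character outside letters, digits, '.', '_', '/', '-'.
case "$prefix" in
  *[!A-Za-z0-9._/-]*) plain_chars=no ;;
  *) plain_chars=yes ;;
esac

echo "bucket:      $bucket"
echo "prefix:      $prefix"
echo "plain chars: $plain_chars"
```

Running it prints the bucket `bucket-logcenter` and the prefix `flink-state/flink-session-recovery/flink-session-1/blob/`, the same value reported as `ArgumentValue`, and reports that the prefix consists only of letters, digits, `-`, and `/`.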