Hi Jonas,
have you added the S3 credentials to the Flink configuration file as
described in [1]? Also, I'm not sure that hive.s3.use-instance-credentials
is a valid configuration parameter.
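
As a rough sketch of what [1] describes, static credentials would go into
flink-conf.yaml under the s3.access-key and s3.secret-key options. The
values below are placeholders, not real keys, and if your pods already get
their permissions from an IAM role, these two lines should not be needed:

```yaml
# flink-conf.yaml (fragment); placeholder values, replace with your own
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
```

The docs in [1] recommend IAM-based access over static keys where possible,
since it avoids distributing secrets in the configuration.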

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials

On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:

> Hey,
>
> I am setting up HA on a standalone Kubernetes Flink application cluster,
> using Flink 1.12.5 with S3 as the storage backend.
>
> * The JobManager fails shortly after starting with the following errors
> (apologies in advance for the length), and I can't work out what is going
> on.
> * At first I thought it might be due to the IAM role missing Delete
> privileges, so I added them, but the problem persists.
> * The configured S3 bucket path, s3://<company>/recovery, is empty.
>
> configmap.yaml
> flink-conf.yaml: |+
>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>   jobmanager.rpc.port: 6123
>   jobmanager.memory.process.size: 1600m
>   taskmanager.numberOfTaskSlots: 2
>   taskmanager.rpc.port: 6122
>   taskmanager.memory.process.size: 1728m
>   blob.server.port: 6124
>   queryable-state.proxy.ports: 6125
>   parallelism.default: 2
>   scheduler-mode: reactive
>   execution.checkpointing.interval: 10s
>   restart-strategy: fixed-delay
>   restart-strategy.fixed-delay.attempts: 10
>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>   kubernetes.cluster-id: {{ $fullName }}
>   high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>   hive.s3.use-instance-credentials: true
>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>
> role.yaml
> kind: Role
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
>   name: {{ $fullName }}
>   namespace: {{ $fullName }}
>   labels:
>     app: {{ $appName }}
>     chart: {{ template "thoros.chart" . }}
>     release: {{ .Release.Name }}
>     heritage: {{ .Release.Service }}
>
> rules:
> - apiGroups: [""]
>   resources: ["configmaps"]
>   verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>
> AWS IAM policy
> {
>     "Version": "2012-10-17",
>     "Statement": [
>         {
>             "Action": [
>                 "s3:ListBucket",
>                 "s3:Get*",
>                 "s3:Put*",
>                 "s3:Delete*"
>             ],
>             "Resource": [
>                 "arn:aws:s3:::<company>-flink-dev/*"
>             ],
>             "Effect": "Allow"
>         }
>     ]
> }
>
> *Error-log:*
> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Executing pipeline using FlinkRunner.
> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
>                  [] - For maximum performance you should set the
> 'fasterCopy' option. See more at
> https://issues.apache.org/jira/browse/BEAM-11146
> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Translating pipeline to Flink program.
> 2021-08-26 13:08:43,456 INFO
>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
> unbounded PCollection. Switching to streaming execution.
> 2021-08-26 13:08:43,461 INFO
>  org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating
> a Streaming Environment.
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.address, thoros-jobmanager
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 2
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.rpc.port, 6122
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: blob.server.port, 6124
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: queryable-state.proxy.ports, 6125
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: parallelism.default, 2
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: scheduler-mode, reactive
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: execution.checkpointing.interval, 10s
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy, fixed-delay
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy.fixed-delay.attempts, 10
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability,
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.cluster-id, thoros
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability.storageDir,
> s3://<company>-flink-dev/recovery
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: hive.s3.use-instance-credentials, true
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.namespace, thoros
> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Starting execution of Flink program.
> 2021-08-26 13:08:45,444 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 00000000000000000000000000000000 is submitted.
> 2021-08-26 13:08:45,454 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2021-08-26 13:08:45,486 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received
> JobGraph submission 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:45,498 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] -
> Submitting job 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:46,152 INFO
>  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed
> job graph 00000000000000000000000000000000 from
> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
> 2021-08-26 13:08:46,169 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,213 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,231 WARN
>  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed
> to delete blob at
> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
> 2021-08-26 13:08:46,239 ERROR
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to
> submit job 00000000000000000000000000000000.
> java.lang.RuntimeException: java.lang.Exception: Could not open output
> stream for state backend
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_302]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> Caused by: java.lang.Exception: Could not open output stream for state
> backend
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> Caused by:
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> V0BWCA4RDVE0EVK8; S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
> Proxy: null), S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
> (Path:
> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
> ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> ~[?:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> --
> *Kind regards*
> *Jonas Eyob*
>
