Hi Jonas,

Have you included the S3 credentials in the Flink config file as described in [1]? I'm not sure that hive.s3.use-instance-credentials is a valid configuration parameter.
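
For reference, here is a minimal sketch of what [1] describes when static keys are used. The key names s3.access-key and s3.secret-key come from that documentation page; the values are placeholders and not taken from your setup. As far as that page describes, IAM roles are the recommended alternative, in which case these entries should not be needed.

```yaml
# flink-conf.yaml (sketch, not taken from the original thread)
# Static credentials for the Flink S3 filesystem plugins; placeholder values.
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
```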

Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials

On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:

> Hey,
>
> I am setting up HA on a standalone Kubernetes Flink application job cluster.
> Flink (1.12.5) is used and I am using S3 as the storage backend
>
> * The JobManager shortly fails after starts with the following errors (apologies in advance for the length), and I can't understand what's going on.
> * First I thought it may be due to missing Delete privileges of the IAM role and updated that, but the problem persists.
> * The S3 bucket configured s3://<company>/recovery is empty.
>
> configmap.yaml
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: {{ $fullName }}-jobmanager
>     jobmanager.rpc.port: 6123
>     jobmanager.memory.process.size: 1600m
>     taskmanager.numberOfTaskSlots: 2
>     taskmanager.rpc.port: 6122
>     taskmanager.memory.process.size: 1728m
>     blob.server.port: 6124
>     queryable-state.proxy.ports: 6125
>     parallelism.default: 2
>     scheduler-mode: reactive
>     execution.checkpointing.interval: 10s
>     restart-strategy: fixed-delay
>     restart-strategy.fixed-delay.attempts: 10
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     kubernetes.cluster-id: {{ $fullName }}
>     high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>     hive.s3.use-instance-credentials: true
>     kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>
> role.yaml
> kind: Role
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
>   name: {{ $fullName }}
>   namespace: {{ $fullName }}
>   labels:
>     app: {{ $appName }}
>     chart: {{ template "thoros.chart" . }}
>     release: {{ .Release.Name }}
>     heritage: {{ .Release.Service }}
>
> rules:
> - apiGroups: [""]
>   resources: ["configmaps"]
>   verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>
> aws IAM policy
> {
>   "Version": "2012-10-17",
>   "Statement": [
>     {
>       "Action": [
>         "s3:ListBucket",
>         "s3:Get*",
>         "s3:Put*",
>         "s3:Delete*"
>       ],
>       "Resource": [
>         "arn:aws:s3:::<company>-flink-dev/*"
>       ],
>       "Effect": "Allow"
>     }
>   ]
> }
>
> *Error-log:*
> 2021-08-26 13:08:43,439 INFO org.apache.beam.runners.flink.FlinkRunner [] - Executing pipeline using FlinkRunner.
> 2021-08-26 13:08:43,444 WARN org.apache.beam.runners.flink.FlinkRunner [] - For maximum performance you should set the 'fasterCopy' option. See more at https://issues.apache.org/jira/browse/BEAM-11146
> 2021-08-26 13:08:43,451 INFO org.apache.beam.runners.flink.FlinkRunner [] - Translating pipeline to Flink program.
> 2021-08-26 13:08:43,456 INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found unbounded PCollection. Switching to streaming execution.
> 2021-08-26 13:08:43,461 INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating a Streaming Environment.
> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, thoros-jobmanager
> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: queryable-state.proxy.ports, 6125
> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 2
> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: scheduler-mode, reactive
> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.interval, 10s
> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy, fixed-delay
> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 10
> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.cluster-id, thoros
> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.storageDir, s3://<company>-flink-dev/recovery
> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: hive.s3.use-instance-credentials, true
> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.namespace, thoros
> 2021-08-26 13:08:45,232 INFO org.apache.beam.runners.flink.FlinkRunner [] - Starting execution of Flink program.
> 2021-08-26 13:08:45,444 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
> 2021-08-26 13:08:45,454 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2021-08-26 13:08:45,486 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:45,498 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:46,152 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
> 2021-08-26 13:08:46,169 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
> 2021-08-26 13:08:46,213 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
> 2021-08-26 13:08:46,231 WARN org.apache.flink.runtime.blob.FileSystemBlobStore [] - Failed to delete blob at s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
> 2021-08-26 13:08:46,239 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to submit job 00000000000000000000000000000000.
> java.lang.RuntimeException: java.lang.Exception: Could not open output stream for state backend
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_302]
>     at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_302]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> Caused by: java.lang.Exception: Could not open output stream for state backend
>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     ... 27 more
> Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: V0BWCA4RDVE0EVK8; S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=; Proxy: null), S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o= (Path: s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573) ~[?:?]
>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311) ~[?:?]
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356) ~[?:?]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>     ... 27 more
> --
> *Best regards*
> *Jonas Eyob*