Thanks for the logs. The OK job seems to read from "s3a://test-bucket/", while the KO job reads from "s3a://bucket-test/". Could it be that you are just trying to access the wrong bucket?
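If you want to rule that out quickly, you could try listing the path directly before going through the batch job. Below is a minimal, untested sketch using Flink's FileSystem API; the class name and bucket are only placeholders, and it assumes the s3a scheme is resolvable in the environment where this runs (i.e. the flink-s3-fs-hadoop plugin or an equivalent filesystem implementation is available):

import java.net.URI;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class BucketSanityCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder path: put the exact URI the KO job uses here.
        Path path = new Path("s3a://bucket-test/");

        // Resolves whichever filesystem is registered for the s3a scheme.
        FileSystem fs = FileSystem.get(URI.create(path.toString()));

        // Fails fast, with a clearer error than the batch job, if the bucket
        // name is wrong, credentials are not picked up, or the host is unreachable.
        for (FileStatus status : fs.listStatus(path)) {
            System.out.println(status.getPath());
        }
    }
}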
What I also found interesting from the KO Job TaskManager is this log message:

Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_171]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_171]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_171]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
    at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[?:1.8.0_171]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[?:1.8.0_171]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
    at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
    at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[?:1.8.0_171]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199) ~[?:1.8.0_171]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[?:1.8.0_171]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[?:1.8.0_171]
    at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
    at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]

Does your env allow access to all AWS resources?
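For what it's worth, the frame com.amazonaws.internal.EC2ResourceFetcher.doReadResource is the AWS SDK falling back to the EC2 instance metadata endpoint to look up credentials, and the NoRouteToHostException just means that endpoint is unreachable from where the TaskManager runs. If you only intend to authenticate with an access/secret key pair, one thing you could try (an untested sketch, not a confirmed fix) is pinning the S3A credentials provider in flink-conf.yaml so the provider chain never falls through to the instance-profile lookup:

    fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

As far as I know, SimpleAWSCredentialsProvider reads the access/secret keys from the fs.s3a.* options (including their per-bucket variants), so it should fit the key-pair setup described below.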
On Tue, Jun 15, 2021 at 7:12 PM Angelo G. <angelo.guas...@gmail.com> wrote:

> Thank you Svend and Till for your help.
>
> Sorry for the late response.
>
> I'll try to give more information about the issue:
>
>> I've not worked exactly in the situation you described, although I've had to configure S3 access from a Flink application recently and here are a couple of things I learnt along the way:
>>
>> * You should normally not need to include flink-s3-fs-hadoop nor hadoop-mapreduce-client-core in your classpath but should rather make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. The motivation for that is that this jar is a fat jar containing a lot of hadoop and aws classes, s.t. including it in your classpath quickly leads to conflicts. The plugins folder is associated with a separate classpath, which helps avoid those conflicts.
>
> *Following your advice I've left these dependencies out of the pom. Thank you for the explanation.*
>
>> * Under the hood, Flink is using the hadoop-aws library to connect to s3 => the documentation regarding how to configure it, and especially security accesses, is available in [1]
>
> *In our case, connection to S3 should be made via an access/secret key pair.*
>
>> * Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider, but instead the application should assume a role, which you associate with some IAM permission. If that's your case, the specific documentation for that situation is in [2]. If you're running some test locally on your laptop, BasicAWSCredentialsProvider with some key id and secret pointing to a dev account may be appropriate.
>
> *Yes, the Flink documentation notes that IAM is the recommended way to access S3, but I am forced to use secret/access keys. I'm not indicating in flink-conf.yaml which credentials provider to use; BasicAWSCredentialsProvider seems to be the default provider for Flink. But as we will see, this message is shown only when trying to read the Parquet format. Other formats pose no problem.*
>
>> * As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) => by reading the documentation in [1] and [2] you might be able to figure out which parameters are relevant to your case, which you can then set with the mechanism just mentioned. For example, in my case, I simply add this to flink-conf.yaml:
>
> *My flink-conf.yaml is as follows:*
>
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> jobmanager.execution.failover-strategy: region
> fs.s3a.path-style: true
> fs.s3a.region: eu-west-3
> fs.s3a.bucket.testbucket.access.key: xxxx
> fs.s3a.bucket.testbucket.secret.key: xxxx
>
>> what Svend has written is very good advice. Additionally, you could give us a bit more context by posting the exact stack trace and the exact configuration you use to deploy the Flink cluster. To me this looks like a configuration/setup problem in combination with AWS.
>
> The cluster setup for the tests is as follows:
>
> flink-1.12.2-bin-scala_2.12.tgz <https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz> unzipped in the home folder.
>
> flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar copied to flink-1.12.2/plugins/flink-s3-fs-hadoop/
>
> flink-conf.yaml with the above contents.
>
> The job is being launched like this:
>
> ~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.s3.CompactDemo /home/xxx/git/recovery/flink-s3/target/flink-s3-1.0-SNAPSHOT.jar
>
> Please find attached the two types of traces, one when using the 'raw' format for the table (which works ok) and the other when the 'parquet' format is used (which fails).
>
> Again, sorry for the delay of my response and thank you very much for your help.
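A small aside on the flink-conf.yaml quoted above, purely as an illustration and not a diagnosis of this particular failure: the fs.s3a.bucket.<name>.* entries are S3A's per-bucket overrides, so keys under fs.s3a.bucket.testbucket.* are only applied when the table path points at a bucket literally named "testbucket". A hedged sketch of a matching DDL (the path and column list are placeholders):

    // hypothetical path; the bucket segment must match the <name> part of the
    // fs.s3a.bucket.<name>.access.key / .secret.key entries
    t_env.executeSql(
            "CREATE TABLE backup (`date` STRING, `value` INT) WITH ("
                    + " 'connector' = 'filesystem',"
                    + " 'path' = 's3a://testbucket/data/',"
                    + " 'format' = 'parquet')");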
> On Tue, Jun 1, 2021 at 5:30 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Angelo,
>>
>> what Svend has written is very good advice. Additionally, you could give us a bit more context by posting the exact stack trace and the exact configuration you use to deploy the Flink cluster. To me this looks like a configuration/setup problem in combination with AWS.
>>
>> Cheers,
>> Till
>>
>> On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote:
>>
>>> Hi Angelo,
>>>
>>> I've not worked exactly in the situation you described, although I've had to configure S3 access from a Flink application recently and here are a couple of things I learnt along the way:
>>>
>>> * You should normally not need to include flink-s3-fs-hadoop nor hadoop-mapreduce-client-core in your classpath but should rather make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. The motivation for that is that this jar is a fat jar containing a lot of hadoop and aws classes, s.t. including it in your classpath quickly leads to conflicts. The plugins folder is associated with a separate classpath, which helps avoid those conflicts.
>>>
>>> * Under the hood, Flink is using the hadoop-aws library to connect to s3 => the documentation regarding how to configure it, and especially security accesses, is available in [1]
>>>
>>> * Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider, but instead the application should assume a role, which you associate with some IAM permission. If that's your case, the specific documentation for that situation is in [2]. If you're running some test locally on your laptop, BasicAWSCredentialsProvider with some key id and secret pointing to a dev account may be appropriate.
>>>
>>> * As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) => by reading the documentation in [1] and [2] you might be able to figure out which parameters are relevant to your case, which you can then set with the mechanism just mentioned. For example, in my case, I simply add this to flink-conf.yaml:
>>>
>>> fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>>>
>>> * You can debug the various operations that are attempted on S3 by setting this logger to DEBUG level: org.apache.hadoop.fs.s3a
>>>
>>> Good luck :)
>>>
>>> Svend
>>>
>>> [1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
>>> [2] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
>>> [3] https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>>>
>>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>>>
>>> Hello,
>>>
>>> Trying to read a parquet file located in S3 leads to an AWS credentials exception. Switching to another format (raw, for example) works ok regarding file access.
>>>
>>> This is a snippet of code to reproduce the issue:
>>>
>>> static void parquetS3Error() {
>>>
>>>     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>>>
>>>     TableEnvironment t_env = TableEnvironment.create(settings);
>>>
>>>     // parquet format gives error:
>>>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>>>     // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>>>     t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
>>>
>>>     // other formats (i.e. raw) work properly:
>>>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>>>     // +--------------------------------+
>>>     // | url |
>>>     // +--------------------------------+
>>>     // | [80, 65, 82, 49, 21, 0, 21,... |
>>>     // | [0, 0, 0, 50, 48, 50, 49, 4... |
>>>     t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>>>
>>>     Table t1 = t_env.from("backup");
>>>
>>>     t1.execute().print();
>>> }
>>>
>>> Flink version is 1.12.2.
>>>
>>> Please find attached the pom with dependencies and version numbers.
>>>
>>> What would be a suitable workaround for this?
>>>
>>> Thank you very much.
>>>
>>> Angelo.
>>>
>>> *Attachments:*
>>>  - pom.xml