Hello, Robert. I've been changing manually the name of the buckets in the logs and other potentially sensitive data. The name of the buckets are ok, since changing the format from 'parquet' to 'raw' allows to retrieve the data. Sorry for the confusion.
Does your env allow access to all AWS resources? Yes, I have full access to the aws objects. Interesting fact: I have checked that putting the access/secret keys as OS environment variables and instructing Flink to use EnvironmentVariableCredentialsProvider in the flink-conf.yaml works OK for both Parquet and Raw. Problem is that I won't be allowed to use environment vars in production environment. Thank you very much. On Wed, Jun 16, 2021 at 1:37 PM Robert Metzger <rmetz...@apache.org> wrote: > Thanks for the logs. > > The OK job seems to read from "s3a://test-bucket/", while the KO job reads > from "s3a://bucket-test/". Could it be that you are just trying to access > the wrong bucket? > > What I also found interesting from the KO Job TaskManager is this log > message: > > Caused by: java.net.NoRouteToHostException: No route to host (Host > unreachable) > at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171] > at > java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) > ~[?:1.8.0_171] > at > java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) > ~[?:1.8.0_171] > at > java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) > ~[?:1.8.0_171] > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) > ~[?:1.8.0_171] > at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171] > at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171] > at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) > ~[?:1.8.0_171] > at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) > ~[?:1.8.0_171] > at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171] > at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171] > at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171] > at > sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) > ~[?:1.8.0_171] > at > sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199) > ~[?:1.8.0_171] > at > sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) > ~[?:1.8.0_171] > at > sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) > ~[?:1.8.0_171] > at > com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52) > ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?] > at > com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80) > ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?] > > Does your env allow access to all AWS resources? > > On Tue, Jun 15, 2021 at 7:12 PM Angelo G. <angelo.guas...@gmail.com> > wrote: > >> Thank you Svend and Till for your help. >> >> Sorry for the the late response. >> >> I'll try to give more information about the issue: >> >> I've not worked exactly in the situation you described, although I've had >>> to configure S3 access from a Flink application recently and here are a >>> couple of things I learnt along the way: >>> * You should normally not need to include flink-s3-fs-hadoop nor >>> hadoop-mapreduce-client-core in your classpath but should rather make >>> flink-s3-fs-hadoop available to Flink by putting it into the plugins >>> folder. The motivation for that is that this jar is a fat jar containing a >>> lot of hadoop and aws classes, s.t. including it in your classpath quickly >>> leads to conflicts. The plugins folder is associated with a separate >>> classpath, with helps avoiding those conflicts. >>> >> *Following your advice I've leave these dependencies out from the pom. >> Thank you for the explanation.* >> >>> * Under the hood, Fink is using the hadoop-aws library to connect to s3 >>> => the documentation regarding how to configure it, and especially security >>> accesses, is available in [1] >>> >> *In our case, connection to S3 should be made via access/secret key >> pair. * >> >>> * Ideally, when running on AWS, your code should not be using >>> BasicAWSCredentialsProvider, but instead the application should assume >>> a role, which you associate with some IAM permission. If that's your case, >>> the specific documentation for that situation is in [2]. If you're running >>> some test locally on your laptop, BasicAWSCredentialsProvider with some >>> key id and secret pointing to a dev account may be appropriate. >>> >> *Yes, in the Flink documentation is noted that IAM is the recommended way >> to access S3. But I am forced to use secret/access keys. I'm not >> indicating in the flink-conf.yaml what credentials provider to use, the >> BasicAWSCredentialsProvider seems to be the default provider for Flink. But >> as we will see, this message is shown only when trying to read Parquet >> format. Other formats poses no problem.* >> >>> * As I understand it, any configuration entry in flink.yaml that starts >>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in >>> [3]) => by reading documentation in [1] and [2] you might be able to figure >>> out which parameters are relevant to your case, which you can then set with >>> the mechanism just mentioned. For example, in my case, I simply add this to >>> flink.yaml: >> >> *My flink-yaml.conf is as follows:* >> >> taskmanager.memory.process.size: 1728m >> taskmanager.numberOfTaskSlots: 1 >> parallelism.default: 1 >> jobmanager.execution.failover-strategy: region >> fs.s3a.path-style: true >> fs.s3a.region: eu-west-3 >> fs.s3a.bucket.testbucket.access.key: xxxx >> fs.s3a.bucket.testbucket.secret.key: xxxx >> >> >>> what Svend has written is very good advice. Additionally, you could give >>> us a bit more context by posting the exact stack trace and the exact >>> configuration you use to deploy the Flink cluster. To me this looks like a >>> configuration/setup problem in combination with AWS. >> >> >> The cluster setup for the tests is as follows: >> >> flink-1.12.2-bin-scala_2.12.tgz >> <https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz> >> unizipped in home folder. >> >> flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar copied to >> flink-1.12.2/plugins/flink-s3-fs-hadoop/ >> >> flink-yaml.conf with the above contents. >> >> >> The job is being launched like this: >> >> ~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c >> org.apache.flink.s3.CompactDemo >> /home/xxx/git/recovery/flink-s3/target/flink-s3-1.0-SNAPSHOT.jar >> >> >> Please find attached the two type of traces, one when using 'raw' format for >> the table (which is working ok) and the other when 'parquet' format is used >> (which fails). >> >> >> >> Again, sorry for the delay of my response and thank you very much for your >> help. >> >> >> >> >> On Tue, Jun 1, 2021 at 5:30 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Angelo, >>> >>> what Svend has written is very good advice. Additionally, you could give >>> us a bit more context by posting the exact stack trace and the exact >>> configuration you use to deploy the Flink cluster. To me this looks like a >>> configuration/setup problem in combination with AWS. >>> >>> Cheers, >>> Till >>> >>> On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote: >>> >>>> Hi Angelo, >>>> >>>> I've not worked exactly in the situation you described, although I've >>>> had to configure S3 access from a Flink application recently and here are a >>>> couple of things I learnt along the way: >>>> >>>> * You should normally not need to include flink-s3-fs-hadoop nor >>>> hadoop-mapreduce-client-core in your classpath but should rather make >>>> flink-s3-fs-hadoop available to Flink by putting it into the plugins >>>> folder. The motivation for that is that this jar is a fat jar containing a >>>> lot of hadoop and aws classes, s.t. including it in your classpath quickly >>>> leads to conflicts. The plugins folder is associated with a separate >>>> classpath, with helps avoiding those conflicts. >>>> >>>> * Under the hood, Fink is using the hadoop-aws library to connect to s3 >>>> => the documentation regarding how to configure it, and especially security >>>> accesses, is available in [1] >>>> >>>> * Ideally, when running on AWS, your code should not be using >>>> BasicAWSCredentialsProvider, but instead the application should assume >>>> a role, which you associate with some IAM permission. If that's your case, >>>> the specific documentation for that situation is in [2]. If you're running >>>> some test locally on your laptop, BasicAWSCredentialsProvider with >>>> some key id and secret pointing to a dev account may be appropriate. >>>> >>>> * As I understand it, any configuration entry in flink.yaml that starts >>>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in >>>> [3]) => by reading documentation in [1] and [2] you might be able to figure >>>> out which parameters are relevant to your case, which you can then set with >>>> the mechanism just mentioned. For example, in my case, I simply add this to >>>> flink.yaml: >>>> >>>> fs.s3a.aws.credentials.provider: >>>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" >>>> >>>> * You can debug the various operations that are attempted on S3 by >>>> setting this logger to DEBUG level: org.apache.hadoop.fs.s3a >>>> >>>> >>>> Good luck :) >>>> >>>> Svend >>>> >>>> >>>> >>>> [1] >>>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html >>>> [2] >>>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html >>>> [3] >>>> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink- >>>> >>>> >>>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote: >>>> >>>> Hello, >>>> >>>> Trying to read a parquet file located in S3 leads to a AWS credentials >>>> exception. Switching to other format (raw, for example) works ok regarding >>>> to file access. >>>> >>>> This is a snippet of code to reproduce the issue: >>>> >>>> static void parquetS3Error() { >>>> >>>> EnvironmentSettings settings = >>>> EnvironmentSettings.*newInstance*().inBatchMode().useBlinkPlanner().build(); >>>> >>>> TableEnvironment t_env = TableEnvironment.*create*(settings); >>>> >>>> // parquet format gives error: >>>> // Caused by: java.net.SocketTimeoutException: doesBucketExist on >>>> bucket-prueba-medusa: com.amazonaws.AmazonClientException: >>>> // No AWS Credentials provided by BasicAWSCredentialsProvider >>>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : >>>> // com.amazonaws.SdkClientException: Failed to connect to service >>>> endpoint: >>>> t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) >>>> WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = >>>> 'parquet')"); >>>> >>>> // other formats (i.e. raw) work properly: >>>> // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5 >>>> // +--------------------------------+ >>>> // | url | >>>> // +--------------------------------+ >>>> // | [80, 65, 82, 49, 21, 0, 21,... | >>>> // | [0, 0, 0, 50, 48, 50, 49, 4... | >>>> t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( >>>> 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')"); >>>> >>>> Table t1 = t_env.from("backup"); >>>> >>>> t1.execute().print(); >>>> >>>> } >>>> >>>> Flink version is 1.12.2. >>>> >>>> Please find attached the pom with dependencies and version numbers. >>>> >>>> What would be a suitable workaround for this? >>>> >>>> Thank you very much. >>>> >>>> Angelo. >>>> >>>> >>>> >>>> >>>> *Attachments:* >>>> >>>> - pom.xml >>>> >>>> >>>>