Thanks for the logs.

The OK job seems to read from "s3a://test-bucket/", while the KO job reads
from "s3a://bucket-test/". Could it be that you are just trying to access
the wrong bucket?

What I also found interesting from the KO Job TaskManager is this log
message:

Caused by: java.net.NoRouteToHostException: No route to host (Host
unreachable)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
~[?:1.8.0_171]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
~[?:1.8.0_171]
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
~[?:1.8.0_171]
at
com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
at
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]

Does your env allow access to all AWS resources?

On Tue, Jun 15, 2021 at 7:12 PM Angelo G. <angelo.guas...@gmail.com> wrote:

> Thank you Svend  and Till for your help.
>
> Sorry for the the late response.
>
> I'll try to give more information about the issue:
>
> I've not worked exactly in the situation you described, although I've had
>> to configure S3 access from a Flink application recently and here are a
>> couple of things I learnt along the way:
>> * You should normally not need to include flink-s3-fs-hadoop nor
>> hadoop-mapreduce-client-core in your classpath but should rather make
>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>> folder. The motivation for that is that this jar is a fat jar containing a
>> lot of hadoop and aws classes, s.t. including it in your classpath quickly
>> leads to conflicts. The plugins folder is associated with a separate
>> classpath, with helps avoiding those conflicts.
>>
> *Following your advice I've leave these dependencies out from the pom.
> Thank you for the explanation.*
>
>> * Under the hood, Fink is using the hadoop-aws library to connect to s3
>> => the documentation regarding how to configure it, and especially security
>> accesses, is available in [1]
>>
> *In our case, connection to S3 should be made via access/secret key pair. *
>
>> * Ideally, when running on AWS, your code should not be using
>> BasicAWSCredentialsProvider, but instead the application should assume a
>> role, which you associate with some IAM permission.  If that's your case,
>> the specific documentation for that situation is in [2]. If you're running
>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>> key id and secret pointing to a dev account may be appropriate.
>>
> *Yes, in the Flink documentation is noted that IAM is the recommended way
> to access S3. But I am forced to use secret/access keys.  I'm not
> indicating in the flink-conf.yaml what credentials provider to use, the
> BasicAWSCredentialsProvider seems to be the default provider for Flink. But
> as we will see, this message is shown only when trying to read Parquet
> format. Other formats poses no problem.*
>
>> * As I understand it, any configuration entry in flink.yaml that starts
>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>> [3]) => by reading documentation in [1] and [2] you might be able to figure
>> out which parameters are relevant to your case, which you can then set with
>> the mechanism just mentioned. For example, in my case, I simply add this to
>> flink.yaml:
>
> *My flink-yaml.conf is as follows:*
>
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> jobmanager.execution.failover-strategy: region
> fs.s3a.path-style: true
> fs.s3a.region: eu-west-3
> fs.s3a.bucket.testbucket.access.key: xxxx
> fs.s3a.bucket.testbucket.secret.key: xxxx
>
>
>> what Svend has written is very good advice. Additionally, you could give
>> us a bit more context by posting the exact stack trace and the exact
>> configuration you use to deploy the Flink cluster. To me this looks like a
>> configuration/setup problem in combination with AWS.
>
>
> The cluster setup for the tests is as follows:
>
> flink-1.12.2-bin-scala_2.12.tgz 
> <https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz>
>  unizipped in home folder.
>
> flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar copied to 
> flink-1.12.2/plugins/flink-s3-fs-hadoop/
>
> flink-yaml.conf with the above contents.
>
>
> The job is being launched like this:
>
> ~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH    -c 
> org.apache.flink.s3.CompactDemo 
> /home/xxx/git/recovery/flink-s3/target/flink-s3-1.0-SNAPSHOT.jar
>
>
> Please find attached the two type of traces, one when using 'raw' format for 
> the table (which is working ok) and the other when 'parquet' format is used 
> (which fails).
>
>
>
> Again, sorry for the delay of my response and thank you very much for your 
> help.
>
>
>
>
> On Tue, Jun 1, 2021 at 5:30 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Angelo,
>>
>> what Svend has written is very good advice. Additionally, you could give
>> us a bit more context by posting the exact stack trace and the exact
>> configuration you use to deploy the Flink cluster. To me this looks like a
>> configuration/setup problem in combination with AWS.
>>
>> Cheers,
>> Till
>>
>> On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote:
>>
>>> Hi Angelo,
>>>
>>> I've not worked exactly in the situation you described, although I've
>>> had to configure S3 access from a Flink application recently and here are a
>>> couple of things I learnt along the way:
>>>
>>> * You should normally not need to include flink-s3-fs-hadoop nor
>>> hadoop-mapreduce-client-core in your classpath but should rather make
>>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>>> folder. The motivation for that is that this jar is a fat jar containing a
>>> lot of hadoop and aws classes, s.t. including it in your classpath quickly
>>> leads to conflicts. The plugins folder is associated with a separate
>>> classpath, with helps avoiding those conflicts.
>>>
>>> * Under the hood, Fink is using the hadoop-aws library to connect to s3
>>> => the documentation regarding how to configure it, and especially security
>>> accesses, is available in [1]
>>>
>>> * Ideally, when running on AWS, your code should not be using
>>> BasicAWSCredentialsProvider, but instead the application should assume
>>> a role, which you associate with some IAM permission.  If that's your case,
>>> the specific documentation for that situation is in [2]. If you're running
>>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>>> key id and secret pointing to a dev account may be appropriate.
>>>
>>> * As I understand it, any configuration entry in flink.yaml that starts
>>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>>> [3]) => by reading documentation in [1] and [2] you might be able to figure
>>> out which parameters are relevant to your case, which you can then set with
>>> the mechanism just mentioned. For example, in my case, I simply add this to
>>> flink.yaml:
>>>
>>> fs.s3a.aws.credentials.provider:
>>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>>>
>>> * You can debug the various operations that are attempted on S3 by
>>> setting this logger to DEBUG level:  org.apache.hadoop.fs.s3a
>>>
>>>
>>> Good luck :)
>>>
>>> Svend
>>>
>>>
>>>
>>> [1]
>>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
>>> [2]
>>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
>>> [3]
>>> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>>>
>>>
>>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>>>
>>> Hello,
>>>
>>> Trying to read a parquet file located in S3 leads to a AWS credentials
>>> exception. Switching to other format (raw, for example) works ok regarding
>>> to file access.
>>>
>>> This is a snippet of code to reproduce the issue:
>>>
>>> static void parquetS3Error() {
>>>
>>>     EnvironmentSettings settings = 
>>> EnvironmentSettings.*newInstance*().inBatchMode().useBlinkPlanner().build();
>>>
>>>     TableEnvironment t_env = TableEnvironment.*create*(settings);
>>>
>>>     // parquet format gives error:
>>>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on 
>>> bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>>>     // No AWS Credentials provided by BasicAWSCredentialsProvider 
>>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>>     // com.amazonaws.SdkClientException: Failed to connect to service 
>>> endpoint:
>>>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) 
>>> WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 
>>> 'parquet')");
>>>
>>>     // other formats (i.e. raw) work properly:
>>>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>>>     //                +--------------------------------+
>>>     //                |                            url |
>>>     //                +--------------------------------+
>>>     //                | [80, 65, 82, 49, 21, 0, 21,... |
>>>     //                | [0, 0, 0, 50, 48, 50, 49, 4... |
>>>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 
>>> 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>>>
>>>     Table t1 = t_env.from("backup");
>>>
>>>     t1.execute().print();
>>>
>>> }
>>>
>>> Flink version is 1.12.2.
>>>
>>> Please find attached the pom with dependencies and version numbers.
>>>
>>> What would be a suitable workaround for this?
>>>
>>> Thank you very much.
>>>
>>> Angelo.
>>>
>>>
>>>
>>>
>>> *Attachments:*
>>>
>>>    - pom.xml
>>>
>>>
>>>

Reply via email to