Hi. 

I had the same problem. Flink uses plugins to access S3, but when you run 
locally it starts a MiniCluster, and the MiniCluster doesn't load plugins. So 
it isn't possible without modifying Flink. In my case I wanted to investigate 
savepoints through the Flink State Processor API, and the workaround was to 
build my own version of the State Processor API and include the missing parts.
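
Depending on your Flink version, another thing you could try is to initialize 
the S3 filesystem yourself before the MiniCluster starts. A minimal sketch, 
assuming flink-s3-fs-hadoop is on the classpath as a normal dependency 
(Flink 1.8.x only has the single-argument FileSystem.initialize; 1.10+ adds 
an overload that takes a plugin manager):

    // Minimal sketch: register the S3 filesystem factories up front so that
    // s3a:// paths resolve in a local IntelliJ run (no plugin loading).
    // Assumes flink-s3-fs-hadoop is a compile-scope dependency.
    import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
    import org.apache.flink.core.fs.FileSystem

    // Reads flink-conf.yaml from FLINK_CONF_DIR, so fs.* keys set there apply.
    val conf: Configuration = GlobalConfiguration.loadConfiguration()
    // Flink 1.8.x signature; in 1.10+ you would pass
    // PluginUtils.createPluginManagerFromRootFolder(conf) as a second argument.
    FileSystem.initialize(conf)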

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 10 Mar 2021, at 17:33, sri hari kali charan Tummala 
> <kali.tumm...@gmail.com> wrote:
> 
> 
> Flink, 
> 
> I am able to access Kinesis from IntelliJ, but not S3. I have edited my Stack 
> Overflow question with the Kinesis code; Flink is still having issues reading S3.
> 
> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
> 
> 
> Thanks
> Sri 
> 
>> On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala 
>> <kali.tumm...@gmail.com> wrote:
>> My Stack Overflow question:
>> 
>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>> 
>>> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala 
>>> <kali.tumm...@gmail.com> wrote:
>>> Here is my IntelliJ question:
>>> 
>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>> 
>>>> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala 
>>>> <kali.tumm...@gmail.com> wrote:
>>>> 
>>>>> Hi Flink Experts,
>>>> 
>>>>> I am trying to read an S3 file from IntelliJ using Flink, and I am coming 
>>>>> across an AWS auth error. Can someone help? All the details are below.
>>>>    
>>>>> I have AWS credentials in homefolder/.aws/credentials
>>>> 
>>>>> My IntelliJ environment variables:-
>>>>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>>>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>>>> 
>>>>> flink-conf.yaml file content:-
>>>>> fs.hdfs.hadoopconf: 
>>>>> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>>>>> core-site.xml file content:-
>>>>> <?xml version="1.0"?>
>>>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>>>> 
>>>>> <configuration>
>>>>>     <property>
>>>>>         <name>fs.s3.impl</name>
>>>>>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>>>     </property>
>>>>> 
>>>>>     <property>
>>>>>         <name>fs.s3.buffer.dir</name>
>>>>>         <value>/tmp</value>
>>>>>     </property>
>>>>> 
>>>>>     <property>
>>>>>         <name>fs.s3a.server-side-encryption-algorithm</name>
>>>>>         <value>AES256</value>
>>>>>     </property>
>>>>> 
>>>>>     <!--<property>
>>>>>         <name>fs.s3a.aws.credentials.provider</name>
>>>>>         <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
>>>>>     </property>-->
>>>>> 
>>>>>     <property>
>>>>>         <name>fs.s3a.aws.credentials.provider</name>
>>>>>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.access.key</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.secret.key</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.session.token</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>> 
>>>>>     <property>
>>>>>         <name>fs.s3a.proxy.host</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.proxy.port</name>
>>>>>         <value>8099</value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.proxy.username</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>>     <property>
>>>>>         <name>fs.s3a.proxy.password</name>
>>>>>         <value></value>
>>>>>     </property>
>>>>> 
>>>>> </configuration>
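>>>>> 
>>>>> (Note: SimpleAWSCredentialsProvider only reads fs.s3a.access.key and 
>>>>> fs.s3a.secret.key, which are empty above, so S3A has nothing to 
>>>>> authenticate with. A hedged alternative, if the keys should come from 
>>>>> homefolder/.aws/credentials, is the AWS SDK profile provider; with the 
>>>>> shaded flink-s3-fs-hadoop the class name may need the 
>>>>> org.apache.flink.fs.s3base.shaded prefix:)
>>>>> 
>>>>>     <property>
>>>>>         <name>fs.s3a.aws.credentials.provider</name>
>>>>>         <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
>>>>>     </property>
>>>>> 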
>>>>> POM.xml file:-
>>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>>>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>     <modelVersion>4.0.0</modelVersion>
>>>>> 
>>>>>     <groupId>FlinkStreamAndSql</groupId>
>>>>>     <artifactId>FlinkStreamAndSql</artifactId>
>>>>>     <version>1.0-SNAPSHOT</version>
>>>>>     <build>
>>>>>         <sourceDirectory>src/main/scala</sourceDirectory>
>>>>>         <plugins>
>>>>>             <plugin>
>>>>>                 <!-- see http://davidb.github.com/scala-maven-plugin -->
>>>>>                 <groupId>net.alchim31.maven</groupId>
>>>>>                 <artifactId>scala-maven-plugin</artifactId>
>>>>>                 <version>3.1.3</version>
>>>>>                 <executions>
>>>>>                     <execution>
>>>>>                         <goals>
>>>>>                             <goal>compile</goal>
>>>>>                             <goal>testCompile</goal>
>>>>>                         </goals>
>>>>>                         <configuration>
>>>>>                         </configuration>
>>>>>                     </execution>
>>>>>                 </executions>
>>>>>             </plugin>
>>>>>             <plugin>
>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>                 <artifactId>maven-surefire-plugin</artifactId>
>>>>>                 <version>2.13</version>
>>>>>                 <configuration>
>>>>>                     <useFile>false</useFile>
>>>>>                     <disableXmlReport>true</disableXmlReport>
>>>>>                     <!-- If you have classpath issues like NoClassDefFoundError... -->
>>>>>                     <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
>>>>>                     <includes>
>>>>>                         <include>**/*Test.*</include>
>>>>>                         <include>**/*Suite.*</include>
>>>>>                     </includes>
>>>>>                 </configuration>
>>>>>             </plugin>
>>>>> 
>>>>>             <!-- "package" command plugin -->
>>>>>             <plugin>
>>>>>                 <artifactId>maven-assembly-plugin</artifactId>
>>>>>                 <version>2.4.1</version>
>>>>>                 <configuration>
>>>>>                     <descriptorRefs>
>>>>>                         <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>>                     </descriptorRefs>
>>>>>                 </configuration>
>>>>>                 <executions>
>>>>>                     <execution>
>>>>>                         <id>make-assembly</id>
>>>>>                         <phase>package</phase>
>>>>>                         <goals>
>>>>>                             <goal>single</goal>
>>>>>                         </goals>
>>>>>                     </execution>
>>>>>                 </executions>
>>>>>             </plugin>
>>>>>         </plugins>
>>>>>     </build>
>>>>>     <dependencies>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-core</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-clients_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.derby</groupId>
>>>>>             <artifactId>derby</artifactId>
>>>>>             <version>10.13.1.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-jdbc_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-api-scala_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-api-java</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-planner_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-json</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-scala_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>        <dependency>
>>>>>            <groupId>org.apache.flink</groupId>
>>>>>            <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>            <version>1.8.1</version>
>>>>>        </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-kinesis_2.11</artifactId>
>>>>>             <version>1.8.0</version>
>>>>>             <scope>system</scope>
>>>>>             <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.amazonaws</groupId>
>>>>>             <artifactId>amazon-kinesis-client</artifactId>
>>>>>             <version>1.8.8</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.amazonaws</groupId>
>>>>>             <artifactId>aws-java-sdk-kinesis</artifactId>
>>>>>             <version>1.11.579</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>commons-dbcp</groupId>
>>>>>             <artifactId>commons-dbcp</artifactId>
>>>>>             <version>1.2.2</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.google.code.gson</groupId>
>>>>>             <artifactId>gson</artifactId>
>>>>>             <version>2.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>commons-cli</groupId>
>>>>>             <artifactId>commons-cli</artifactId>
>>>>>             <version>1.4</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.commons</groupId>
>>>>>             <artifactId>commons-csv</artifactId>
>>>>>             <version>1.7</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.commons</groupId>
>>>>>             <artifactId>commons-compress</artifactId>
>>>>>             <version>1.4.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.amazonaws</groupId>
>>>>>             <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
>>>>>             <version>1.4.0</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.amazonaws</groupId>
>>>>>             <artifactId>aws-java-sdk</artifactId>
>>>>>             <version>1.11.579</version>
>>>>>         </dependency>
>>>>> 
>>>>> 
>>>>>         <!-- For Parquet -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-hadoop-compatibility_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-avro</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.parquet</groupId>
>>>>>             <artifactId>parquet-avro</artifactId>
>>>>>             <version>1.10.0</version>
>>>>>         </dependency>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.hadoop</groupId>
>>>>>             <artifactId>hadoop-mapreduce-client-core</artifactId>
>>>>>             <version>3.1.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-twitter_2.10</artifactId>
>>>>>             <version>1.1.4-hadoop1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.json4s</groupId>
>>>>>             <artifactId>json4s-jackson_2.11</artifactId>
>>>>>             <version>3.6.7</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>com.amazonaws</groupId>
>>>>>             <artifactId>aws-java-sdk-cloudsearch</artifactId>
>>>>>             <version>1.11.500</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-shaded-hadoop2</artifactId>
>>>>>             <version>2.8.3-1.8.3</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-s3-fs-hadoop</artifactId>
>>>>>             <version>1.8.1</version>
>>>>>         </dependency>
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.hadoop</groupId>
>>>>>             <artifactId>hadoop-common</artifactId>
>>>>>             <version>2.8.5</version>
>>>>>         </dependency>
>>>>> 
>>>>> 
>>>>>     </dependencies>
>>>>> 
>>>>> </project>
>>>>> 
>>>>> Scala Code:-
>>>>> package com.aws.examples.s3
>>>>> 
>>>>> 
>>>>> import org.apache.flink.api.common.typeinfo.Types
>>>>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>> import org.apache.flink.table.api.java.BatchTableEnvironment
>>>>> import org.apache.flink.table.sources.CsvTableSource
>>>>> 
>>>>> object Batch {
>>>>> 
>>>>>   def main(args: Array[String]): Unit = {
>>>>>     
>>>>>     val env: ExecutionEnvironment =
>>>>>       ExecutionEnvironment.getExecutionEnvironment
>>>>>     val tableEnv: BatchTableEnvironment =
>>>>>       TableEnvironment.getTableEnvironment(env)
>>>>>     /* create table from csv */
>>>>> 
>>>>>     val tableSrc = CsvTableSource
>>>>>       .builder()
>>>>>       .path("s3a://bucket/csvfolder/avg.txt")
>>>>>       .fieldDelimiter(",")
>>>>>       .field("date", Types.STRING)
>>>>>       .field("month", Types.STRING)
>>>>>       .field("category", Types.STRING)
>>>>>       .field("product", Types.STRING)
>>>>>       .field("profit", Types.INT)
>>>>>       .build()
>>>>> 
>>>>>     tableEnv.registerTableSource("CatalogTable", tableSrc)
>>>>> 
>>>>>     val catalog: Table = tableEnv.scan("CatalogTable")
>>>>>     /* querying with Table API */
>>>>> 
>>>>>     val order20: Table = catalog
>>>>>       .filter("category === 'Category5'")
>>>>>       .groupBy("month")
>>>>>       .select("month, profit.sum as sum")
>>>>>       .orderBy("sum")
>>>>> 
>>>>>     val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])
>>>>> 
>>>>>     order20Set.writeAsText("src/main/resources/table1/table1")
>>>>> 
>>>>>     //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
>>>>>     env.execute("State")
>>>>> 
>>>>>   }
>>>>> 
>>>>>   class Row1 {
>>>>> 
>>>>>     var month: String = _
>>>>> 
>>>>>     var sum: java.lang.Integer = _
>>>>> 
>>>>>     override def toString(): String = month + "," + sum
>>>>> 
>>>>>   }
>>>>> 
>>>>> }
>>>>> Error:-
>>>>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>>>>> 
>>>>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
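>>>>> 
>>>>> (That last line means the S3A default credential chain ran with the empty 
>>>>> keys above and fell through to the EC2 instance-profile lookup, which 
>>>>> cannot work on a laptop. A minimal sketch of one way around it, assuming 
>>>>> the standard AWS environment variables are set: pass the keys through 
>>>>> Flink's own configuration at startup; flink-s3-fs-hadoop forwards fs.s3a.* 
>>>>> keys from the Flink config into the Hadoop config:)
>>>>> 
>>>>>     // Hedged sketch: set S3A credentials programmatically before any
>>>>>     // s3a:// path is touched, instead of relying on core-site.xml.
>>>>>     import org.apache.flink.configuration.Configuration
>>>>>     import org.apache.flink.core.fs.FileSystem
>>>>> 
>>>>>     val s3Conf = new Configuration()
>>>>>     // Assumption: credentials are exported as environment variables.
>>>>>     s3Conf.setString("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
>>>>>     s3Conf.setString("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
>>>>>     FileSystem.initialize(s3Conf)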
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> 
>>>> -- 
>>>> Thanks & Regards
>>>> Sri Tummala
>>>> 
>>> 
>>> 
>>> -- 
>>> Thanks & Regards
>>> Sri Tummala
>>> 
>> 
>> 
>> -- 
>> Thanks & Regards
>> Sri Tummala
>> 
> 
> 
> -- 
> Thanks & Regards
> Sri Tummala
> 
