From the exception I would conclude that your core-site.xml file is not being picked up.

AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so try setting HADOOP_CONF_DIR to the directory that the file resides in.

On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:
If anyone working have flink version 1.8.1 code reading S3 in Intellij in public GitHub please pass it on that will be huge help.


Thanks
Sri

On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala <kali.tumm...@gmail.com <mailto:kali.tumm...@gmail.com>> wrote:

    Which I already did in my pin still its not working.

    Thanks
    Sri

    On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler <ches...@apache.org
    <mailto:ches...@apache.org>> wrote:

        The concept of plugins does not exist in 1.8.1. As a result it
        should be sufficient for your use-case to add a dependency on
        flink-s3-fs-hadoop to your project.

        On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
        Let's close this issue guys please answer my questions. I am
        using Flink 1.8.1.

        Thanks
        Sri

        On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala
        <kali.tumm...@gmail.com <mailto:kali.tumm...@gmail.com>> wrote:

            Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I
            only see ConfigConstants.ENV_FLINK_LIB_DIR will this work ?

            Thanks
            Sri

            On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan
            Tummala <kali.tumm...@gmail.com
            <mailto:kali.tumm...@gmail.com>> wrote:

                I am not getting what you both are talking about lets
                be clear.

                Plugin ? what is it ? Is it a Jar which I have to
                download from the Internet and place it in a folder ?
                Is this the Jar which I have to download ?
                (flink-s3-fs-hadoop) ?

                Will this belo solution work ?
                
https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being
                
<https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being>


                Thanks
                Sri



                On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler
                <ches...@apache.org <mailto:ches...@apache.org>> wrote:

                    Well, you could do this before running the job:

                    // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR
                    environment variable, pointing to a directory
                    containing the plugins

                    PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(new
                    Configuration());
                    Filesystem.initialize(new Configuration(),
                    pluginManager);

                    On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
                    Hi.

                    I had the same problem. Flink use a plugins to
                    access s3. When you run local it starts a mini
                    cluster and the mini cluster don’t load plugins.
                    So it’s not possible without modifying Flink. 
                    In my case I wanted to investigate save points
                    through Flink processor API and the workaround
                    was to build my own version of the processor API
                    and include the missing part.

                    Med venlig hilsen / Best regards
                    Lasse Nedergaard


                    Den 10. mar. 2021 kl. 17.33 skrev sri hari kali
                    charan Tummala <kali.tumm...@gmail.com>
                    <mailto:kali.tumm...@gmail.com>:

                    
                    Flink,

                    I am able to access Kinesis from Intellij but
                    not S3 I have edited my stack overflow question
                    with kinesis code , Flink is still having
                    issues reading S3.

                    
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
                    
<https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868>


                    Thanks
                    Sri

                    On Tue, Mar 9, 2021 at 11:30 AM sri hari kali
                    charan Tummala <kali.tumm...@gmail.com
                    <mailto:kali.tumm...@gmail.com>> wrote:

                        my stack overflow question.

                        
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
                        
<https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868>

                        On Tue, Mar 9, 2021 at 11:28 AM sri hari
                        kali charan Tummala <kali.tumm...@gmail.com
                        <mailto:kali.tumm...@gmail.com>> wrote:

                            Here is my Intellij question.

                            
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
                            
<https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868>

                            On Mon, Mar 8, 2021 at 11:22 AM sri
                            hari kali charan Tummala
                            <kali.tumm...@gmail.com
                            <mailto:kali.tumm...@gmail.com>> wrote:


                                    Hi Flink Experts,


                                    I am trying to read an S3 file
                                    from my Intellij using Flink I
                                    am.comimg across Aws Auth error
                                    can someone help below are all
                                    the details.

                                    I have Aws credentials in
                                    homefolder/.aws/credentials


                                    My Intellij Environment Variables:-
                                    
ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
                                    
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

                                    flink-conf.yaml file content:-

                                    fs.hdfs.hadoopconf: 
/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

                                    core-site.xml file content:-

                                    <?xml version="1.0"?>
                                    <?xml-stylesheet
                                    type="text/xsl"
                                    href="configuration.xsl"?>
                                    <configuration> <property> <name>fs.s3.impl</name> 
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> </property> <property>
                                    <name>fs.s3.buffer.dir</name> <value>/tmp</value> 
</property> <property>
                                    <name>fs.s3a.server-side-encryption-algorithm</name> 
<value>AES256</value> </property>
                                    <!--<property>
                                    <name>fs.s3a.aws.credentials.provider</name>
                                    
<value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
                                    </property>--> <property> 
<name>fs.s3a.aws.credentials.provider</name> 
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value> </property> <property>
                                    <name>fs.s3a.access.key</name> 
<value></value>
                                    </property> <property> <name>fs.s3a.secret.key</name> 
<value></value>
                                    </property> <property> 
<name>fs.s3a.session.token</name> <value></value>
                                    </property> <property> <name>fs.s3a.proxy.host</name> 
<value></value>
                                    </property> <property> <name>fs.s3a.proxy.port</name> 
<value>8099</value> </property> <property>
                                    <name>fs.s3a.proxy.username</name> 
<value></value>
                                    </property> <property> 
<name>fs.s3a.proxy.password</name> <value></value>
                                    </property> </configuration>

                                    POM.xml file:-

                                    <?xml version="1.0"
                                    encoding="UTF-8"?> <project
                                    xmlns="http://maven.apache.org/POM/4.0.0
                                    <http://maven.apache.org/POM/4.0.0>"
                                    
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance
                                    <http://www.w3.org/2001/XMLSchema-instance>"
                                    
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                    <http://maven.apache.org/POM/4.0.0>
                                    http://maven.apache.org/xsd/maven-4.0.0.xsd
                                    
<http://maven.apache.org/xsd/maven-4.0.0.xsd>">
                                    <modelVersion>4.0.0</modelVersion> <groupId>FlinkStreamAndSql</groupId> 
<artifactId>FlinkStreamAndSql</artifactId> <version>1.0-SNAPSHOT</version> <build>
                                    <sourceDirectory>src/main/scala</sourceDirectory> 
<plugins>
                                    <plugin> <!-- see
                                    http://davidb.github.com/scala-maven-plugin
                                    
<http://davidb.github.com/scala-maven-plugin>
                                    --> <groupId>net.alchim31.maven</groupId> 
<artifactId>scala-maven-plugin</artifactId> <version>3.1.3</version> <executions>
                                    <execution> <goals> <goal>compile</goal> 
<goal>testCompile</goal> </goals>
                                    <configuration>
                                    </configuration> </execution>
                                    </executions> </plugin>
                                    <plugin> <groupId>org.apache.maven.plugins</groupId> 
<artifactId>maven-surefire-plugin</artifactId> <version>2.13</version> <configuration>
                                    <useFile>false</useFile> 
<disableXmlReport>true</disableXmlReport> <!-- If you
                                    have classpath issue like
                                    NoDefClassError,... --> <!--
                                    useManifestOnlyJar>false</useManifestOnlyJar
                                    --> <includes> <include>**/*Test.*</include> 
<include>**/*Suite.*</include> </includes>
                                    </configuration> </plugin> <!--
                                    "package" command plugin -->
                                    <plugin> <artifactId>maven-assembly-plugin</artifactId> 
<version>2.4.1</version> <configuration>
                                    <descriptorRefs> 
<descriptorRef>jar-with-dependencies</descriptorRef>
                                    </descriptorRefs>
                                    </configuration> <executions>
                                    <execution> <id>make-assembly</id> <phase>package</phase> <goals> 
<goal>single</goal> </goals> </execution>
                                    </executions> </plugin>
                                    </plugins> </build>
                                    <dependencies> <dependency>
                                    <groupId>org.apache.flink</groupId> 
<artifactId>flink-core</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-core</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-clients_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.derby</groupId> 
<artifactId>derby</artifactId> <version>10.13.1.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-jdbc_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table-api-scala_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table-api-java</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table-planner_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-json</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-scala_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-scala_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-streaming-scala_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-kinesis_2.11</artifactId> <version>1.8.0</version> <scope>system</scope> 
<systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>amazon-kinesis-client</artifactId> <version>1.8.8</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>aws-java-sdk-kinesis</artifactId> <version>1.11.579</version> </dependency>
                                    <dependency> <groupId>commons-dbcp</groupId> 
<artifactId>commons-dbcp</artifactId> <version>1.2.2</version> </dependency>
                                    <dependency> <groupId>com.google.code.gson</groupId> 
<artifactId>gson</artifactId> <version>2.1</version> </dependency>
                                    <dependency> <groupId>commons-cli</groupId> 
<artifactId>commons-cli</artifactId> <version>1.4</version> </dependency> <!--
                                    
https://mvnrepository.com/artifact/org.apache.commons/commons-csv
                                    
<https://mvnrepository.com/artifact/org.apache.commons/commons-csv>
                                    --> <dependency> <groupId>org.apache.commons</groupId> 
<artifactId>commons-csv</artifactId> <version>1.7</version> </dependency>
                                    <dependency> <groupId>org.apache.commons</groupId> 
<artifactId>commons-compress</artifactId> <version>1.4.1</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>dynamodb-streams-kinesis-adapter</artifactId> <version>1.4.0</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>dynamodb-streams-kinesis-adapter</artifactId> <version>1.4.0</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>aws-java-sdk</artifactId> <version>1.11.579</version> </dependency> <!--
                                    For Parquet --> <dependency>
                                    <groupId>org.apache.flink</groupId> 
<artifactId>flink-hadoop-compatibility_2.11</artifactId> <version>1.8.1</version> 
</dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-avro</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.parquet</groupId> 
<artifactId>parquet-avro</artifactId> <version>1.10.0</version> </dependency>
                                    <dependency> <groupId>org.apache.hadoop</groupId> 
<artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.1.1</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-twitter_2.10</artifactId> <version>1.1.4-hadoop1</version> 
</dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-filesystem_2.11</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.json4s</groupId> 
<artifactId>json4s-jackson_2.11</artifactId> <version>3.6.7</version> </dependency>
                                    <dependency> <groupId>com.amazonaws</groupId> 
<artifactId>aws-java-sdk-cloudsearch</artifactId> <version>1.11.500</version> </dependency> <!--
                                    
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2
                                    
<https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2>
                                    --> <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-shaded-hadoop2</artifactId> <version>2.8.3-1.8.3</version> </dependency>
                                    <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-s3-fs-hadoop</artifactId> <version>1.8.1</version> </dependency>
                                    <dependency> <groupId>org.apache.hadoop</groupId> 
<artifactId>hadoop-common</artifactId> <version>2.8.5</version> </dependency>
                                    </dependencies> </project>

                                    Scala Code:-

                                    package com.aws.examples.s3


                                    import 
org.apache.flink.api.common.typeinfo.Types
                                    import org.apache.flink.api.java.{DataSet, 
ExecutionEnvironment}
                                    import org.apache.flink.table.api.{Table, 
TableEnvironment}
                                    import 
org.apache.flink.table.api.java.BatchTableEnvironment
                                    import 
org.apache.flink.table.sources.CsvTableSource

                                    object Batch {

                                       def main(args: Array[String]):Unit = {
val env: ExecutionEnvironment =
                                           
ExecutionEnvironment.getExecutionEnvironment val tableEnv: 
BatchTableEnvironment =
                                           
TableEnvironment.getTableEnvironment(env)
                                         /* create table from csv */ val 
tableSrc = CsvTableSource
                                           .builder()
                                           
.path("s3a://bucket/csvfolder/avg.txt")
                                           .fieldDelimiter(",")
                                           .field("date", Types.STRING)
                                           .field("month", Types.STRING)
                                           .field("category", Types.STRING)
                                           .field("product", Types.STRING)
                                           .field("profit", Types.INT)
                                           .build()

                                         
tableEnv.registerTableSource("CatalogTable", tableSrc)

                                         val catalog: Table = 
tableEnv.scan("CatalogTable")
                                         /* querying with Table API */ val 
order20: Table = catalog
                                           .filter(" category === 'Category5'")
                                           .groupBy("month")
                                           .select("month, profit.sum as sum")
                                           .orderBy("sum")

                                         val order20Set: DataSet[Row1] = 
tableEnv.toDataSet(order20, classOf[Row1])

                                         
order20Set.writeAsText("src/main/resources/table1/table1")

                                         //tableEnv.toAppendStream(order20,
                                    
classOf[Row]).writeAsText("/home/jivesh/table")
                                    env.execute("State")

                                       }

                                       class Row1 {

                                         var month:String = _

                                         var sum: java.lang.Integer = _

                                         override def toString():String =month 
+"," +sum }

                                    }

                                    Error:-
                                    *Caused by:
                                    
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
                                    Unable to load credentials from
                                    service endpoint*

                                    *Caused by:
                                    
org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
                                    No AWS Credentials provided by
                                    BasicAWSCredentialsProvider
                                    EnvironmentVariableCredentialsProvider
                                    InstanceProfileCredentialsProvider
                                    :
                                    
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
                                    Unable to load credentials from
                                    service endpoint*


                                    Thanks

                                    
------------------------------------------------------------------------

                                    The information contained in
                                    this e-mail is confidential
                                    and/or proprietary to Capital
                                    One and/or its affiliates and
                                    may only be used solely in
                                    performance of work or services
                                    for Capital One. The
                                    information transmitted
                                    herewith is intended only for
                                    use by the individual or entity
                                    to which it is addressed. If
                                    the reader of this message is
                                    not the intended recipient, you
                                    are hereby notified that any
                                    review, retransmission,
                                    dissemination, distribution,
                                    copying or other use of, or
                                    taking of any action in
                                    reliance upon this information
                                    is strictly prohibited. If you
                                    have received this
                                    communication in error, please
                                    contact the sender and delete
                                    the material from your computer.




-- Thanks & Regards
                                Sri Tummala



-- Thanks & Regards
                            Sri Tummala



-- Thanks & Regards
                        Sri Tummala



-- Thanks & Regards
                    Sri Tummala




-- Thanks & Regards
                Sri Tummala



-- Thanks & Regards
            Sri Tummala

-- Thanks & Regards
        Sri Tummala


-- Thanks & Regards
    Sri Tummala

--
Thanks & Regards
Sri Tummala


Reply via email to