Same error.
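
One quick thing worth ruling out first (a sketch, nothing Flink-specific): whether the IntelliJ run configuration actually exposes HADOOP_CONF_DIR to the JVM at all, and whether that directory contains core-site.xml. The object name below is hypothetical.

    // Sanity check (sketch): is HADOOP_CONF_DIR visible to the process started by IntelliJ,
    // and does it point at a directory that contains core-site.xml?
    import java.nio.file.{Files, Paths}

    object EnvCheck {
      def main(args: Array[String]): Unit = {
        val confDir = System.getenv("HADOOP_CONF_DIR")
        println(s"HADOOP_CONF_DIR = $confDir")
        if (confDir != null) {
          println(s"core-site.xml present: ${Files.exists(Paths.get(confDir, "core-site.xml"))}")
        }
      }
    }

Environment variables in IntelliJ are set per run configuration, which is easy to miss when switching between run configs.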
On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler <ches...@apache.org> wrote:

> From the exception I would conclude that your core-site.xml file is not
> being picked up.
>
> AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
> try setting HADOOP_CONF_DIR to the directory that the file resides in.
>
> On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:
>
> If anyone has working Flink 1.8.1 code reading S3 in IntelliJ in a public
> GitHub repo, please pass it on; that would be a huge help.
>
> Thanks
> Sri
>
> On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Which I already did in my POM; it's still not working.
>>
>> Thanks
>> Sri
>>
>> On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> The concept of plugins does not exist in 1.8.1. As a result it should be
>>> sufficient for your use case to add a dependency on flink-s3-fs-hadoop to
>>> your project.
>>>
>>> On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
>>>
>>> Let's close this issue, guys; please answer my questions. I am using
>>> Flink 1.8.1.
>>>
>>> Thanks
>>> Sri
>>>
>>> On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
>>>> ConfigConstants.ENV_FLINK_LIB_DIR. Will this work?
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala <
>>>> kali.tumm...@gmail.com> wrote:
>>>>
>>>>> I am not getting what you both are talking about; let's be clear.
>>>>>
>>>>> Plugin? What is it? Is it a jar which I have to download from the
>>>>> Internet and place in a folder? Is this the jar I have to download
>>>>> (flink-s3-fs-hadoop)?
>>>>>
>>>>> Will the solution below work?
>>>>>
>>>>> https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>> On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Well, you could do this before running the job:
>>>>>>
>>>>>> // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment
>>>>>> // variable, pointing to a directory containing the plugins
>>>>>> PluginManager pluginManager =
>>>>>>     PluginUtils.createPluginManagerFromRootFolder(new Configuration());
>>>>>> FileSystem.initialize(new Configuration(), pluginManager);
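
For reference, a minimal Scala sketch of the initialization described in that snippet, assuming a Flink version that ships the plugin mechanism (1.10 or later); as Chesnay notes elsewhere in the thread, 1.8.1 has no plugins, so there the flink-s3-fs-hadoop dependency is the route instead. The object name and plugins directory layout are assumptions.

    // Sketch for Flink 1.10+: make a locally started mini cluster load filesystem plugins
    // before the job runs. Not applicable to 1.8.1.
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.core.plugin.PluginUtils

    object InitFileSystemsWithPlugins {
      def main(args: Array[String]): Unit = {
        // Point the FLINK_PLUGINS_DIR environment variable (ConfigConstants.ENV_FLINK_PLUGINS_DIR)
        // at a directory laid out like <plugins>/s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar,
        // e.g. via the IntelliJ run configuration, before this code executes.
        val conf = new Configuration()
        val pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf)
        FileSystem.initialize(conf, pluginManager)
        // ... then build the ExecutionEnvironment and run the job as usual ...
      }
    }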
>>>>>> On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
>>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> I had the same problem. Flink uses plugins to access S3. When you run
>>>>>> locally it starts a mini cluster, and the mini cluster doesn't load
>>>>>> plugins. So it's not possible without modifying Flink. In my case I
>>>>>> wanted to investigate savepoints through the Flink processor API, and
>>>>>> the workaround was to build my own version of the processor API and
>>>>>> include the missing part.
>>>>>>
>>>>>> Best regards
>>>>>> Lasse Nedergaard
>>>>>>
>>>>>> On 10 Mar 2021 at 17:33, sri hari kali charan Tummala
>>>>>> <kali.tumm...@gmail.com> wrote:
>>>>>>
>>>>>> Flink,
>>>>>>
>>>>>> I am able to access Kinesis from IntelliJ but not S3. I have edited my
>>>>>> Stack Overflow question with the Kinesis code; Flink is still having
>>>>>> issues reading S3.
>>>>>>
>>>>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
>>>>>>
>>>>>> Thanks
>>>>>> Sri
>>>>>>
>>>>>> On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <
>>>>>> kali.tumm...@gmail.com> wrote:
>>>>>>
>>>>>>> My Stack Overflow question:
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>>>>>>
>>>>>>> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
>>>>>>> kali.tumm...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Here is my IntelliJ question:
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>>>>>>>
>>>>>>>> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
>>>>>>>> kali.tumm...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Flink Experts,
>>>>>>>>>
>>>>>>>>> I am trying to read an S3 file from IntelliJ using Flink and I am
>>>>>>>>> coming across an AWS auth error; can someone help? Below are all
>>>>>>>>> the details.
>>>>>>>>>
>>>>>>>>> I have AWS credentials in homefolder/.aws/credentials
>>>>>>>>>
>>>>>>>>> My IntelliJ environment variables:
>>>>>>>>>
>>>>>>>>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>>>>>>>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>>>>>>>>
>>>>>>>>> flink-conf.yaml file content:
>>>>>>>>>
>>>>>>>>> fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>>>>>>>>>
>>>>>>>>> core-site.xml file content:
>>>>>>>>>
>>>>>>>>> <?xml version="1.0"?>
>>>>>>>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>>>>>>>> <configuration>
>>>>>>>>>     <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
>>>>>>>>>     <property><name>fs.s3.buffer.dir</name><value>/tmp</value></property>
>>>>>>>>>     <property><name>fs.s3a.server-side-encryption-algorithm</name><value>AES256</value></property>
>>>>>>>>>     <!--<property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value></property>-->
>>>>>>>>>     <property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value></property>
>>>>>>>>>     <property><name>fs.s3a.access.key</name><value></value></property>
>>>>>>>>>     <property><name>fs.s3a.secret.key</name><value></value></property>
>>>>>>>>>     <property><name>fs.s3a.session.token</name><value></value></property>
>>>>>>>>>     <property><name>fs.s3a.proxy.host</name><value></value></property>
>>>>>>>>>     <property><name>fs.s3a.proxy.port</name><value>8099</value></property>
>>>>>>>>>     <property><name>fs.s3a.proxy.username</name><value></value></property>
>>>>>>>>>     <property><name>fs.s3a.proxy.password</name><value></value></property>
>>>>>>>>> </configuration>
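
A side note on the exception quoted at the end of this mail: "Unable to load credentials from service endpoint" is typically what the AWS SDK reports when the EC2 instance-profile (metadata) provider ends up being asked for credentials, i.e. neither keys from core-site.xml nor the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables were seen; as far as I can tell, the S3A defaults in this Hadoop version do not read homefolder/.aws/credentials at all. A minimal sketch of the credentials part of core-site.xml with the keys supplied directly (placeholder values only, the real keys stay redacted as above):

    <!-- Sketch only: SimpleAWSCredentialsProvider reads exactly these two keys. -->
    <property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value></property>
    <property><name>fs.s3a.access.key</name><value>YOUR_ACCESS_KEY_ID</value></property>
    <property><name>fs.s3a.secret.key</name><value>YOUR_SECRET_ACCESS_KEY</value></property>

Exporting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the IntelliJ run configuration is an alternative that needs no XML, provided the environment-variable provider is in the effective chain; either way, HADOOP_CONF_DIR has to point at the directory holding this file, as suggested at the top of the thread, for the XML route to take effect.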
>>>>>>>>> POM.xml file:
>>>>>>>>>
>>>>>>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>>>>>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>>>>>     <modelVersion>4.0.0</modelVersion>
>>>>>>>>>     <groupId>FlinkStreamAndSql</groupId>
>>>>>>>>>     <artifactId>FlinkStreamAndSql</artifactId>
>>>>>>>>>     <version>1.0-SNAPSHOT</version>
>>>>>>>>>     <build>
>>>>>>>>>         <sourceDirectory>src/main/scala</sourceDirectory>
>>>>>>>>>         <plugins>
>>>>>>>>>             <plugin>
>>>>>>>>>                 <!-- see http://davidb.github.com/scala-maven-plugin -->
>>>>>>>>>                 <groupId>net.alchim31.maven</groupId>
>>>>>>>>>                 <artifactId>scala-maven-plugin</artifactId>
>>>>>>>>>                 <version>3.1.3</version>
>>>>>>>>>                 <executions>
>>>>>>>>>                     <execution>
>>>>>>>>>                         <goals><goal>compile</goal><goal>testCompile</goal></goals>
>>>>>>>>>                         <configuration></configuration>
>>>>>>>>>                     </execution>
>>>>>>>>>                 </executions>
>>>>>>>>>             </plugin>
>>>>>>>>>             <plugin>
>>>>>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>>>>>                 <artifactId>maven-surefire-plugin</artifactId>
>>>>>>>>>                 <version>2.13</version>
>>>>>>>>>                 <configuration>
>>>>>>>>>                     <useFile>false</useFile>
>>>>>>>>>                     <disableXmlReport>true</disableXmlReport>
>>>>>>>>>                     <!-- If you have classpath issue like NoDefClassError,... -->
>>>>>>>>>                     <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
>>>>>>>>>                     <includes>
>>>>>>>>>                         <include>**/*Test.*</include>
>>>>>>>>>                         <include>**/*Suite.*</include>
>>>>>>>>>                     </includes>
>>>>>>>>>                 </configuration>
>>>>>>>>>             </plugin>
>>>>>>>>>             <!-- "package" command plugin -->
>>>>>>>>>             <plugin>
>>>>>>>>>                 <artifactId>maven-assembly-plugin</artifactId>
>>>>>>>>>                 <version>2.4.1</version>
>>>>>>>>>                 <configuration>
>>>>>>>>>                     <descriptorRefs>
>>>>>>>>>                         <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>>>>>>                     </descriptorRefs>
>>>>>>>>>                 </configuration>
>>>>>>>>>                 <executions>
>>>>>>>>>                     <execution>
>>>>>>>>>                         <id>make-assembly</id>
>>>>>>>>>                         <phase>package</phase>
>>>>>>>>>                         <goals><goal>single</goal></goals>
>>>>>>>>>                     </execution>
>>>>>>>>>                 </executions>
>>>>>>>>>             </plugin>
>>>>>>>>>         </plugins>
>>>>>>>>>     </build>
>>>>>>>>>     <dependencies>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>>>>             <artifactId>flink-connector-kinesis_2.11</artifactId>
>>>>>>>>>             <version>1.8.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>             <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
>>>>>>>>>         <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
>>>>>>>>>         <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
>>>>>>>>>         <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
>>>>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
>>>>>>>>>         <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
>>>>>>>>>         <!-- For Parquet -->
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
>>>>>>>>>         <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
>>>>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
>>>>>>>>>         <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
>>>>>>>>>     </dependencies>
>>>>>>>>> </project>
>>>>>>>>>
>>>>>>>>> Scala code:
>>>>>>>>>
>>>>>>>>> package com.aws.examples.s3
>>>>>>>>>
>>>>>>>>> import org.apache.flink.api.common.typeinfo.Types
>>>>>>>>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>>> import org.apache.flink.table.api.java.BatchTableEnvironment
>>>>>>>>> import org.apache.flink.table.sources.CsvTableSource
>>>>>>>>>
>>>>>>>>> object Batch {
>>>>>>>>>
>>>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>>>
>>>>>>>>>     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>     val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
>>>>>>>>>
>>>>>>>>>     /* create table from csv */
>>>>>>>>>     val tableSrc = CsvTableSource
>>>>>>>>>       .builder()
>>>>>>>>>       .path("s3a://bucket/csvfolder/avg.txt")
>>>>>>>>>       .fieldDelimiter(",")
>>>>>>>>>       .field("date", Types.STRING)
>>>>>>>>>       .field("month", Types.STRING)
>>>>>>>>>       .field("category", Types.STRING)
>>>>>>>>>       .field("product", Types.STRING)
>>>>>>>>>       .field("profit", Types.INT)
>>>>>>>>>       .build()
>>>>>>>>>
>>>>>>>>>     tableEnv.registerTableSource("CatalogTable", tableSrc)
>>>>>>>>>
>>>>>>>>>     val catalog: Table = tableEnv.scan("CatalogTable")
>>>>>>>>>
>>>>>>>>>     /* querying with Table API */
>>>>>>>>>     val order20: Table = catalog
>>>>>>>>>       .filter(" category === 'Category5'")
>>>>>>>>>       .groupBy("month")
>>>>>>>>>       .select("month, profit.sum as sum")
>>>>>>>>>       .orderBy("sum")
>>>>>>>>>
>>>>>>>>>     val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])
>>>>>>>>>
>>>>>>>>>     order20Set.writeAsText("src/main/resources/table1/table1")
>>>>>>>>>
>>>>>>>>>     //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
>>>>>>>>>
>>>>>>>>>     env.execute("State")
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   class Row1 {
>>>>>>>>>     var month: String = _
>>>>>>>>>     var sum: java.lang.Integer = _
>>>>>>>>>     override def toString(): String = month + "," + sum
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Error:
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
>>>>>>>>> Unable to load credentials from service endpoint
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
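
Given that exception, a quick standalone check (a sketch, independent of Flink, using only the aws-java-sdk dependency already in the POM above) can show which credentials the plain AWS SDK default chain resolves from the same IntelliJ run configuration. Note that flink-s3-fs-hadoop bundles its own shaded SDK and S3A client, so this only confirms that the credentials file or environment variables are visible to the JVM, not that S3A will use them.

    // Standalone sanity check (sketch): what does the default AWS SDK provider chain
    // (env vars, system properties, ~/.aws/credentials, instance profile) resolve here?
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

    object CredentialsCheck {
      def main(args: Array[String]): Unit = {
        val creds = new DefaultAWSCredentialsProviderChain().getCredentials
        // Print only the access key id, never the secret.
        println(s"Resolved access key id: ${creds.getAWSAccessKeyId}")
      }
    }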
--
Thanks & Regards
Sri Tummala
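
A last sketch, assuming only the dependencies already in the POM above: resolving the s3a:// path through Flink's own FileSystem API shows whether flink-s3-fs-hadoop is the implementation actually answering for the scheme when launched from IntelliJ, and any credentials problem then surfaces on the exists() call, outside the Table API job.

    // Diagnostic sketch: resolve the S3 path through Flink's FileSystem API.
    import org.apache.flink.core.fs.{FileSystem, Path}

    object S3FileSystemCheck {
      def main(args: Array[String]): Unit = {
        val path = new Path("s3a://bucket/csvfolder/avg.txt") // placeholder path from the job above
        val fs = FileSystem.get(path.toUri)                   // which factory claimed the s3a scheme?
        println(s"FileSystem implementation: ${fs.getClass.getName}")
        println(s"Path exists: ${fs.exists(path)}")           // credential errors surface here
      }
    }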