On 10 Mar 2021, at 17:33, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
Flink,
I am able to access Kinesis from IntelliJ, but not S3. I have edited my Stack Overflow question with the Kinesis code; Flink is still having issues reading from S3.
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
Thanks
Sri
On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
My Stack Overflow question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
Here is my IntelliJ question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
Hi Flink Experts,
I am trying to read an S3 file from IntelliJ using Flink, and I am coming across an AWS auth error. Can someone help? All the details are below.
I have AWS credentials in homefolder/.aws/credentials.
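As a quick sanity check that this credentials file resolves at all, outside of Flink, I can run a minimal sketch using the aws-java-sdk that is already in my POM (the "default" profile name and the object name are my own assumptions):

import com.amazonaws.auth.profile.ProfileCredentialsProvider

object CredentialsCheck {
  def main(args: Array[String]): Unit = {
    // Reads homefolder/.aws/credentials; the "default" profile is assumed here
    val creds = new ProfileCredentialsProvider("default").getCredentials
    // Print only a prefix of the access key id, never the secret
    println("Loaded access key id: " + creds.getAWSAccessKeyId.take(4) + "...")
  }
}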
My IntelliJ environment variables:-
ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
flink-conf.yaml file content:-
fs.hdfs.hadoopconf:
/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
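I am not sure ENABLE_BUILT_IN_PLUGINS takes effect outside the distribution's entrypoint scripts, so when running inside IntelliJ I believe the S3 filesystem may have to be registered explicitly before the job runs. A minimal sketch, assuming the flink-config directory above (the object name is mine):

import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.core.fs.FileSystem

object InitS3 {
  def main(args: Array[String]): Unit = {
    // Load flink-conf.yaml from the config dir and register the pluggable
    // filesystems (s3://, s3a://) before any job code touches S3
    val conf = GlobalConfiguration.loadConfiguration(
      "/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config")
    FileSystem.initialize(conf)
  }
}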
core-site.xml file content:-

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>
    <!--
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>
    -->
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>
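Note that, as far as I know, SimpleAWSCredentialsProvider needs non-empty fs.s3a.access.key and fs.s3a.secret.key values to work. To confirm the file is actually picked up at all, a small check I can run from IntelliJ (a sketch, assuming hadoop-common on the classpath; the path mirrors fs.hdfs.hadoopconf above, and the object name is mine):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HadoopConfCheck {
  def main(args: Array[String]): Unit = {
    // Load only this core-site.xml, no defaults, and echo the keys back
    val conf = new Configuration(false)
    conf.addResource(new Path(
      "/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config/core-site.xml"))
    println(conf.get("fs.s3.impl"))
    println(conf.get("fs.s3a.aws.credentials.provider"))
  }
}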
POM.xml file:-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issues like NoClassDefFoundError... -->
                    <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
        <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
        <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
        <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
        <!-- For Parquet -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
    </dependencies>
</project>
Scala Code:-

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])
    order20Set.writeAsText("src/main/resources/table1/table1")
    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")

    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _
    override def toString: String = month + "," + sum
  }
}
Error:-

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
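From the shaded class names, the chain that fails is the S3A default chain (BasicAWSCredentialsProvider, then environment variables, then the EC2 instance profile), so my core-site.xml provider setting does not seem to reach the shaded client. If I understand the Flink docs correctly, flink-s3-fs-hadoop forwards s3.* keys from the Flink configuration to the shaded Hadoop S3A client, so credentials could also be passed programmatically. A sketch (the values and object name are placeholders, not real credentials):

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

object S3CredsViaFlinkConf {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // flink-s3-fs-hadoop forwards s3.* keys to the shaded S3A client
    conf.setString("s3.access-key", "PLACEHOLDER_ACCESS_KEY")
    conf.setString("s3.secret-key", "PLACEHOLDER_SECRET_KEY")
    // Must run before the s3a:// filesystem is first used
    FileSystem.initialize(conf)
  }
}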
Thanks
--
Thanks & Regards
Sri Tummala