Re: Struggling with reading the file from s3 as Source
My problem was that the plugin jar needs to be under plugins/s3-fs-hadoop.

What I changed to get it running:
- Added to flink-conf.yaml:
    s3.access-key:
    s3.secret-key:
- Removed all Hadoop dependencies from pom.xml.
- Started the cluster and submitted the job:
    cd /
    ./bin/start-cluster.sh
    ./bin/flink run xyz..jar

Still struggling with how to get it to work with pom.xml in IntelliJ IDEA.

On Mon, Sep 14, 2020 at 12:13 PM Vijay Balakrishnan wrote:
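The fix above hinges on the exact directory layout the plugin loader expects. A runnable sketch of that layout, using a temp directory and a stand-in jar purely for illustration (the real flink-s3-fs-hadoop jar ships in the distribution's opt/ directory):

```shell
# Illustrative sketch of the plugin layout Flink expects; all paths are stand-ins.
FLINK_HOME="$(mktemp -d)"   # stand-in for the real flink-1.11.1 distribution directory

# The s3 filesystem jar must live in its own subdirectory under plugins/,
# e.g. plugins/s3-fs-hadoop/ -- not in lib/ and not loose under plugins/.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
touch "$FLINK_HOME/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.11.1.jar"  # stand-in for the real jar

# Credentials belong in conf/flink-conf.yaml (placeholder values here).
mkdir -p "$FLINK_HOME/conf"
cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
EOF

# One jar (plus its dependencies, if any) per plugin subdirectory:
ls "$FLINK_HOME/plugins/s3-fs-hadoop"
```

The "Cannot find any jar files for plugin in directory" error further down the thread fires when a plugins/s3-fs-hadoop directory exists but is empty, which is consistent with the second flink-1.11.1 copy visible in that error's path.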
Re: Struggling with reading the file from s3 as Source
Hi Robert,

Thanks for the link. Is there a simple example I can use as a starting template for using S3 with pom.xml?

I copied the flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop directory.
Running from flink-1.11.1/:

flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar /Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar

Caused by: java.io.IOException: *Cannot find any jar files for plugin in directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop].* Please provide the jar files for the plugin or delete the directory.
    at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)

*IDEA*: (I copied the flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop directory.)
*How do I connect that to the pom.xml to run inside IntelliJ, which points to the Apache repo?*

pom.xml: added Hadoop dependencies:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

This gives:

Exception in thread "main" java.lang.IllegalStateException: *No ExecutorFactory found to execute the application.*
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

TIA,

On Fri, Sep 11, 2020 at 11:09 AM Robert Metzger wrote:
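On the IntelliJ question above: in Flink 1.11 the client was separated from flink-streaming-java, and "No ExecutorFactory found to execute the application" is the symptom of a missing flink-clients dependency. A sketch of pom.xml entries that typically replace the Hadoop ones for IDE runs (version properties assumed to match the rest of the pom):

```xml
<!-- Sketch only: provides a local ExecutorFactory for jobs launched from the IDE. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- In the IDE there is no plugins/ directory, so the s3 filesystem
     can be pulled in as a plain dependency instead. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>
```

On a real cluster the flink-s3-fs-hadoop dependency should stay out of the fat jar; there the plugins/ directory mechanism applies instead.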
Re: Struggling with reading the file from s3 as Source
Hi Vijay,

Can you post the error you are referring to?
Did you properly set up an s3 plugin (https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/)?

On Fri, Sep 11, 2020 at 8:42 AM Vijay Balakrishnan wrote:
Struggling with reading the file from s3 as Source
Hi,

I want to *get data from S3, process it, and send it to Kinesis*:
1. Get gzip files from an s3 folder (s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. Send to Kinesis

The idea is:

    env.readTextFile(s3Folder)
       .sort(SortFunction)
       .map(MapFunction)
       .sink(KinesisSink)

I am struggling with reading the file from s3:

    // Assume env is set up properly.
    // The endpoint can either be a single file or a directory - "s3:///"
    final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
    stringDataStreamSource.print();

It keeps *erroring*, saying I need some kind of *HDFS* setup. I don't want anything to do with HDFS; I just want to read from S3. I saw a StackOverflow mention, by David Anderson I think, about using the Flink SQL API. I would appreciate any decent example to get reading from S3 working.

TIA,
Vijay
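Steps 2-3 above (sort each file, then map each record) can be prototyped independently of Flink and S3, which helps separate the per-file logic from the filesystem wiring. A self-contained plain-Java sketch; the class name, the uppercase "map" step, and the in-memory gzip input are all illustrative, not part of the original job:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.*;

public class SortGzipLines {

    // Read all lines of one gzip stream, sort them, then apply a map step.
    static List<String> sortedMappedLines(InputStream gzipStream) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipStream), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                lines.add(line);
            }
        }
        Collections.sort(lines);               // step 2: sort each file
        lines.replaceAll(String::toUpperCase); // step 3: illustrative map/processing
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build an in-memory gzip "file" so the sketch runs without S3.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), StandardCharsets.UTF_8)) {
            w.write("b-record\na-record\n");
        }
        List<String> out = sortedMappedLines(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(out); // prints: [A-RECORD, B-RECORD]
    }
}
```

In the actual job this logic would sit behind the map step, with readTextFile (or the SQL filesystem connector mentioned above) handling the S3 access.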