Sam,  I just happened to answer a similar question on Stackoverflow at Does 
Apache Flink AWS S3 Sink require Hadoop for local testing?. I also submitted a 
PR to make that (for me) a little clearer on the Apache Flink documentation 
(https://github.com/apache/flink/pull/3054/files).  
|  
|   
|   
|   |    |

   |

  |
|  
|   |  
Does Apache Flink AWS S3 Sink require Hadoop for local testing?
 I am relatively new to Apache Flink and I am trying to create a simple project 
that produces a file to an AWS S3...  |   |

  |

  |

 
Let me know if that works for you.
Thanks,Markus 

    On Tuesday, January 10, 2017 3:17 PM, Samra Kasim 
<samra.ka...@thehumangeo.com> wrote:
 

 Hi,
I am new to Flink and I've written two small test projects: 1) to read data 
from s3 and 2) to push data to s3. However, I am getting two different errors 
for the projects relating to, i think, how the core-site.xml file is being 
read. I am running the project locally in IntelliJ. I have the environment 
variable in run configurations set to 
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the 
core-site.xml in the src/main/resources folder but get the same errors. I want 
to know if my core-site.xml file is configured correctly for using s3a and how 
to have IntelliJ read the core-site.xml file? Also, are the core-site.xml 
configurations different for reading versus writing to s3?
This is my code for reading data from s3:
public class DesktopWriter {     public static voidmain(String[] args) throws 
Exception {        ExecutionEnvironment env 
=ExecutionEnvironment.createLocalEnvironment();       DataSet<String> data = 
env.readTextFile("s3://flink-test/flink-test.txt");        data.print();    }}I 
get the error: Caused by: java.io.IOException: Cannot determine access key to 
Amazon S3. Please make sure to configure it by setting the configuration key 
'fs.s3.accessKey'.This is my code for writing to S3:public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs(“path/ 
to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs) ;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.get 
ExecutionEnvironment();
        env.getConfig(). disableSysoutLogging();
        env.getConfig(). setGlobalJobParameters( parameterTool); 

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired(" kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()) );

        messageStream.writeAsText(" 
s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }I get the error: Caused by: java.io.IOException: The given file URI 
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but 
the File System could not be initialized with that address: Unable to load AWS 
credentials from any provider in the chain
This is my core-site.xml:
<configuration>    <property>        <name>fs.defaultFS</name>        
<value>hdfs://localhost:9000</ value>    </property>    <property>        
<name>fs.s3.impl</name>        <value>org.apache.hadoop.fs. 
s3a.S3AFileSystem</value>    </property>
    <!-- Comma separated list of local directories used to buffer         large 
results prior to transmitting them to S3. -->    <property>        
<name>fs.s3a.buffer.dir</name>        <value>/tmp</value>    </property>
    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a. 
Constants -->    <property>        <name>fs.s3a.awsAccessKeyId</ name>        
<value>*****</value>    </property>
    <!-- set your AWS access key -->    <property>        <name>fs.s3a. 
awsSecretAccessKey</name>        <value>*****</value>    
</property></configuration>This is my pom.xml:<dependencies>    <dependency>    
   <groupId>org.apache.flink</groupId>       
<artifactId>flink-java</artifactId>       <version>1.1.4</version>   
</dependency>     <dependency>       <groupId>org.apache.flink</groupId>        
<artifactId>flink-streaming-java_2.10</artifactId>       
<version>1.1.4</version>   </dependency>     <dependency>       
<groupId>org.apache.flink</groupId>       
<artifactId>flink-clients_2.10</artifactId>       <version>1.1.4</version>   
</dependency>     <dependency>       <groupId>org.apache.flink</groupId>       
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>       
<version>1.1.4</version>   </dependency>     <dependency>       
<groupId>com.amazonaws</groupId>        <artifactId>aws-java-sdk</artifactId>   
    <version>1.7.4</version>   </dependency>     <dependency>       
<groupId>org.apache.hadoop</groupId>       <artifactId>hadoop-aws</artifactId>  
     <version>2.7.2</version>   </dependency>     <dependency>       
<groupId>org.apache.httpcomponents</groupId>       
<artifactId>httpclient</artifactId>       <version>4.2.5</version>   
</dependency>    <dependency>       
<groupId>org.apache.httpcomponents</groupId>       
<artifactId>httpcore</artifactId>       <version>4.2.5</version>   
</dependency></dependencies>
Thanks!Sam

   

Reply via email to