Re: Using S3 as state backend

2015-12-16 Thread Thomas Götzinger
Hi Brian,

thanks, that helped me a lot.



2015-12-15 16:52 GMT+01:00 Brian Chhun <brian.ch...@getbraintree.com>:

> Sure, excuse me if anything was obvious or wrong, I know next to nothing
> about Hadoop.
>
> 1. get the Hadoop 2.7 distribution (I set its path to HADOOP_HOME to make
> things easier for myself)
> 2. set the HADOOP_CLASSPATH to include
> ${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/tools/lib/*
> (you may not need all those paths?)
> 3. stick this into $HADOOP_HOME/etc/hadoop/core-site.xml
>
> <configuration>
>   <property>
>     <name>fs.defaultFS</name>
>     <value>s3a://YOUR-BUCKET</value>
>   </property>
>   <property>
>     <name>fs.s3a.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
> </configuration>
>
> 4. stick this into your flink-conf
>
> fs.hdfs.hadoopconf: $HADOOP_HOME/etc/hadoop
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: whatever01.local:2181
> recovery.zookeeper.path.root: /whatever
> state.backend: filesystem
> state.backend.fs.checkpointdir: s3a://YOUR-BUCKET/checkpoints
> recovery.zookeeper.storageDir: s3a://YOUR-BUCKET/recovery
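>
> (As a sanity check that checkpoints actually land in the bucket, here is a
> minimal job sketch. Assumptions: the 0.10 DataStream API; host, port, and job
> logic are placeholders. The job needs no S3-specific code, since the backend
> comes entirely from the config above.)
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointedJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // checkpoint every 5 seconds; the filesystem state backend writes
>         // the snapshots to state.backend.fs.checkpointdir, i.e. into S3
>         env.enableCheckpointing(5000);
>         env.socketTextStream("localhost", 9999)
>            .map(new MapFunction<String, String>() {
>                @Override
>                public String map(String value) {
>                    return value.toUpperCase();
>                }
>            })
>            .print();
>         env.execute("checkpointed job");
>     }
> }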
>
> That's all I had to do on the Flink side. Obviously, on the AWS side, I had my
> IAM role set up with read/write access to the bucket.
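>
> For completeness, this is roughly the shape of that role's policy. A sketch
> only: the bucket name is a placeholder, and the exact set of S3 actions s3a
> needs can vary by Hadoop version.
>
> {
>   "Version": "2012-10-17",
>   "Statement": [
>     { "Effect": "Allow",
>       "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
>       "Resource": "arn:aws:s3:::YOUR-BUCKET" },
>     { "Effect": "Allow",
>       "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
>       "Resource": "arn:aws:s3:::YOUR-BUCKET/*" }
>   ]
> }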
>
> Thanks,
> Brian
>
> On Mon, Dec 14, 2015 at 10:39 PM, Thomas Götzinger <m...@simplydevelop.de>
> wrote:
>
>> Hi Brian
>>
>> Can you give me a short summary of how to achieve this?
>> On 14.12.2015 23:20, "Brian Chhun" <brian.ch...@getbraintree.com> wrote:
>>
>>> For anyone else looking, I was able to use the s3a filesystem which can
>>> use IAM role based authentication as provided by the underlying AWS client
>>> library.
>>>
>>> Thanks,
>>> Brian
>>>
>>> On Thu, Dec 10, 2015 at 4:28 PM, Brian Chhun <
>>> brian.ch...@getbraintree.com> wrote:
>>>
>>>> Thanks Ufuk, this did the trick.
>>>>
>>>> Thanks,
>>>> Brian
>>>>
>>>> On Wed, Dec 9, 2015 at 4:37 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>>> Hey Brian,
>>>>>
>>>>> did you follow the S3 setup guide?
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/example_connectors.html
>>>>>
>>>>> You have to set the fs.hdfs.hadoopconf property and add
>>>>>
>>>>> <property>
>>>>>   <name>fs.s3.impl</name>
>>>>>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>>>>> </property>
>>>>>
>>>>> to core-site.xml
>>>>>
>>>>> – Ufuk

Re: Using S3 as state backend

2015-12-14 Thread Thomas Götzinger
Hi Brian

Can you give me a short summary of how to achieve this?
On 14.12.2015 23:20, "Brian Chhun" wrote:

> For anyone else looking, I was able to use the s3a filesystem which can
> use IAM role based authentication as provided by the underlying AWS client
> library.
>
> Thanks,
> Brian
>
> On Thu, Dec 10, 2015 at 4:28 PM, Brian Chhun wrote:
>
>> Thanks Ufuk, this did the trick.
>>
>> Thanks,
>> Brian
>>
>> On Wed, Dec 9, 2015 at 4:37 PM, Ufuk Celebi wrote:
>>
>>> Hey Brian,
>>>
>>> did you follow the S3 setup guide?
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/example_connectors.html
>>>
>>> You have to set the fs.hdfs.hadoopconf property and add
>>>
>>> <property>
>>>   <name>fs.s3.impl</name>
>>>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>>> </property>
>>>
>>> to core-site.xml
>>>
>>> – Ufuk
>>>
>>> > On 09 Dec 2015, at 20:50, Brian Chhun wrote:
>>> >
>>> > Hello,
>>> >
>>> > I'm trying to set up an HA cluster and I'm running into issues using S3
>>> as the state backend. This is raised during startup:
>>> >
>>> > 2015-12-09T19:23:36.430724+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: java.io.IOException: No file system
>>> found with scheme s3, referenced in file URI 's3:///flink/recovery/blob'.
>>> >
>>> > 2015-12-09T19:23:36.430858+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.core.fs.FileSystem.get(FileSystem.java:242)
>>> >
>>> > 2015-12-09T19:23:36.430989+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
>>> >
>>> > 2015-12-09T19:23:36.431297+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:105)
>>> >
>>> > 2015-12-09T19:23:36.431435+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:1814)
>>> >
>>> > 2015-12-09T19:23:36.431569+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1944)
>>> >
>>> > 2015-12-09T19:23:36.431690+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1898)
>>> >
>>> > 2015-12-09T19:23:36.431810+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:1584)
>>> >
>>> > 2015-12-09T19:23:36.431933+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1486)
>>> >
>>> > 2015-12-09T19:23:36.432414+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1447)
>>> >
>>> > 2015-12-09T19:23:36.432649+00:00 i-1ec317c4
>>> docker/jobmanager01-d3174d6[1207]: #011at
>>> org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
>>> >
>>> > Is it possible to use S3 as the backend store or is only hdfs/mapfs
>>> supported?
>>> >
>>> >
>>> > Thanks,
>>> > Brian
>>>
>>>
>>
>


Re: Flink on EC2

2015-11-08 Thread Thomas Götzinger
Sorry for the confusion,

the Flink cluster throws the following stack trace:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 29a2a25d49aa0706588ccdb8b7e81c6c (Flink Java Job at Sun Nov 08 18:50:52 UTC 2015)
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at de.fraunhofer.iese.proopt.Template.run(Template.java:112)
    at de.fraunhofer.iese.proopt.Main.main(Main.java:8)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 29a2a25d49aa0706588ccdb8b7e81c6c (Flink Java Job at Sun Nov 08 18:50:52 UTC 2015)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: No file system found with scheme s3n, referenced in file URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
    ... 19 more
Caused by: java.io.IOException: No file system found with scheme s3n, referenced in file URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:247)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:447)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 21 more

-- 
Best regards

Thomas Götzinger
Freelance Software Engineer

Glockenstraße 2a
D-66882 Hütschenhausen OT Spesbach

Mobile: +49 (0)176

Re: Flink on EC2

2015-11-08 Thread Thomas Götzinger
Hi Fabian,

thanks for the reply. I use a Karamel recipe to install Flink on EC2. Currently I
am using flink-0.9.1-bin-hadoop24.tgz
<http://apache.mirrors.spacedump.net/flink/flink-0.9.1/flink-0.9.1-bin-hadoop24.tgz>,
which includes the NativeS3FileSystem. First I tried the standard Karamel recipe
on GitHub, hopshadoop/flink-chef <https://github.com/hopshadoop/flink-chef>, but
it is on version 0.9.0 and does not include the S3NFileSystem, so I forked the
GitHub project as goetzingert/flink-chef.
Although the class file is included, the application throws a
ClassNotFoundException for the class above.
In my project I added this to conf/core-site.xml:

<configuration>
  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>….</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>...</value>
  </property>
</configuration>
— 
I also tried the programmatic configuration:

XMLConfiguration config = new XMLConfiguration(configPath);

env = ExecutionEnvironment.getExecutionEnvironment();
Configuration configuration = GlobalConfiguration.getConfiguration();
configuration.setString("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
configuration.setString("fs.s3n.awsAccessKeyId", "..");
configuration.setString("fs.s3n.awsSecretAccessKey", "../");
configuration.setString("fs.hdfs.hdfssite", Template.class.getResource("/conf/core-site.xml").toString());
GlobalConfiguration.includeConfiguration(configuration);


Any idea why the class is not included in the classpath? Is there another script
to set up a Flink cluster on EC2?

When will Flink 0.10 be released?



Regards

Thomas Götzinger
Freelance Software Engineer

Glockenstraße 2a
D-66882 Hütschenhausen OT Spesbach

Mobile: +49 (0)176 82180714
Private: +49 (0) 6371 954050

m...@simplydevelop.de
epost: thomas.goetzin...@epost.de




> On 29.10.2015, at 09:47, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Thomas,
> 
> until recently, Flink provided its own implementation of an S3 file system,
> which wasn't fully tested and was buggy.
> We removed that implementation and are now using (in 0.10-SNAPSHOT) Hadoop's
> S3 implementation by default.
> 
> If you want to continue using 0.9.1 you can configure Flink to use Hadoop's 
> implementation. See this answer on StackOverflow and the linked email thread 
> [1].
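> In short: point fs.hdfs.hadoopconf at the directory containing a
> core-site.xml that registers Hadoop's S3 file system, as described above. A
> minimal sketch of the flink-conf.yaml entry, with a placeholder path:
> 
> fs.hdfs.hadoopconf: /path/to/hadoop/conf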
> If you switch to the 0.10-SNAPSHOT version (which will be released in a few
> days as 0.10.0), things become a bit easier and Hadoop's implementation is
> used by default. The documentation shows how to configure your access keys [2].
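> Concretely, that boils down to entries like these in core-site.xml (a sketch
> following [2]; the key names are those of Hadoop's s3n file system, and the
> values are placeholders):
> 
> <property>
>   <name>fs.s3n.awsAccessKeyId</name>
>   <value>YOUR_ACCESS_KEY</value>
> </property>
> <property>
>   <name>fs.s3n.awsSecretAccessKey</name>
>   <value>YOUR_SECRET_KEY</value>
> </property>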
> 
> Please don't hesitate to ask if something is unclear or not working.
> 
> Best, Fabian
> 
> [1] http://stackoverflow.com/questions/32959790/run-apache-flink-with-amazon-s3
> [2] https://ci.apache.org/projects/flink/flink-docs-master/apis/example_connectors.html
> 
> 2015-10-29 9:35 GMT+01:00 Thomas Götzinger <m...@simplydevelop.de>:
> Hello Flink Team,
> 
> We at Fraunhofer IESE are evaluating Flink for a project, and I'm a bit
> frustrated at the moment.
> 
> I've written a few test cases with the Flink API and want to deploy them to a
> Flink EC2 cluster. I set up the cluster using the Karamel recipe shown in the
> following video:
> 
> http://www.youtube.com/watch?v=m_SkhyMV0to
> 
> The setup works fine and the hello-flink app runs. But afterwards I wanted to
> copy some data from an S3 bucket to the local EC2 HDFS cluster.
> 
> hadoop fs -ls on s3n works, as does cat and the rest. But if I try to copy the
> data with distcp, the command freezes and does not respond until it times out.
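> 
> For reference, the kind of invocation meant here, as a sketch with placeholder
> source and target paths:
> 
> hadoop distcp s3n://YOUR-BUCKET/path/to/data hdfs:///path/on/cluster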
> 
> After trying a few things I gave up and started on another solution: access
> the S3 bucket directly with Flink and import the data using a small Flink
> program that just reads from S3 and writes to local Hadoop. This works fine
> locally, but on the cluster the S3NFileSystem class is missing
> (ClassNotFoundException), although it is included in