Looking for a partner for a European H2020 project
We are looking for a company to help us with our SOIRE2-2016 proposal, to be submitted on 20 January. We already have industrial / end-user partners, as well as statistics and data-mining expertise, but we need Flink people! Please contact me if you are interested in our proposal so that I can send you more information.

Thank you in advance and best regards,
Pedro
phari...@gmail.com
Re: Specify jobmanager port in HA mode
Hey Cory,

I’ve opened a pull request for this [1]. It’s based on the current snapshot version. This is not a bug fix, but since we are only adding behaviour I think that it can make its way into 0.10.2 if it is a blocker for users.

Currently, you can only try it out by building it yourself. Is this an option? If yes, [2] is a guide on how to do it. Just replace the clone command with:

git clone -b 3172-ha_jm_port https://github.com/uce/flink.git

– Ufuk

[1] https://github.com/apache/flink/pull/1458
[2] https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html

> On 15 Dec 2015, at 00:28, Cory Monty wrote:
>
> Ufuk,
>
> I'm a colleague of Brian. Unfortunately, we are not running YARN, so I don't think that PR applies to us. We're trying to run a standalone cluster.
>
> Cheers,
>
> Cory
>
> On Mon, Dec 14, 2015 at 5:23 PM, Ufuk Celebi wrote:
> This has been recently added to the YARN client by Robert [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls
>
> Are you running YARN?
>
> – Ufuk
>
> [1] https://github.com/apache/flink/pull/1416
>
>> On 15 Dec 2015, at 00:03, Ufuk Celebi wrote:
>>
>> Hey Brian,
>>
>> I think that it is currently not possible. I will look into whether there is a workaround. In any case, this sounds like a useful thing and it shouldn’t be too complicated to add the desired behaviour.
>>
>> I’ve opened an issue [1] for it and will look into it tomorrow.
>>
>> Is this currently blocking you from using Flink?
>>
>> – Ufuk
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-3172
>>
>>> On 14 Dec 2015, at 20:33, Brian Chhun wrote:
>>>
>>> Hello,
>>>
>>> Is it possible to set the job manager rpc port when running in HA mode? Or is there a workaround or solution if we're running task managers with a firewall?
>>>
>>> Thanks,
>>> Brian
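[Editor's note: a sketch of what the resulting configuration might look like once the change lands. The key name `recovery.jobmanager.port` is taken from the FLINK-3172 discussion and should be verified against the Flink version actually in use; the quorum hosts are placeholders.]

```yaml
# flink-conf.yaml -- standalone HA setup with a pinned JobManager RPC port
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk01:2181,zk02:2181

# Fix the port (or a port range) the JobManager binds to, so it can be
# opened in a firewall. Key name as discussed in FLINK-3172.
recovery.jobmanager.port: 6123
```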
Apache Flink Web Dashboard - Completed Job history
Hi,

If I restart Flink, I no longer see the history of the completed jobs. Is this a missing feature, or what should I do to see the completed job history?

Best regards,
Ovidiu
Re: Using S3 as state backend
Hi Brian,

thanks, that helped me a lot.

2015-12-15 16:52 GMT+01:00 Brian Chhun:
> Sure, excuse me if anything was obvious or wrong, I know next to nothing about Hadoop.
>
> 1. Get the Hadoop 2.7 distribution (I set its path in HADOOP_HOME to make things easier for myself).
> 2. Set HADOOP_CLASSPATH to include ${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/tools/lib/* (you may not need all those paths?)
> 3. Stick this into $HADOOP_HOME/etc/hadoop/core-site.xml:
>
> <configuration>
>   <property>
>     <name>fs.defaultFS</name>
>     <value>s3a://YOUR-BUCKET</value>
>   </property>
>   <property>
>     <name>fs.s3a.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
> </configuration>
>
> 4. Stick this into your flink-conf:
>
> fs.hdfs.hadoopconf: $HADOOP_HOME/etc/hadoop
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: whatever01.local:2181
> recovery.zookeeper.path.root: /whatever
> state.backend: filesystem
> state.backend.fs.checkpointdir: s3a:///YOUR-BUCKET/checkpoints
> recovery.zookeeper.storageDir: s3a:///YOUR-BUCKET/recovery
>
> That's all I had to do on the Flink side. Obviously, on the AWS side, I had my IAM role set up with read/write access to the bucket.
>
> Thanks,
> Brian
>
> On Mon, Dec 14, 2015 at 10:39 PM, Thomas Götzinger wrote:
>> Hi Brian,
>>
>> can you give me a short summary of how to achieve this?
>> On 14 Dec 2015, at 23:20, "Brian Chhun" wrote:
>>> For anyone else looking, I was able to use the s3a filesystem, which can use IAM role-based authentication as provided by the underlying AWS client library.
>>>
>>> Thanks,
>>> Brian
>>>
>>> On Thu, Dec 10, 2015 at 4:28 PM, Brian Chhun <brian.ch...@getbraintree.com> wrote:
>>>
>>> Thanks Ufuk, this did the trick.
>>>
>>> Thanks,
>>> Brian
>>>
>>> On Wed, Dec 9, 2015 at 4:37 PM, Ufuk Celebi wrote:
> Hey Brian,
>
> did you follow the S3 setup guide?
> https://ci.apache.org/projects/flink/flink-docs-master/apis/example_connectors.html
>
> You have to set the fs.hdfs.hadoopconf property and add
>
> <property>
>   <name>fs.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> </property>
>
> to core-site.xml
>
> – Ufuk
>
>> On 09 Dec 2015, at 20:50, Brian Chhun wrote:
>>
>> Hello,
>>
>> I'm trying to set up an HA cluster and I'm running into issues using S3 as the state backend. This is raised during startup:
>>
>> java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///flink/recovery/blob'.
>> 	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:242)
>> 	at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
>> 	at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:105)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:1814)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1944)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1898)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:1584)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1486)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1447)
>> 	at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
>>
>> Is it possible to use S3 as the backend store or is only hdfs/mapfs supported?
>>
>> Thanks,
>> Brian

--
Best regards
Thomas Götzinger
Freelance IT consultant
Glockenstraße 2a
D-66882 Hütschenhausen OT Spesbach
Mobile: +49 (0)176 82180714
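[Editor's note: Brian's steps 1–2 above amount to exporting two environment variables before starting Flink. A minimal sketch; the unpack location /opt/hadoop-2.7.0 is a hypothetical example and should be adjusted to the actual install path.]

```shell
# Hypothetical unpack location of the Hadoop 2.7 distribution -- adjust to yours.
export HADOOP_HOME=/opt/hadoop-2.7.0

# Expose the Hadoop common jars plus the tools/lib jars (which contain the
# s3a filesystem classes) so Flink can pick them up via HADOOP_CLASSPATH.
export HADOOP_CLASSPATH="${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/tools/lib/*"

echo "$HADOOP_CLASSPATH"
```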
All datanodes are bad
Hi,

I am receiving the following exception while trying to run the terasort program on Flink. My configuration is as follows:

Hadoop: 2.6.2
Flink: 0.10.1
Server 1: Hadoop data and name node, Flink job and task manager
Server 2: Flink task manager

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
	at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:627)
	at eastcircle.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:88)
	at eastcircle.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: All datanodes xxx.xxx.xx.xx:50010 are bad. Aborting...
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1206)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1004)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:548)
Re: Apache Flink Web Dashboard - Completed Job history
I guess it should be possible to manually save this information from the corresponding directory and copy it back after the restart? But I am not sure. Please correct me if I am wrong.

-Matthias

On 12/16/2015 03:16 PM, Ufuk Celebi wrote:
>
>> On 16 Dec 2015, at 15:00, Ovidiu-Cristian MARCU wrote:
>>
>> Hi
>>
>> If I restart the Flink I don’t see anymore the history of the completed jobs.
>> Is this a missing feature or what should I do to see the completed job list history?
>
> Not possible at the moment.
>
> Completed jobs are archived on the JobManager and lost after restarts.
>
> – Ufuk
Re: Apache Flink Web Dashboard - Completed Job history
> On 16 Dec 2015, at 15:00, Ovidiu-Cristian MARCU wrote:
>
> Hi
>
> If I restart the Flink I don’t see anymore the history of the completed jobs.
> Is this a missing feature or what should I do to see the completed job list history?

Not possible at the moment.

Completed jobs are archived on the JobManager and lost after restarts.

– Ufuk
Re: global watermark across multiple kafka consumers
Hi Andrew,

as far as I know, there is no prescribed way of handling this kind of situation. If you want to synchronize the watermark generation across a set of KafkaConsumers, you need some kind of ground truth. This could be, for example, a central registry such as ZooKeeper in which you collect the current watermarks of the different consumers. You could access ZooKeeper from inside the TimestampExtractor.

Alternatively, however a bit more hacky, you could exploit the fact that the consumer tasks are usually colocated with consumer tasks from different topics. This means that you'll have multiple subtasks reading from the different Kafka topics running in the same JVM. You could then use class variables to synchronize the watermarks. But this assumes that each subtask reading the topic t from Kafka is colocated with at least one other subtask reading the topic t' from Kafka, with t' in T \ {t} and T being the set of Kafka topics. By default this should be the case.

I'm wondering why you need a global watermark for your Kafka topics. Isn't it enough to have individual watermarks for each topic?

Cheers,
Till

On Tue, Dec 15, 2015 at 4:45 PM, Griess, Andrew wrote:
> Hi guys,
>
> I have a question related to utilizing watermarks with multiple FlinkKafkaConsumer082 instances. The aim is to have a global watermark across multiple Kafka consumers, where any message from any Kafka partition would update the same watermark. When testing a simple TimestampExtractor implementation, it seems each consumer results in a separate watermark. Is there a prescribed way of handling such a thing that anyone has experience with?
>
> Thanks for your help,
>
> Andrew Griess
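[Editor's note: Till's second suggestion (class variables shared by colocated subtasks) can be sketched in plain Java, independent of any Flink API. The class and method names below are hypothetical illustrations, not Flink interfaces: each topic's extractor reports its local watermark into a static map, and the global watermark is the minimum across all topics seen so far.]

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical holder class, shared by all subtasks in the same JVM
// via static state -- this is exactly the "class variable" trick.
public class SharedWatermark {

    // one entry per topic: the latest local watermark reported by that topic's subtask
    private static final ConcurrentMap<String, Long> perTopic = new ConcurrentHashMap<>();

    // would be called from inside a TimestampExtractor for the given topic;
    // returns the global watermark after folding in the local one
    public static long reportAndGetGlobal(String topic, long localWatermark) {
        // watermarks only advance, so keep the max per topic
        perTopic.merge(topic, localWatermark, Math::max);
        // the global watermark may only advance as fast as the slowest topic
        return perTopic.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(reportAndGetGlobal("orders", 100L)); // only one topic known: 100
        System.out.println(reportAndGetGlobal("clicks", 80L));  // slower topic caps the global watermark: 80
        System.out.println(reportAndGetGlobal("orders", 150L)); // orders advances, clicks still at 80: 80
    }
}
```

Note the caveat from the thread still applies: static state is only shared within one JVM, so this breaks down if the subtasks for different topics are not colocated.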
Re: Tiny topology shows '0' for all stats.
@Nick: That is what I read in the current description and comments.

On Tue, Dec 15, 2015 at 6:25 PM, Nick Dimiduk wrote:
> For my own understanding, are you suggesting that FLINK-2944 (or a subtask) is the appropriate place to implement exposure of metrics such as bytes and records in and out of streaming sources and sinks?
>
> On Tue, Dec 15, 2015 at 5:24 AM, Niels Basjes wrote:
>> Hi,
>>
>> @Ufuk: I added the env.disableOperatorChaining() and indeed now I see two things on the screen, and there are numbers counting what has happened.
>> @Stephan: Yes, I understand these numbers now.
>>
>> I found that there is already a JIRA ticket to add what I was looking for:
>> https://issues.apache.org/jira/browse/FLINK-2944
>> https://issues.apache.org/jira/browse/FLINK-3130
>>
>> Niels
>>
>> On Mon, Dec 14, 2015 at 5:03 PM, Ufuk Celebi wrote:
>>>
>>>> On 14 Dec 2015, at 16:25, Niels Basjes wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a very small topology here. In fact, this is a thing that generates synthetic data and puts it into Kafka. When looking at the web UI I see that all counters (i.e. Bytes received, Records received, Bytes sent, Records sent) remain 0. I verified, and I'm seeing thousands of records arriving in Kafka.
>>>>
>>>> Is this a bug in Flink or am I misinterpreting the meaning of these numbers?
>>>
>>> Sources and sinks do not show the number of received or sent records, because of the internals of how these numbers are collected. I agree that this is confusing. Big +1 to improve this.
>>>
>>> You actually don’t see any numbers, because the operators are chained and hence you only have one task, which acts as both source and sink.
>>>
>>> You should see some sent and received numbers if you break up the chain (env.disableOperatorChaining()). Can you confirm this?
>>>
>>> – Ufuk

--
Best regards / Met vriendelijke groeten,

Niels Basjes