Re: Why Scala Option is not a valid key?

2016-04-04 Thread Chiwan Park
I just found that Timur created a JIRA issue for this (FLINK-3698).

Regards,
Chiwan Park

> On Mar 31, 2016, at 7:27 PM, Till Rohrmann  wrote:
> 
> Actually I think that it’s not correct that the OptionType cannot be used as 
> a key type. In fact it is similar to a composite type and should be usable as 
> a key iff it’s element can be used as a key. Then we only have to provide an 
> OptionTypeComparator which will compare the elements if they are set. If not, 
> then the None element will be smaller, for example.
> 
> @Timur, if you want, then you can file a JIRA issue to add that.
> 
> Cheers,
> Till
> 
> 
> On Wed, Mar 30, 2016 at 7:17 PM, Timur Fayruzov  
> wrote:
> Ok, I can't make Option comparable, so the only way that I see is to 
> translate a key to a Comparable data structure and use it (as it was alluded 
> to in your example above). Thank you for the clarification!
> 
> Thanks,
> Timur
> 
> On Wed, Mar 30, 2016 at 9:22 AM, Chiwan Park  wrote:
> Hi Timur,
> 
> Sorry for the confusion. I meant KeySelector.
> 
> `GenericType<T>` could be used as a key type if the `T` implements 
> `Comparable`. For example, a `GenericType` whose `T` implements `Comparable` 
> could be used as a key type, but one whose `T` does not could not.
> 
> About my example in the previous mail, the type of the key is `Int` because 
> the return type of the KeySelector is `Int`. The `TypeInformation` of `Int` is 
> not a generic type.
> 
> Regards,
> Chiwan Park
> 
> > On Mar 31, 2016, at 1:09 AM, Timur Fayruzov  
> > wrote:
> >
> > Thank you for your answers, Chiwan! That would mean that a generic type 
> > can't be used as a key in general? This is a non-obvious limitation of the 
> > Flink DSL that I didn't see in the documentation.
> >
> > Could you please elaborate on what you mean by KeyExtractor? I see that 
> > inside the `where` operator an instance of KeySelector is initialized, but I 
> > don't see how I can pass a custom KeySelector in.
> >
> > Thanks,
> > Timur
> >
> > On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park  wrote:
> > Hi Timur,
> >
> > Because Option[T] is not a comparable type in general (e.g. if T is a POJO 
> > type), you cannot use Option[T] as a key type. I think you have to implement 
> > a KeyExtractor to compare objects including Option[T]s.
> >
> > ```
> > case class MyKey(k1: Option[String], k2: Option[String])
> >
> > val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), 
> > Some("c")))
> > val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), 
> > Some("c")))
> >
> > data1.join(data2)
> >   .where(_.hashCode())
> >   .equalTo(_.hashCode()).apply {
> > (left: MyKey, right: MyKey) => (left, right)
> >   }.print()
> > ```
> >
> > Note that the approach in the example (using hashCode()) cannot be applied 
> > to sort tasks.
> >
> > Regards,
> > Chiwan Park
> >
> > > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov  
> > > wrote:
> > >
> > > There is some more detail to this question that I missed initially. It 
> > > turns out that my key is a case class of a form MyKey(k1: Option[String], 
> > > k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive 
> > > check whether every element of the MyKey class can be a key and fails 
> > > when encountering an Option.
> > >
> > > Is it possible to work around this situation without giving up Options? 
> > > Inability to use Options in Domain objects could be really frustrating.
> > >
> > > Thanks,
> > > Timur
> > >
> > > On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov 
> > >  wrote:
> > > Hello,
> > >
> > > I'm evaluating Flink and one thing I noticed is Option[A] can't be used 
> > > as a key for coGroup (looking specifically here: 
> > > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
> > >  I'm not clear about the reason for this and would appreciate it if 
> > > someone could explain.
> > >
> > > Thanks,
> > > Timur
> > >
> >
> >
> 
> 
> 



Re: building for Scala 2.11

2016-04-04 Thread Balaji Rajagopalan
In your pom file you can specify which version of Scala you want to build
against. Also remember to add the Scala version to the artifactId of all the
dependencies that take a Scala version; there might be some libraries which
are Scala-agnostic, and for those you do not have to specify the Scala
version.




<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>1.7.12</slf4j.version>
    <flink.version>1.0.0</flink.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

On Tue, Apr 5, 2016 at 6:10 AM, Andrew Gaydenko 
wrote:

> Hi!
>
> How to build the project for Scala 2.11?
> --
>
> Regards,
> Andrew
>


Re: building for Scala 2.11

2016-04-04 Thread Chiwan Park
Hi Andrew,

The method to build Flink with Scala 2.11 is described in the Flink
documentation [1].

This may not be directly relevant, but just FYI: to build your application
based on Flink 1.0 (or later) with Scala 2.11, you should be careful to set the
Flink dependencies correctly. There is a guide in the Wiki [2].

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#scala-versions
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Regards,
Chiwan Park

> On Apr 5, 2016, at 9:40 AM, Andrew Gaydenko  wrote:
> 
> Hi!
> 
> How to build the project for Scala 2.11?
> -- 
> 
> Regards,
> Andrew



Re: FYI: Added Documentation on Basic Concepts

2016-04-04 Thread Balaji Rajagopalan
Nice write-up. One question though: my understanding of a keyed stream is that
it will fork n streams from the one stream based on n keys. If that is true, it
could be depicted pictorially as well, and the apply function could be shown
operating over the time period by clearly marking a timeline.
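
As a rough sketch (types, window size and the aggregation are placeholders
picked for illustration; a built-in sum stands in for a custom apply function),
the picture I have in mind corresponds to something like:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // one input stream of (key, value) pairs ...
        DataStream<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

        // ... keyBy logically forks it into one sub-stream per key, and the
        // window function is then evaluated per key and per time window
        input.keyBy(0)
             .timeWindow(Time.minutes(1))
             .sum(1)
             .print();

        env.execute("keyed stream sketch");
    }
}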

On Tue, Apr 5, 2016 at 12:28 AM, Henry Saputra 
wrote:

> This is great.
>
> This will open up the concepts in Flink that are kind of "magic" to external
> users.
>
> - Henry
>
> On Mon, Apr 4, 2016 at 2:39 AM, Ufuk Celebi  wrote:
>
>> Dear Flink community,
>>
>> I'm happy to announce that we have added a long overdue section on
>> general Flink concepts to the documentation.
>>
>> You can find it here:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html
>>
>> Thanks to Stephan Ewen who wrote this great overview. I hope that it
>> will help new and existing users to get a better overview of the basic
>> Flink concepts.
>>
>> – Ufuk
>>
>
>


building for Scala 2.11

2016-04-04 Thread Andrew Gaydenko
Hi!

How to build the project for Scala 2.11?
-- 

Regards,
Andrew


Re: Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
Thanks for the answer, Ken.

My understanding is that file system selection is driven by the following
sections in core-site.xml:

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

If I run the program using the configuration above with s3n (and also modifying
the credential keys to use s3n) it fails with the same error, but there are no
"... opening key ..." logs. S3a does not seem to be supported; it fails with
the following:
Caused by: java.io.IOException: No file system found with scheme s3a,
referenced in file URI 's3a://'.
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 23 more
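
(For completeness, an analogous mapping for the s3a scheme would be a
core-site.xml entry like the following sketch; whether the s3a classes are
actually on the EMR classpath is a separate question:)

  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>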

I am puzzled by the fact that EMRFS is still apparently referenced
somewhere as an implementation for the S3 protocol; I'm not able to locate
where this configuration is set.


On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler 
wrote:

> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.
>
> Though EMR has some support for magically treating the s3 protocol as s3n
> (or maybe s3a now, with Hadoop 2.6 or later)
>
> What happens if you use s3n:/// for the --input
> parameter?
>
> — Ken
>
> On Apr 4, 2016, at 2:51pm, Timur Fayruzov 
> wrote:
>
> Hello,
>
> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
> with a three-step procedure: load data from S3 to cluster's HDFS, run Flink
> Job, unload outputs from HDFS to S3.
>
> However, ideally I'd like to read/write data directly from/to S3. I
> followed the instructions here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
> more specifically I:
>   1. Modified flink-conf to point to /etc/hadoop/conf
> >   2. Modified core-site.xml per link above (not clear why it is not
> using IAM, I had to provide AWS keys explicitly).
>
> Run the following command:
> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> s3:// --output hdfs:///flink-output
>
> First, I see messages like that:
> 2016-04-04 21:37:10,418 INFO
> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key
> '' for reading at position '333000'
>
> Then, it fails with the following error:
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
> (WordCount Example)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> 
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at
> org.apache.fl

Re: Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Ken Krugler
Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.

Though EMR has some support for magically treating the s3 protocol as s3n (or 
maybe s3a now, with Hadoop 2.6 or later)

What happens if you use s3n:/// for the --input 
parameter?

— Ken

> On Apr 4, 2016, at 2:51pm, Timur Fayruzov  wrote:
> 
> Hello,
> 
> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded 
> with a three-step procedure: load data from S3 to cluster's HDFS, run Flink 
> Job, unload outputs from HDFS to S3.
> 
> However, ideally I'd like to read/write data directly from/to S3. I followed 
> the instructions here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
>  
> ,
>  more specifically I:
>   1. Modified flink-conf to point to /etc/hadoop/conf
>   2. Modified core-site.xml per link above (not clear why it is not using 
> IAM, I had to provide AWS keys explicitly).
> 
> Run the following command:
> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster 
> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input 
> s3:// --output hdfs:///flink-output  
> 
> First, I see messages like that:
> 2016-04-04 21:37:10,418 INFO  
> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key 
> '' for reading at position '333000'
>  
> Then, it fails with the following error:
> 
> 
>  The program finished with the following exception:
> 
> 
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 
> (WordCount Example)
> 
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> 
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> 
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> 
> at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> 
> at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
> 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
> at java.lang.reflect.Method.invoke(Method.java:606)
> 
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> 
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> 
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> 
> at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> 
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> 
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> 
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> 
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
> 
> at org.apache.flink.runtime.jobmanager.JobManager.org 
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
> 
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
> 
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 
> at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
> 
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> 
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> 
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
> 
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTa

Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
Hello,

I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
with a three-step procedure: load data from S3 to cluster's HDFS, run Flink
Job, unload outputs from HDFS to S3.

However, ideally I'd like to read/write data directly from/to S3. I
followed the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
more specifically I:
  1. Modified flink-conf to point to /etc/hadoop/conf
  2. Modified core-site.xml per link above (not clear why it is not
using IAM, I had to provide AWS keys explicitly).

Run the following command:
HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
-yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
s3:// --output hdfs:///flink-output

First, I see messages like that:
2016-04-04 21:37:10,418 INFO
org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key
'' for reading at position '333000'

Then, it fails with the following error:



 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
(WordCount Example)

at org.apache.flink.client.program.Client.runBlocking(Client.java:381)

at org.apache.flink.client.program.Client.runBlocking(Client.java:355)

at org.apache.flink.client.program.Client.runBlocking(Client.java:315)

at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)

at
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)

at org.apache.flink.client.program.Client.runBlocking(Client.java:248)

at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)

at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)

at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)

at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

at akka.actor.ActorCell.invoke(ActorCell.scala:487)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)

at akka.dispatch.Mailbox.run(Mailbox.scala:221)

at akka.dispatch.Mailbox.exec(Mailbox.scala:231)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.JobException: Creating the input splits
caused an error:
org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V

at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)

at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)

... 21 more

Caused by: java.lang.NoSuchMethodError:
o

Re: YARN High Availability

2016-04-04 Thread Stephan Ewen
This seems to be the critical part in the logs:

2016-03-31 09:01:52,234 INFO  org.apache.flink.yarn.YarnJobManager
 - Re-submitting 0 job graphs.
2016-03-31 09:02:51,182 INFO  org.apache.flink.yarn.YarnJobManager
 - Stopping YARN JobManager with status FAILED and
diagnostic Flink YARN Client requested shutdown.

The YarnJobManager starts up properly, but the Client never sends anything,
shuts down at some point, and tears down the YARN cluster.

Client logs would help a lot there...




On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi  wrote:

> Hey Konstantin,
>
> That's weird. Can you please log the client output on DEBUG level and
> provide that as well? I'm wondering whether the client uses a
> different root path.
>
> The following seems to happen:
> - you use ledf_recovery as the root namespace
> - the task managers are connecting (they resolve the JM address via
> ZooKeeper in this case as well, which means they correctly use the
> same namespace)
> - but the client, which started the YARN session, does not ever submit
> the job to the cluster.
>
> – Ufuk
>
> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf
>  wrote:
> > Hi everyone,
> >
> > we are running in some problems with multiple per-job yarn sessions, too.
> >
> > When we are are starting a per-job yarn session (Flink 1.0, Hadoop 2.4)
> > with recovery.zookeeper.path.root other than /flink, the yarn session
> > starts but no job is submitted, and after 1 min or so the session
> > crashes. I attached the jobmanager log.
> >
> > In Zookeeper the root-directory is created and child-nodes
> >
> > leaderlatch
> > jobgraphs
> >
> > /flink does also exist, but does not have child nodes.
> >
> > Everything runs fine with the default recovery.zookeeper.path.root.
> >
> > Does anyone have an idea, what is going on?
> >
> > Cheers,
> >
> > Konstnatin
> >
> >
> > On 23.11.2015 17:00, Gwenhael Pasquiers wrote:
> >> We are not yet using HA in our cluster instances.
> >>
> >> But yes, we will have to change the zookeeper.path.root J
> >>
> >>
> >>
> >> We package our jobs with their own config folder (we don’t rely on
> >> flink’s config folder); we can put the maven project name into this
> >> property then they will have different values J
> >>
> >>
> >>
> >>
> >>
> >> *From:*Till Rohrmann [mailto:trohrm...@apache.org]
> >> *Sent:* lundi 23 novembre 2015 14:51
> >> *To:* user@flink.apache.org
> >> *Subject:* Re: YARN High Availability
> >>
> >>
> >>
> >> The problem is the execution graph handle which is stored in ZooKeeper.
> >> You can manually remove it via the ZooKeeper shell by simply deleting
> >> everything below your `recovery.zookeeper.path.root` ZNode. But you
> >> should be sure that the cluster has been stopped before.
> >>
> >>
> >>
> >> Do you start the different clusters with different
> >> `recovery.zookeeper.path.root` values? If not, then you should run into
> >> troubles when running multiple clusters at the same time. The reason is
> >> that then all clusters will think that they belong together.
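> >>
> >> For illustration, each per-job cluster could get its own root path in its
> >> flink-conf.yaml, along these lines (a sketch; the quorum address and the
> >> path are placeholders):
> >>
> >> recovery.mode: zookeeper
> >> recovery.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
> >> recovery.zookeeper.path.root: /flink-my-job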
> >>
> >>
> >>
> >> Cheers,
> >>
> >> Till
> >>
> >>
> >>
> >> On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers
> >>  >> > wrote:
> >>
> >> OK, I understand.
> >>
> >> Maybe we are not really using flink as you intended. The way we are
> >> using it, one cluster equals one job. That way we are sure to isolate
> >> the different jobs as much as possible and in case of crashes / bugs /
> >> (etc) can completely kill one cluster without interfering with the other
> >> jobs.
> >>
> >> That future behavior seems good :-)
> >>
> >> Instead of the manual flink commands, is there a way to manually delete those
> >> old jobs before launching my job? They are probably somewhere in HDFS,
> >> aren't they?
> >>
> >> B.R.
> >>
> >>
> >>
> >> -Original Message-
> >> From: Ufuk Celebi [mailto:u...@apache.org ]
> >> Sent: lundi 23 novembre 2015 12:12
> >> To: user@flink.apache.org 
> >> Subject: Re: YARN High Availability
> >>
> >> Hey Gwenhaël,
> >>
> >> the restarting jobs are most likely old job submissions. They are not
> >> cleaned up when you shut down the cluster, but only when they finish
> >> (either regular finish or after cancelling).
> >>
> >> The workaround is to use the command line frontend:
> >>
> >> bin/flink cancel JOBID
> >>
> >> for each RESTARTING job. Sorry about the inconvenience!
> >>
> >> We are in an active discussion about addressing this. The future
> >> behaviour will be that the startup or shutdown of a cluster cleans up
> >> everything and an option to skip this step.
> >>
> >> The reasoning for the initial solution (not removing anything) was to
> >> make sure that no jobs are deleted by accident. But it looks like this
> >> is more confusing than helpful.
> >>
> >> – Ufuk
> >>
> >>> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers
> >>  >> > wrote:
> >>>
> >>>

Re: FYI: Added Documentation on Basic Concepts

2016-04-04 Thread Henry Saputra
This is great.

This will open up the concepts in Flink that are kind of "magic" to external
users.

- Henry

On Mon, Apr 4, 2016 at 2:39 AM, Ufuk Celebi  wrote:

> Dear Flink community,
>
> I'm happy to announce that we have added a long overdue section on
> general Flink concepts to the documentation.
>
> You can find it here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html
>
> Thanks to Stephan Ewen who wrote this great overview. I hope that it
> will help new and existing users to get a better overview of the basic
> Flink concepts.
>
> – Ufuk
>


Re: Using GeoIP2 in Flink Streaming

2016-04-04 Thread Stephan Ewen
Your code has to send the variable "DatabaseReader reader" into the cluster
together with the map function.
The class is not serializable, which means you cannot ship it like that.

If you control the code of the DatabaseReader, try to make the class
serializable.

If you cannot change the code of the DatabaseReader, you can
try to do the following:

  - (1) copy the file into a distributed filesystem
  - (2) use a RichMapFunction, and in open(), you load the database from
the distributed file system stream (a rough sketch of this follows below).
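
For example, a rough sketch of option (2), assuming the .mmdb file has been
copied to HDFS (the path below is a placeholder) and that the GeoIP2
DatabaseReader.Builder also accepts an InputStream:

import java.io.InputStream;
import java.net.InetAddress;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;

public class GeoLookupMapper extends RichMapFunction<String, String> {

    // transient: the reader is created on the worker in open(), not shipped with the function
    private transient DatabaseReader reader;

    @Override
    public void open(Configuration parameters) throws Exception {
        // placeholder path: the database file previously copied to the distributed file system
        Path dbPath = new Path("hdfs:///tmp/GeoIP2-City.mmdb");
        FileSystem fs = FileSystem.get(dbPath.toUri());
        try (InputStream in = fs.open(dbPath)) {
            reader = new DatabaseReader.Builder(in).build();
        }
    }

    @Override
    public String map(String value) throws Exception {
        InetAddress ipAddress = InetAddress.getByName(value);
        CityResponse response = reader.city(ipAddress);
        return "Kafka and Flink says: " + value + " " + response.getCountry();
    }
}

and then use messageStream.rebalance().map(new GeoLookupMapper()).print()
instead of the anonymous MapFunction.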


On Mon, Apr 4, 2016 at 4:52 PM, Zhun Shen  wrote:

> Hi there,
>
> In my case, I want to use GeoIP2 in Flink Streaming, I know I need
> to serialize geoip2-related classes using Kryo, but I don't know how to do it.
>
> Flink version: 1.0.0
> Kafka version: 0.9.0.0
> Deploy Mode: Local
>
> My demo code as below:
>
> File database = new File(“/home/user/GeoIP2-City.mmdb");
> final DatabaseReader reader = new
> DatabaseReader.Builder(database).build();
> DataStream messageStream = env
> .addSource(new FlinkKafkaConsumer09("test", new
> SimpleStringSchema(), properties));
>
> messageStream
> .rebalance()
> .map(new MapFunction() {
> public String map(String value) throws Exception {
>
> InetAddress ipAddress =
> InetAddress.getByName(value);
> CityResponse response = reader.city(ipAddress);
> Country country = response.getCountry();
> return "Kafka and Flink says: " + value + " " +
> country;
> }
> }).print();
>
> env.execute();
>
>
> I got the error below:
>
> Object FlinkTest$1@7c7d3c46 not serializable
>
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
>
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
>
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
> FlinkTest.main(FlinkTest.java:36)
>
> Any one can help me ?
>


Using GeoIP2 in Flink Streaming

2016-04-04 Thread Zhun Shen
Hi there,

In my case, I want to use GeoIP2 in Flink Streaming. I know I need to serialize
geoip2-related classes using Kryo, but I don't know how to do it.

Flink version: 1.0.0
Kafka version: 0.9.0.0
Deploy Mode: Local

My demo code as below:

File database = new File("/home/user/GeoIP2-City.mmdb");
final DatabaseReader reader = new DatabaseReader.Builder(database).build();
DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

messageStream
        .rebalance()
        .map(new MapFunction<String, String>() {
            public String map(String value) throws Exception {
                InetAddress ipAddress = InetAddress.getByName(value);
                CityResponse response = reader.city(ipAddress);
                Country country = response.getCountry();
                return "Kafka and Flink says: " + value + " " + country;
            }
        }).print();

env.execute();


I got the error below:

Object FlinkTest$1@7c7d3c46 not serializable

org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)

org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)

org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
FlinkTest.main(FlinkTest.java:36)

Any one can help me ?

Re: Kafka Test Error

2016-04-04 Thread Zhun Shen
I created a new project and only added the kafka-clients, flink-connector-kafka
and Flink streaming libs, and it works.

Thanks.


> On Apr 2, 2016, at 12:54 AM, Stephan Ewen  wrote:
> 
> The issue may be that you include Kafka twice:
> 
> 1) You explicitly add "org.apache.kafka:kafka-clients:0.9.0.0"
> 2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which 
> internally adds "org.apache.kafka:kafka-clients:0.9.0.1"
> 
> These two Kafka versions may conflict. I would drop the dependency (1) and 
> simply let the FlinkKafkaConsumer pull whatever dependency it needs by itself.
> The 0.9.0.1 client the Flink internally uses should read fine from Kafka 
> 0.9.0.0 brokers.
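> 
> As a sketch (based on the versions in your build.gradle snippet below), the
> trimmed dependency block could keep only the Flink entries, e.g.:
> 
> dependencies {
>     compile 'org.apache.flink:flink-streaming-java_2.10:1.0.0'
>     compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0'
> }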
> 
> Greetings,
> Stephan
> 
> 
> On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen  > wrote:
> Yeah, I mean I read the demo with 
> FlinkKafkaConsumer08(http://data-artisans.com/kafka-flink-a-practical-how-to/ 
> ) then I wrote the 
> program based on Kafka 0.9.0.0 and Flink 1.0.0.
> 
>> On Apr 1, 2016, at 7:27 PM, Balaji Rajagopalan 
>> mailto:balaji.rajagopa...@olacabs.com>> 
>> wrote:
>> 
>> Did you make sure the flinkconnector version and flink version is the same ? 
>> Also for 0.8.0.0 you will have to use FlinkKafkaConsumer08
>> 
>> On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen > > wrote:
>> I follow the example of kafka 0.8.0.0 on Flink doc.
>> 
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("zookeeper.connect", "localhost:2181");
>> properties.setProperty("group.id ", "test");
>> properties.setProperty("key.deserializer", 
>> "org.apache.kafka.common.serialization.StringDeserializer");
>> properties.setProperty("value.deserializer", 
>> "org.apache.kafka.common.serialization.StringDeserializer");
>> properties.setProperty("partition.assignment.strategy", "range");
>> 
>> DataStream<String> messageStream = env
>> .addSource(new FlinkKafkaConsumer09<String>("nginx-logs",
>> new SimpleStringSchema(), properties));
>> 
>> messageStream
>> .rebalance()
>> .map(new MapFunction<String, String>() {
>> 
>> @Override
>> public String map(String value) throws Exception {
>> return "Kafka and Flink says: " + value;
>> }
>> }).print();
>> 
>> env.execute();
>> }
>> 
>> 
>> Always got the error below:
>> 
>> java.lang.NoSuchMethodError: 
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>> 
>> 
>> 
>> 
>>> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar >> > wrote:
>>> 
>>> I am using flink 1.0.0 with kafka 0.9 . I works fine for me. I use 
>>> following dependency. 
>>> 
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>     <version>1.0.0</version>
>>>     <scope>provided</scope>
>>> </dependency>
>>> 
>>> Thanks
>>> Ashutosh
>>> 
>>> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen >> > wrote:
>>> Hi there,
>>> 
>>> I check my build.gradle file, I use 
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0’, but I found that 
>>> this lib is based on kaka-clients 0.9.0.1.
>>> 
>>> I want to use Flink streaming to consume Kafka’s events in realtime, but 
>>> I’m confused by Flink’s libs with different versions. Which 
>>> flink-connector-kafka is comparable with kafka 0.9.0.0 ?
>>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java
>>> 
>>> part of my build.grade:
>>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>>> 'org.apache.flink:flink-java:1.0.0',
>>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0
>>> 
>>> Any advice ? 
>>> 
>>> Thanks.
>>> 
>>> 
 On Mar 30, 2016, at 10:35 PM, Stephan Ewen >>> > wrote:
 
 Hi!
 
 A "NoSuchMethodError" usually means that you compile and run against 
 different versions.
 
 Make sure the version you reference in the IDE and the version on the 
 cluster are the same.
 
 Greetings,
 Stephan
 
 
 
 On Wed, Mar 30, 2016 at 9

Re: TopologyBuilder throws java.lang.ExceptionInInitializerError

2016-04-04 Thread Robert Metzger
Hi,

I suspect that this dependency:


<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
</dependency>



pulls in a different Storm version.
Can you exclude the transitive storm artifact from that dependency?
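
For example, an exclusion along these lines (a sketch; the transitive artifact
is assumed to be storm:storm-core, as suggested by the defaults.yaml error
quoted below) would keep the old storm-core out:

<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
    <exclusions>
        <exclusion>
            <groupId>storm</groupId>
            <artifactId>storm-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>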


You can also run: mvn clean install and then mvn dependency:tree to see
where the two conflicting storm dependencies are coming from.


On Thu, Mar 31, 2016 at 6:35 PM, Sharma, Samiksha 
wrote:

> Yes I tried cleaning the repo but that did not help either, please find
> attached pom.xml for your reference.
>
> Thanks
> Samiksha
>
>
> From: Till Rohrmann 
> Reply-To: "user@flink.apache.org" 
> Date: Wednesday, March 23, 2016 at 2:00 AM
> To: "user@flink.apache.org" 
> Subject: Re: TopologyBuilder throws java.lang.ExceptionInInitializerError
>
> Hi,
>
> have you tried clearing your m2 repository? It would also be helpful to
> see your dependencies (pom.xml).
>
> Cheers,
> Till
>
> On Tue, Mar 22, 2016 at 10:41 PM, Sharma, Samiksha <
> samiksha.sha...@here.com> wrote:
>
>> Hi,
>>
>> I am converting a storm topology to Flink-storm topology using the
>> flink-storm dependency. When I run my code the FlinkTopologyBuilder
>> eventually calls createTopology method in TopologyBuilder and throws the
>> error at the following highlighted line:-
>>
>> public StormTopology createTopology() {
>>     Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
>>     Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
>>     for(String boltId: _bolts.keySet()) {
>>         IRichBolt bolt = _bolts.get(boltId);
>>         ComponentCommon common = getComponentCommon(boltId, bolt);
>>         boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
>>     }
>>
>> Exception in thread "main" java.lang.ExceptionInInitializerError
>>
>> at
>> backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
>>
>> at
>> org.apache.flink.storm.api.FlinkTopologyBuilder.createTopology(FlinkTopologyBuilder.java:82)
>>
>> at
>> com.medio.services.avalanche.storm.common.BaseTopology.deploy(BaseTopology.java:118)
>>
>> at
>> com.medio.services.avalanche.realtime.eventcounter.EventCounterTopology.main(EventCounterTopology.java:18)
>>
>> Caused by: java.lang.RuntimeException: Found multiple defaults.yaml
>> resources. You're probably bundling the Storm jars with your topology jar.
>> [jar:file:/Users/samsharm/.m2/repository/storm/storm-core/
>> 0.9.0.1/storm-core-0.9.0.1.jar!/defaults.yaml,
>> jar:file:/Users/samsharm/.m2/repository/org/apache/storm/storm-core/0.9.4/storm-core-0.9.4.jar!/defaults.yaml]
>>
>> at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
>>
>> at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
>>
>> at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
>>
>> at backtype.storm.utils.Utils.<init>(Utils.java:71)
>>
>> ... 4 more
>>
>>
>>
>> Looks like some bad Storm dependencies are getting into the project. May I
>> ask for your assistance in figuring out what might be going wrong?
>>
>>
>> I also emailed the dev community, and got the following response.
>> Following that did not help either
>>
>>
>> Hi,
>>
>> I am not sure if this is useful, I once had similar issue.
>> Are you including storm-core dependency in your project? flink-storm
>> dependency doesn't need any specific storm-core dependency.
>>
>> Just including flink-storm and removing storm-core dependency worked for
>> me.
>>
>> This has more information,
>>
>> https://flink.apache.org/news/2015/12/11/storm-compatibility.html
>>
>>
>> Thanks
>> Samiksha Sharma
>>
>>
>>
>>
>


Re: scaling a flink streaming application on a single node

2016-04-04 Thread Aljoscha Krettek
Hi,
I am not sure since people normally don't run Flink on such large machines.
They rather run it on many smaller machines.

It will definitely be interesting to see your new results, where the job can
actually use all the memory available on the machine.

--
aljoscha

On Mon, 4 Apr 2016 at 15:54 Shinhyung Yang  wrote:

> Dear Aljoscha and Ufuk,
>
> Thank you for clarifying! Yes I'm running this wordcount application
> on a 64-core machine with 120GB ram allocated for users.
>
> In that case, the amount of RAM you give to the TaskManager seems too low.
> Could you try re-running your experiments with:
> jobmanager.heap.mb: 5000
> taskmanager.heap.mb: 100000
> >
> > So, about 100 GB of RAM for the TaskManager.
>
> Definitely I will try this! The result will be really interesting for
> sure. In this case, am I still good to go with 64 task slots with a
> single task manager?
>
> Thank you.
> With best regards,
> Shinhyung Yang.
>


Re: scaling a flink streaming application on a single node

2016-04-04 Thread Shinhyung Yang
Dear Aljoscha and Ufuk,

Thank you for clarifying! Yes I'm running this wordcount application
on a 64-core machine with 120GB ram allocated for users.

> In that case, the amount of RAM you give to the TaskManager seems too low.
> Could you try re-running your experiments with:
> jobmanager.heap.mb: 5000
> taskmanager.heap.mb: 100000
>
> So, about 100 GB of RAM for the TaskManager.

Definitely I will try this! The result will be really interesting for
sure. In this case, am I still good to go with 64 task slots with a
single task manager?

Thank you.
With best regards,
Shinhyung Yang.


Re: CEP blog post

2016-04-04 Thread Till Rohrmann
Thanks a lot to all for the valuable feedback. I've incorporated your
suggestions and will publish the article, once Flink 1.0.1 has been
released (we need 1.0.1 to run the example code).

Cheers,
Till

On Mon, Apr 4, 2016 at 10:29 AM, gen tang  wrote:

> It is really a good article. Please put it on Flink Blog
>
> Cheers
> Gen
>
>
> On Fri, Apr 1, 2016 at 9:56 PM, Till Rohrmann 
> wrote:
>
>> Hi Flink community,
>>
>> I've written a short blog [1] post about Flink's new CEP library which
>> basically showcases its functionality using a monitoring example. I would
>> like to publish the post on the flink.apache.org blog next week, if
>> nobody objects. Feedback is highly appreciated :-)
>>
>> [1]
>> https://docs.google.com/document/d/1rF2zVjitdTcooIwzJKNCIvAOi85j-wDXf1goXWXHHbk/edit?usp=sharing
>>
>> Cheers,
>> Till
>>
>
>


Re: FYI: Updated Slides Section

2016-04-04 Thread Rubén Casado
Dear community, 

Just in case it is useful, please find below the links to the slides from the 
1st Flink Madrid Meetup talks given by Fabian Hueske [1] and myself [2] (in 
Spanish). 

Best 

[1] http://www.slideshare.net/fhueske/data-stream-processing-with-apache-flink 
[2] 
http://es.slideshare.net/Datadopter/dnde-encaja-apache-flink-en-el-ecosistema-actual-de-tecnologas-big-data
 

__ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
__ 

- Original Message - 
From: "Ufuk Celebi"  
To: user@flink.apache.org 
CC: d...@flink.apache.org 
Sent: Monday, 4 April 2016 11:33:59 GMT +01:00 Amsterdam / Berlin / Bern 
/ Rome / Stockholm / Vienna 
Subject: FYI: Updated Slides Section 

Dear Flink community, 

I have updated the Material section on the Flink project page and 
moved the slides section to a separate page. 

You can find links to slides and talks here now: 
http://flink.apache.org/slides.html 

I've added slides for talks from this year by Till Rohrmann, Vasia 
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think 
that something is missing, feel free to ping in this thread. 

– Ufuk 


Re: FYI: Updated Slides Section

2016-04-04 Thread Maximilian Michels
Hi Ufuk,

Thanks for updating the page. The "latest documentation" points to the
page itself and not the documentation. I've fixed that and added the
slides from Big Data Warsaw.

Cheers,
Max

On Mon, Apr 4, 2016 at 12:09 PM, Ufuk Celebi  wrote:
> @Paris: Just added it. Thanks for the pointer. Great slides!


Re: FYI: Updated Slides Section

2016-04-04 Thread Ufuk Celebi
@Paris: Just added it. Thanks for the pointer. Great slides!


Re: FYI: Updated Slides Section

2016-04-04 Thread Paris Carbone
Some people might find my slides on the fault-tolerance fundamentals from last 
summer interesting. If you like them, feel free to include them.

http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha

Paris

On 04 Apr 2016, at 11:33, Ufuk Celebi  wrote:

Dear Flink community,

I have updated the Material section on the Flink project page and
moved the slides section to a separate page.

You can find links to slides and talks here now:
http://flink.apache.org/slides.html

I've added slides for talks from this year by Till Rohrmann, Vasia
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
that something is missing, feel free to ping in this thread.

– Ufuk



FYI: Added Documentation on Basic Concepts

2016-04-04 Thread Ufuk Celebi
Dear Flink community,

I'm happy to announce that we have added a long overdue section on
general Flink concepts to the documentation.

You can find it here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html

Thanks to Stephan Ewen who wrote this great overview. I hope that it
will help new and existing users to get a better overview of the basic
Flink concepts.

– Ufuk


FYI: Updated Slides Section

2016-04-04 Thread Ufuk Celebi
Dear Flink community,

I have updated the Material section on the Flink project page and
moved the slides section to a separate page.

You can find links to slides and talks here now:
http://flink.apache.org/slides.html

I've added slides for talks from this year by Till Rohrmann, Vasia
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
that something is missing, feel free to ping in this thread.

– Ufuk


CEP API: Question on FollowedBy

2016-04-04 Thread Anwar Rizal
Hi All,


I saw Till's blog preparation. It will be a very helpful blog. I hope that
some other blogs that explain how it works will come soon :-)

I have a question on followedBy pattern matching semantic.


From the documentation
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
,



Non-strict contiguity means that other events are allowed to occur
in-between two matching events. A non-strict contiguity pattern state can
be created via the followedBy method.

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");




I'll try to use Till's examples from the blog to understand the semantics of
followedBy.

--
First question.
Say I have a sequence of temperatures in a time window that corresponds to
the within clause (say 20 minutes).

TemperatureEvent(40) , OtherEvent(...), TemperatureEvent(30),
TemperatureEvent(50), OtherEvent(...), TemperatureEvent(70),
TemperatureEvent(65), TemperatureEvent(60)

say I want to match two TemperatureEvents whose temperatures > 35.

What will be the matches in this case ?
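
(For concreteness, I have in mind a pattern roughly like the following sketch,
assuming a TemperatureEvent class with a getTemperature() accessor as in the
blog draft:)

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<TemperatureEvent, ?> pattern = Pattern.<TemperatureEvent>begin("first")
        .where(evt -> evt.getTemperature() > 35)
        .followedBy("second")
        .where(evt -> evt.getTemperature() > 35)
        .within(Time.minutes(20));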


   - Will TemperatureEvent(40), TemperatureEvent(50) match? (because we
   have TemperatureEvent(30) at time 3 that does not match)
   - Will TemperatureEvent(40), TemperatureEvent(70) match? (because the
   pair also matches the specification of the pattern; the difference is we have
   TemperatureEvent(50) which happened before TemperatureEvent(70)). Similar
   questions apply for the TemperatureEvent(40) - TemperatureEvent(65) and
   TemperatureEvent(50) - TemperatureEvent(65) etc. pairs.



--
Second question.
For next (and also for followedBy), I also have questions regarding the
example above:
Will both TemperatureEvent(70), TemperatureEvent(65) and TemperatureEvent(65),
TemperatureEvent(60) be returned, or is the second pair no longer returned
because TemperatureEvent(65) has already been used in the first pair?



Is there a way to define the different sequence semantics for the two
questions I asked above?


Thanks,
Anwar.


Re: scaling a flink streaming application on a single node

2016-04-04 Thread Ufuk Celebi
Just to clarify: Shinhyung is running on a single node with 4 CPUs,
each having 16 cores.

On Mon, Apr 4, 2016 at 10:32 AM, Robert Metzger  wrote:
> Hi,
>
> usually it doesn't make sense to run multiple task managers on a single
> machine to get more slots.
> Your machine has only 4 CPU cores, so you are just putting a lot of pressure
> on the cpu scheduler..
>
> On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang 
> wrote:
>>
>> Thank you for replying!
>>
>> I am trying to do this on a single machine in fact. Since it has 64
>> cores, it would be interesting to look at the performance in that
>> regard.
>>
>> > How many machines are you using for this?
>> >
>> > The fact that you are giving 64 slots to each TaskManager means that a
>> > single TaskManager may end up executing all 64 pipelines. That would
>> > heavily
>> > overload that TaskManager and cause heavy degradation.
>>
>> Does it make sense if I run multiple TaskManagers on a single machine
>> if 64 slots are too many for a TaskManager?
>>
>> > If, for example, you use 16 machines, then give each machine 4 task
>> > slots
>> > (total of 64 slots across all machines)
>> > That way, the final run (parallelism 64) will be guaranteed to be spread
>> > across all machines.
>>
>> My intention for the experiment at the moment is to try to scale the
>> application up on a single machine to its maximum before moving on to
>> run the experiment on multiple machines.
>>
>> Thank you again!
>> With best regards,
>> Shinhyung Yang
>
>


Re: scaling a flink streaming application on a single node

2016-04-04 Thread Aljoscha Krettek
Hi,
I'm afraid no one read your email carefully. You indeed have one very big
machine with 64 physical CPU cores and 120 GB of RAM, correct?

In that case, the amount of RAM you give to the TaskManager seems too low.
Could you try re-running your experiments with:
jobmanager.heap.mb: 5000
taskmanager.heap.mb: 100000

So, about 100 GB of RAM for the TaskManager.

Cheers,
Aljoscha

On Mon, 4 Apr 2016 at 10:32 Robert Metzger  wrote:

> Hi,
>
> usually it doesn't make sense to run multiple task managers on a single
> machine to get more slots.
> Your machine has only 4 CPU cores, so you are just putting a lot of
> pressure on the cpu scheduler..
>
> On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang 
> wrote:
>
>> Thank you for replying!
>>
>> I am trying to do this on a single machine in fact. Since it has 64
>> cores, it would be interesting to look at the performance in that
>> regard.
>>
>> > How many machines are you using for this?
>> >
>> > The fact that you are giving 64 slots to each TaskManager means that a
>> > single TaskManager may end up executing all 64 pipelines. That would
>> heavily
>> > overload that TaskManager and cause heavy degradation.
>>
>> Does it make sense if I run multiple TaskManagers on a single machine
>> if 64 slots are too many for a TaskManager?
>>
>> > If, for example, you use 16 machines, then give each machine 4 task
>> slots
>> > (total of 64 slots across all machines)
>> > That way, the final run (parallelism 64) will be guaranteed to be spread
>> > across all machines.
>>
>> My intention for the experiment at the moment is to try to scale the
>> application up on a single machine to its maximum before moving on to
>> run the experiment on multiple machines.
>>
>> Thank you again!
>> With best regards,
>> Shinhyung Yang
>>
>
>


Re: scaling a flink streaming application on a single node

2016-04-04 Thread Robert Metzger
Hi,

usually it doesn't make sense to run multiple task managers on a single
machine to get more slots.
Your machine has only 4 CPU cores, so you are just putting a lot of
pressure on the CPU scheduler.

On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang 
wrote:

> Thank you for replying!
>
> I am trying to do this on a single machine in fact. Since it has 64
> cores, it would be interesting to look at the performance in that
> regard.
>
> > How many machines are you using for this?
> >
> > The fact that you are giving 64 slots to each TaskManager means that a
> > single TaskManager may end up executing all 64 pipelines. That would
> heavily
> > overload that TaskManager and cause heavy degradation.
>
> Does it make sense if I run multiple TaskManagers on a single machine
> if 64 slots are too many for a TaskManager?
>
> > If, for example, you use 16 machines, then give each machine 4 task slots
> > (total of 64 slots across all machines)
> > That way, the final run (parallelism 64) will be guaranteed to be spread
> > across all machines.
>
> My intention for the experiment at the moment is to try to scale the
> application up on a single machine to its maximum before moving on to
> run the experiment on multiple machines.
>
> Thank you again!
> With best regards,
> Shinhyung Yang
>


Re: CEP blog post

2016-04-04 Thread gen tang
It is really a good article. Please put it on Flink Blog

Cheers
Gen


On Fri, Apr 1, 2016 at 9:56 PM, Till Rohrmann  wrote:

> Hi Flink community,
>
> I've written a short blog [1] post about Flink's new CEP library which
> basically showcases its functionality using a monitoring example. I would
> like to publish the post on the flink.apache.org blog next week, if
> nobody objects. Feedback is highly appreciated :-)
>
> [1]
> https://docs.google.com/document/d/1rF2zVjitdTcooIwzJKNCIvAOi85j-wDXf1goXWXHHbk/edit?usp=sharing
>
> Cheers,
> Till
>


Re: CEP blog post

2016-04-04 Thread Maximilian Michels
Made a few suggestions. Reads well, Till!

On Mon, Apr 4, 2016 at 10:10 AM, Ufuk Celebi  wrote:
> Same here.
>
> +1 to publish
>
> On Mon, Apr 4, 2016 at 10:04 AM, Aljoscha Krettek  wrote:
>> Hi,
>> I like it. Very dense and very focused on the example but I think it should
>> be good for the Flink Blog.
>>
>> --
>> aljoscha
>>
>> On Fri, 1 Apr 2016 at 15:56 Till Rohrmann  wrote:
>>
>>> Hi Flink community,
>>>
>>> I've written a short blog [1] post about Flink's new CEP library which
>>> basically showcases its functionality using a monitoring example. I would
>>> like to publish the post on the flink.apache.org blog next week, if
>>> nobody objects. Feedback is highly appreciated :-)
>>>
>>> [1]
>>> https://docs.google.com/document/d/1rF2zVjitdTcooIwzJKNCIvAOi85j-wDXf1goXWXHHbk/edit?usp=sharing
>>>
>>> Cheers,
>>> Till
>>>


Re: CEP blog post

2016-04-04 Thread Ufuk Celebi
Same here.

+1 to publish

On Mon, Apr 4, 2016 at 10:04 AM, Aljoscha Krettek  wrote:
> Hi,
> I like it. Very dense and very focused on the example but I think it should
> be good for the Flink Blog.
>
> --
> aljoscha
>
> On Fri, 1 Apr 2016 at 15:56 Till Rohrmann  wrote:
>
>> Hi Flink community,
>>
>> I've written a short blog [1] post about Flink's new CEP library which
>> basically showcases its functionality using a monitoring example. I would
>> like to publish the post on the flink.apache.org blog next week, if
>> nobody objects. Feedback is highly appreciated :-)
>>
>> [1]
>> https://docs.google.com/document/d/1rF2zVjitdTcooIwzJKNCIvAOi85j-wDXf1goXWXHHbk/edit?usp=sharing
>>
>> Cheers,
>> Till
>>


Re: CEP blog post

2016-04-04 Thread Aljoscha Krettek
Hi,
I like it. Very dense and very focused on the example but I think it should
be good for the Flink Blog.

--
aljoscha

On Fri, 1 Apr 2016 at 15:56 Till Rohrmann  wrote:

> Hi Flink community,
>
> I've written a short blog [1] post about Flink's new CEP library which
> basically showcases its functionality using a monitoring example. I would
> like to publish the post on the flink.apache.org blog next week, if
> nobody objects. Feedback is highly appreciated :-)
>
> [1]
> https://docs.google.com/document/d/1rF2zVjitdTcooIwzJKNCIvAOi85j-wDXf1goXWXHHbk/edit?usp=sharing
>
> Cheers,
> Till
>


varying results: local VS cluster

2016-04-04 Thread Lydia Ickler
Hi all,

I have an issue regarding execution on 1 machine VS 5 machines.
If I execute the following code the results are not the same though I would 
expect them to be since the input file is the same.
Do you have any suggestions?

Thanks in advance!
Lydia
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

//read input file
DataSet> matrixA = readMatrix(env, 
parameters.get("input"));
//Approximate EigenVector by PowerIteration
//get initial vector - which equals matrixA * [1, ... , 1]
DataSet> initial0 = 
(matrixA.groupBy(0)).sum(2);
DataSet> maximum = initial0.maxBy(2);
//normalize by maximum value
DataSet> initial= 
(initial0.cross(maximum)).map(new normalizeByMax());

//BulkIteration to find dominant eigenvector
IterativeDataSet> iteration = 
initial.iterate(1);

DataSet> intermediate = 
((matrixA.join(iteration).where(1).equalTo(0))
.map(new ProjectJoinResultMapper())).groupBy(0, 
1)).sum(2)).groupBy(0)).sum(2)).
cross(matrixA.join(iteration).where(1).equalTo(0))
.map(new ProjectJoinResultMapper())).groupBy(0, 
1)).sum(2))).groupBy(0)).sum(2)).sum(2)))
.map(new normalizeByMax());

DataSet> diffs = 
(iteration.join(intermediate).where(0).equalTo(0)).with(new deltaFilter());
DataSet> eigenVector  = 
iteration.closeWith(intermediate,diffs);

eigenVector.writeAsCsv(parameters.get("output"));
env.execute("Power Iteration");