Re: Nested Iterations supported in Flink?

2015-04-14 Thread Till Rohrmann
If your inner iteration happens to work only on the data of a single
partition, then you can also implement it as part of a mapPartition
operator. The only drawback is that you have to keep all of the
partition's data on the heap if you need access to it.
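
A rough sketch of that idea in plain Java, without the Flink API: the method stands in for the body of a mapPartition function, and the class name, the method name, and the choice of inner loop (a small gradient descent that converges to the partition's mean) are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PartitionLocalIteration {

    // Stand-in for the body of a mapPartition function (hypothetical name):
    // it sees one partition's elements through a single-pass iterator.
    static double iterateOnPartition(Iterator<Double> partition, int maxIterations) {
        // Materialize the partition on the heap so the inner loop can re-read it.
        List<Double> values = new ArrayList<>();
        while (partition.hasNext()) {
            values.add(partition.next());
        }
        if (values.isEmpty()) {
            return Double.NaN;
        }

        // Inner iteration: gradient descent on sum((x - c)^2).
        // With this step size, each pass halves the distance between c and the mean.
        double c = 0.0;
        double stepSize = 0.25 / values.size();
        for (int i = 0; i < maxIterations; i++) {
            double gradient = 0.0;
            for (double x : values) {
                gradient += 2.0 * (c - x);
            }
            c -= stepSize * gradient;
        }
        return c;
    }

    public static void main(String[] args) {
        List<Double> partition = Arrays.asList(1.0, 2.0, 3.0, 4.0);
        System.out.println(iterateOnPartition(partition.iterator(), 60));
    }
}
```

The copy into the list is exactly the heap cost mentioned above: it grows with the size of the partition.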

Cheers,

Till

On Tue, Apr 14, 2015 at 3:11 PM, Stephan Ewen se...@apache.org wrote:

 Hi Benoît!

 You are right, the nested iterations are currently not supported.

 The test you found actually checks that the Optimizer gives a good error
 message when encountering nested iterations.

 Can you write your program as one iteration (the inner one) and start the
 program multiple times to simulate the nesting?

 Greetings,
 Stephan


 On Tue, Apr 14, 2015 at 8:11 AM, Benoît Hanotte 
 benoit.jc.hano...@campus.tu-berlin.de wrote:

 Hello,

 I'm implementing an algorithm which requires nested iterations, and, from
 what I understood, this feature was not yet available in Flink [1], and my
 experiments with 2 nested bulk iterations seem to confirm that. However I
 came across a Flink unit test [2] using nested iterations, so I'm now a bit
 confused. Could you please give me some insight on what is supported or not
 in the current state of the project?

 Thanks!

 Benoît.

 [1] http://mail-archives.apache.org/mod_mbox/flink-user/201411.mbox/%3CCANC1h_tpNFWwrnm14+Et1yBvkQPQ1-pfk=iupp5un90zgug...@mail.gmail.com%3E
 [2] https://github.com/apache/flink/blob/master/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java





Re: Left outer join

2015-04-17 Thread Till Rohrmann
No, it's not, but at the moment there is AFAIK no other way around it. There
is an issue for proper outer join support [1].

[1] https://issues.apache.org/jira/browse/FLINK-687

On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 That could solve the problem, but is it safe to accumulate elements in a
 local variable if the datasets are huge?

 On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann till.rohrm...@gmail.com
 wrote:

 If it's fine when you have null string values in the cases where
 D1.f1!=a1 or D1.f2!=a2 then a possible solution could look like (with
 Scala API):

 val ds1: DataSet[(String, String, String)] = getDS1
 val ds2: DataSet[(String, String, String)] = getDS2

 ds1.coGroup(ds2).where(2).equalTo(0) {
   (left, right, collector: Collector[(String, String, String, String)]) => {
     if (right.isEmpty) {
       // No match on the right side: emit null for the D2 key
       left foreach { element =>
         val value1 = if (element._2 == a1) element._3 else null
         val value2 = if (element._2 == a2) element._3 else null
         collector.collect((element._1, null, value1, value2))
       }
     } else {
       // Materialize the right side so it can be traversed once per left element
       val array = right.toArray
       for (leftElement <- left) {
         val value1 = if (leftElement._2 == a1) leftElement._3 else null
         val value2 = if (leftElement._2 == a2) leftElement._3 else null

         for (rightElement <- array) {
           collector.collect((leftElement._1, rightElement._1, value1, value2))
         }
       }
     }
   }
 }

 Does this solve your problem?

 On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier pomperma...@okkam.it
  wrote:

 Hi Till,
 thanks for the reply.
 What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
 (D1.f2==D2.f0).
 If this condition is true, I would like to produce a set of tuples with
 the matching elements at the first two places (D1.*f2*, D2.*f0*) and the
 other two values (if present) of the matching tuple in D1 when
 D1.f1==*a1* and D1.f2=*a2* (string values) respectively.
 (PS: For each value of D1.f0 you can have at most one value of a1 and a2)

 Is that clearer?

 On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann till.rohrm...@gmail.com
 wrote:

 Hi Flavio,

 I don't really understand what you are trying to do. What does
 D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
 D1.f1==p2) is false?

 Where do the values a1 and a2 in (A, X, a1, a2) come from when you
 join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
 elaborate a bit more on your example.

 Cheers,

 Till

 On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 I cannot find a solution to my use case :(
 I have 2 datasets D1 and D2 like:

 D1:
 A,p1,a1
 A,p2,a2
 A,p3,X
 B,p3,Y
 B,p1,b1

 D2:
 X,s,V
 X,r,2
 Y,j,k

 I'd like to have a unique dataset D3(Tuple4) like

 A,X,a1,a2
 B,Y,b1,null

 Basically filling with D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
 D1.f1==p2) when D1.f2==D2.f0.
 Is that possible and how?
 Could you show me a simple snippet?

 Thanks in advance,
 Flavio




 On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann trohrm...@apache.org
 wrote:

 You can materialize the right input, for example by creating an array
 out of it. Then you can iterate over it repeatedly.
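
 A minimal sketch of this in plain Java, without the Flink API: the method mimics one coGroup call in which both sides arrive as single-pass iterators. The class name, the String element type and the "null" marker for unmatched rows are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class MaterializedLeftOuterJoin {

    // Mimics one coGroup call: left and right hold the elements of one key
    // and can each be traversed only once.
    static List<String> coGroup(Iterator<String> left, Iterator<String> right) {
        // Copy the right side once so it can be re-read for every left element.
        List<String> rightElements = new ArrayList<>();
        while (right.hasNext()) {
            rightElements.add(right.next());
        }

        List<String> out = new ArrayList<>();
        while (left.hasNext()) {
            String leftElem = left.next();
            if (rightElements.isEmpty()) {
                // Left outer join: keep the left element even without a match.
                out.add(leftElem + ",null");
            } else {
                for (String rightElem : rightElements) {
                    out.add(leftElem + "," + rightElem);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(
                Arrays.asList("a", "b").iterator(),
                Arrays.asList("x", "y").iterator()));
        System.out.println(coGroup(
                Arrays.asList("a").iterator(),
                Collections.<String>emptyList().iterator()));
    }
}
```

 The copy holds all right-side elements of one key in memory, so this only works while a single group fits on the heap.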

 Cheers,
 Till
 On Apr 16, 2015 7:37 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 Hi Maximilian,
 I tried your solution but it doesn't work because the rightElements
 iterator cannot be used more than once:

 Caused by: org.apache.flink.util.TraversableOnceException: The
 Iterable can be iterated over only once. Only the first call to
 'iterator()' will succeed.

 On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels m...@apache.org
  wrote:

 Hi Flavio,

 Here's a simple example of a Left Outer Join:
 https://gist.github.com/mxm/c2e9c459a9d82c18d789

 As Stephan pointed out, this can very easily be modified to
 construct a Right Outer Join (just exchange leftElements and
 rightElements in the two loops).

 Here's an excerpt with the most important part, the coGroup
 function:

 public static class LeftOuterJoin implements
     CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                     Tuple2<Integer, Integer>> {

   @Override
   public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
                       Iterable<Tuple2<Integer, String>> rightElements,
                       Collector<Tuple2<Integer, Integer>> out) throws Exception {

     final int NULL_ELEMENT = -1;

     for (Tuple2<Integer, String> leftElem : leftElements) {
       boolean hadElements = false;
       for (Tuple2<Integer, String> rightElem : rightElements) {
         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
         hadElements = true;
       }
       if (!hadElements) {
         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
       }
     }
   }
 }



 On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen se...@apache.org
 wrote:

 I think this may be a great example to add

Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

2015-04-07 Thread Till Rohrmann
I don't know whether my ideas are much better than the cartesian product
solution. As a matter of fact at some point we have to replicate the data
to be able to compute the correlations in parallel. There are basically 3
ideas I had:

1. Broadcast U and V and simply compute the correlation for different
shifts in a mapper. This only works if the time series data is small enough
to be kept in memory of a task manager.
2. Create for each shift and element a join key, join the elements and
reduce them to obtain the final result. This has a communication complexity
of (n^2+n)/2 which is asymptotically the same as the cartesian product
solution. But this solution will probably run for arbitrarily large
correlation intervals.

So let's say we have (u1, u2, u3) and (v1, v2, v3): Then we would first
create the join keys: (1, 1, u1), (2, 1, u1), (3, 1, u1), (1, 2, u2), (2,
2, u2), (1, 3, u3), (1, 1, v1), (1, 2, v2), (2, 1, v2), (1, 3, v3), (2, 2,
v3), (3, 1, v3). Then join on the first and second field and compute u*v
with the first field as key. Reducing on this field then lets you compute
the correlation.

3. Group the elements of each subinterval with respect to their shift value
and join both grouped subintervals. Then compute the correlation. This
again only works if the grouped data can be kept on the heap of the task
manager.
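
As a rough illustration of the first idea, here is a plain-Java sketch without the Flink API: both series are fully in memory and each shift is computed independently, which is what every parallel mapper would do with its share of the shifts. The class and method names, the fixed comparison window, and the use of a parallel stream in place of a cluster are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class ShiftedCorrelation {

    // Pearson correlation of u[uStart, uStart+length) and v[vStart, vStart+length).
    static double pearson(double[] u, int uStart, double[] v, int vStart, int length) {
        double meanU = 0.0;
        double meanV = 0.0;
        for (int i = 0; i < length; i++) {
            meanU += u[uStart + i];
            meanV += v[vStart + i];
        }
        meanU /= length;
        meanV /= length;

        double cov = 0.0;
        double varU = 0.0;
        double varV = 0.0;
        for (int i = 0; i < length; i++) {
            double du = u[uStart + i] - meanU;
            double dv = v[vStart + i] - meanV;
            cov += du * dv;
            varU += du * du;
            varV += dv * dv;
        }
        return cov / Math.sqrt(varU * varV);
    }

    // Computes corr(u[0:window], v[n:n+window]) for n = 0..maxShift.
    // Every shift only reads the shared arrays, so the shifts can run in parallel.
    static double[] shiftedCorrelations(double[] u, double[] v, int maxShift) {
        int window = Math.min(u.length, v.length - maxShift);
        if (window < 2) {
            throw new IllegalArgumentException("series too short for maxShift=" + maxShift);
        }
        return IntStream.rangeClosed(0, maxShift)
                .parallel()
                .mapToDouble(n -> pearson(u, 0, v, n, window))
                .toArray();
    }

    public static void main(String[] args) {
        double[] u = {1, 4, 2, 8, 5, 7};
        double[] v = {0, 0, 1, 4, 2, 8, 5, 7}; // u delayed by two observations
        System.out.println(Arrays.toString(shiftedCorrelations(u, v, 2)));
    }
}
```

In this sketch the shift with the highest correlation is the estimated delay between the two series. The memory restriction of idea 1 shows up directly: both arrays must fit on every worker.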

On Tue, Apr 7, 2015 at 1:29 PM, Sebastian s...@apache.org wrote:

 How large are the individual time series?

 -s

 On 07.04.2015 12:42, Kostas Tzoumas wrote:

 Hi everyone,

 I'm forwarding a private conversation to the list with Mats' approval.

 The problem is how to compute correlation between time series in Flink.
 We have two time series, U and V, and need to compute 1000 correlation
 measures between the series, each measure shifts one series by one more
 item: corr(U[0:N], V[n:N+n]) for n=0 to n=1000.

 Any ideas on how one can do that without a Cartesian product?

 Best,
 Kostas

 -- Forwarded message --
 From: Mats Zachrison mats.zachri...@ericsson.com
 Date: Tue, Mar 31, 2015 at 9:21 AM
 Subject:
 To: Kostas Tzoumas kos...@data-artisans.com, Stefan Avesand
 stefan.aves...@ericsson.com
 Cc: step...@data-artisans.com

 As Stefan said, what I’m trying to achieve is basically a nice way to do
 a correlation between two large time series. Since I’m looking for an
 optimal delay between the two series, I’d like to delay one of the
 series x observations when doing the correlation, and step x from 1 to
 1000.

 Some pseudo code:

For (x = 1 to 1000)

Shift Series A ‘x-1’ steps

Correlation[x] = Correlate(Series A and Series B)

End For

 In R, using cor() and apply(), this could look like:

 shift <- as.array(c(1:1000))

 corrAB <- apply(shift, 1, function(x) cor(data[x:nrow(data), ]$ColumnA,
                 data[1:(nrow(data) - (x - 1)), ]$ColumnB))

 Since this basically is 1000 independent correlation calculations, it is
 fairly easy to parallelize. Here is an R example using foreach() and
 package doParallel:

 library(doParallel)

 cl <- makeCluster(3)
 registerDoParallel(cl)

 corrAB <- foreach(step = c(1:1000)) %dopar% {
   corrAB <- cor(data[step:nrow(data), ]$ColumnA,
                 data[1:(nrow(data) - (step - 1)), ]$ColumnB)
 }

 stopCluster(cl)

 So I guess the question is – how to do this in a Flink environment? Do
 we have to define how to parallelize the algorithm, or can the cluster
 take care of that for us?


 And of course this is most interesting on a generic level – given the
 environment of a multi-core or –processor setup running Flink, how hard
 is it to take advantage of all the clock cycles? Do we have to split the
 algorithm, and data, and distribute the processing, or can the system do
 much of that for us?






Re: Flink meetup group in Stockholm

2015-04-08 Thread Till Rohrmann
Really cool :-)

On Wed, Apr 8, 2015 at 5:09 PM, Maximilian Michels m...@apache.org wrote:

 Love the purple. Have fun! :)

 On Wed, Apr 8, 2015 at 5:05 PM, Henry Saputra henry.sapu...@gmail.com
 wrote:

 Nice, congrats!

 On Wed, Apr 8, 2015 at 7:39 AM, Gyula Fóra gyf...@apache.org wrote:
  Hey Everyone!
 
  We are proud to announce the first Apache Flink meetup group in
 Stockholm.
 
  Join us at http://www.meetup.com/Apache-Flink-Stockholm/
 
  We are looking forward to organising our first event in May!
 
  Cheers,
  Gyula





Re: k means - waiting for dataset

2015-05-21 Thread Till Rohrmann
Hi Paul,

could you share your code with us so that we can see whether there is an
error?

Does this error also occur with 0.9-SNAPSHOT?

Cheers,
Till

On Thu, May 21, 2015 at 11:11 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 hi flink community,

 I have implemented k-means for clustering temporal geo data. I use the
 following GitHub project and my own data structure:

 https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

 Now I have the problem that Flink reads the centroids from a file and
 continues to work in parallel. Looking at the results, I have the feeling
 that the program loads only one centroid point.

 I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
 following exception:
 ERROR actor.OneForOneStrategy: exception during creation
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
 at akka.actor.ActorCell.create(ActorCell.scala:578)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at akka.util.Reflect$.instantiate(Reflect.scala:65)
 at akka.actor.Props.newActor(Props.scala:337)
 at akka.actor.ActorCell.newActor(ActorCell.scala:534)
 at akka.actor.ActorCell.create(ActorCell.scala:560)
 ... 9 more

 How can I tell Flink to wait until the dataset is loaded, and what does
 this exception mean?

 best regards,
 paul



Re: flink k-means on hadoop cluster

2015-06-08 Thread Till Rohrmann
I assume that the paths inputs and outputs are not correct, since you get the
error message *chown `outputs': No such file or directory*. Try to provide
the full path to the chown command, such as
hdfs://ServerURI/path/to/your/directory.

On Mon, Jun 8, 2015 at 11:23 AM Pa Rö paul.roewer1...@googlemail.com
wrote:

 Hi Robert,

 I saw that you wrote to me on Stack Overflow, thanks. Now the path is
 right and I get the old exception:
 org.apache.flink.runtime.JobException: Creating the input splits caused an
 error: File file:/127.0.0.1:8020/home/user/cloudera/outputs/seed-1 does
 not exist or the user running Flink ('yarn') has insufficient permissions
 to access it.

 I looked at HDFS and want to give the user yarn all permissions:
 [cloudera@quickstart bin]$ hdfs dfs -ls
 Found 9 items
 drwxrwxrwt   - cloudera cloudera  0 2015-06-03 04:24 .Trash
 drwxrwxrwt   - cloudera cloudera  0 2015-06-08 01:17 .flink
 drwxrwxrwt   - cloudera cloudera  0 2015-06-04 06:51 .staging
 drwxrwxrwt   - cloudera cloudera  0 2015-02-17 08:33 gdelt
 drwxrwxrwt   - cloudera cloudera  0 2015-06-02 06:42 inputs
 -rwxrwxrwt   1 cloudera cloudera   31223141 2015-06-03 03:53 ma-mahout.jar
 -rwxrwxrwt   1 cloudera cloudera   30037418 2015-06-03 03:53 ma-mapreduce.jar
 drwxrwxrwt   - cloudera cloudera  0 2015-06-04 07:38 oozie-oozi
 drwxrwxrwt   - cloudera cloudera  0 2015-06-03 03:59 outputs
 [cloudera@quickstart bin]$ sudo hdfs dfs -chown -R yarn:hadoop inputs
 chown: `inputs': No such file or directory
 [cloudera@quickstart bin]$ sudo hdfs dfs -chown -R yarn:hadoop outputs
 chown: `outputs': No such file or directory

 helpful:
 https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html

 I am doing something wrong; maybe you have an idea?




Re: Log messages - redirect

2015-06-19 Thread Till Rohrmann
If I’m not mistaken from the shown output, you’re talking about the stdout
output of the client, right? This output is not controlled by the
log4j.properties or logback.xml file.

However, you can use any command line tool available on your platform to
redirect the stdout. For example, on a Linux system you could redirect the
log output with ./flink run Flink.jar > myFile, or you could use ./flink run
Flink.jar | tee myFile to redirect the output and still print it.

Cheers,
Till

On Fri, Jun 19, 2015 at 1:19 PM Chiwan Park chiwanp...@icloud.com wrote:

Hi, Flink uses slf4j and log4j for logging.

 You can override default log4j configuration programmatically. [1]
 Or you can use logback as a logging backend and override default logback
 configuration also. [2][3]

 [1] http://stackoverflow.com/a/9003191
 [2] http://ci.apache.org/projects/flink/flink-docs-master/internals/logging.html
 [3] http://stackoverflow.com/a/3810936

 Regards,
 Chiwan Park


  On Jun 19, 2015, at 8:05 PM, Juan Fumero 
 juan.jose.fumero.alfo...@oracle.com wrote:
 
  Hi,
   is there any option (from API level) to redirect the log messages to a
  file instead of stdout?
 
  Thanks
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.flink.api.java.ExecutionEnvironment).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
  more info.
  06/19/2015 12:14:12 Job execution switched to status RUNNING.
  
  06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
  DEPLOYING
  06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
  RUNNING
  06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
  FINISHED
  06/19/2015 12:14:12 Job execution switched to status FINISHED.


Re: Logging in Flink 0.9.0-milestone-1

2015-06-26 Thread Till Rohrmann
Hi Stefan,

You can do this if you disableSysoutLogging and change your
log4j-cli.properties file to also print to console. There you can then
control what is logged to console. However, I think that you have to set
disableSysoutLogging in your program.

Cheers,
Till

On Fri, Jun 26, 2015 at 11:47 AM, Stefan Bunk stefan.b...@googlemail.com
wrote:

 Hi Robert,

 this problem persists in the 0.9 release. Using `disableSysoutLogging()`
 works, but I'd rather configure this in the log4j.xml. Is this possible?

 Best,
 Stefan

 On 14 April 2015 at 20:55, Robert Metzger rmetz...@apache.org wrote:

 You can control the logging behavior from the ExecutionConfig
 (env.getExecutionConfig()).
 There is a method (disableSysoutLogging()) that you can use. (In
 0.9-SNAPSHOT only).

 Sorry again that you ran into this issue.

 On Tue, Apr 14, 2015 at 8:45 PM, Robert Metzger rmetz...@apache.org
 wrote:

 Ah, I see.

 The issue is this line in the JobClient.scala here:
 https://github.com/apache/flink/blob/release-0.9.0-milestone-1-rc1/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala#L97

 As you can see, it's doing sysout logging.
 In the current master, this has been reworked:
 https://github.com/apache/flink/blob/69a400fadd258fe0a1ff0b5670a3611fda4c1cdf/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java#L60
 (I'm currently looking up how to control the behavior)

 If it's possible for you, you can switch to 0.9-SNAPSHOT.

 Best,
 Robert


 On Tue, Apr 14, 2015 at 7:02 PM, Stefan Bunk stefan.b...@googlemail.com
  wrote:

 Hi,

 I am also using IntelliJ, and I am starting directly from the IDE.
 Local execution.
 This is what my logging output looks like: [1].

 I am getting my logger via:
 val log = org.apache.log4j.Logger.getLogger(getClass)

 [1] https://gist.github.com/knub/1c11683601b4eeb5d51b

 On 14 April 2015 at 18:47, Robert Metzger rmetz...@apache.org wrote:

 Hi,

 how are you starting Flink? Out of the IDE? Using the scripts?

 I just created a new flink project with the milestone version. Just
 putting your log4j.xml into the resources folder enabled the logging (I've
 set it to INFO for flink and it worked).
 I've used IntelliJ and started the WordCount.java from the quickstart.

 I think I need more details about your setup to reproduce the issue.




 On Tue, Apr 14, 2015 at 5:52 PM, Stefan Bunk 
 stefan.b...@googlemail.com wrote:

 Hi Robert,

 thanks for the info.
 Adding the parameter didn't help. My logging file is found and my
 logging configuration for my own logging is working (even without the
 parameter), it's just that the file in the jar seems to be preferred over
 my file.

 Best,
 Stefan

 On 14 April 2015 at 17:16, Robert Metzger rmetz...@apache.org
 wrote:

 Hi Stefan,

 we made a stupid mistake in the 0.9.0-milestone-1 release by
 including our log4j.properties in the flink-runtime jar. It's also in the
 fat jar in flink-dist.

 Maybe you can pass the name of your log4j file to your application
 with -Dlog4j.configuration=log4j.xml?

 The issue is already fixed in the current master but that doesn't
 help 0.9.0-milestone-1 users :(

 Best,
 Robert

 On Tue, Apr 14, 2015 at 4:49 PM, Stefan Bunk 
 stefan.b...@googlemail.com wrote:

 Hi Flinkers,

 I just switched to 0.9.0-milestone-1, and now I get Flink's logging
 output again in my console (local execution).
 I have a log4j.xml under src/main/resources, which says not to log
 any Flink job progress updates, and which worked fine so far:

 [...]
 <logger name="org.apache.flink">
   <level value="ERROR" />
   <appender-ref ref="Console" />
 </logger>
 [...]
 See the full file at [1].

 However, this file seems to be ignored now. What do I need to
 change?

 Cheers
 Stefan

 [1] https://gist.github.com/knub/bb1b3ffc6b47e2a1376d











Re: Flink-ML as Dependency

2015-06-11 Thread Till Rohrmann
Hi Max,

I just tested a build using Gradle (with your build.gradle file) and some
flink-ml algorithms, and it completed without the problem of the
unresolved breeze dependency.

I use the version 2.2.1 of Gradle. Which version are you using?

Since you’re using Flink’s snapshots and have specified only the local
maven repository, can you re-install Flink and check whether the
error still occurs? Simply call mvn clean install -DskipTests
-Dmaven.javadoc.skip=true from the root directory of Flink.

Cheers,
Till

On Wed, Jun 10, 2015 at 3:38 PM Maximilian Alber alber.maximil...@gmail.com
wrote:

Hi Flinksters,

 I would like to test FlinkML. Unfortunately, I get an error when including
 it in my project. It might be my error as I'm not experienced with Gradle,
 but Google did not make me any wiser.

 My build.gradle looks as follows:

 apply plugin: 'java'
 apply plugin: 'scala'

 //sourceCompatibility = 1.5
 version = '1.0'
 jar {
     manifest {
         attributes 'Implementation-Title': 'Test Project',
                    'Implementation-Version': 1.0
     }
 }

 repositories {
     mavenCentral()
     mavenLocal()
 }

 dependencies {
     compile 'org.scala-lang:scala-library:2.10.5'
     compile 'org.scala-lang:scala-compiler:2.10.5'

     compile 'org.scalanlp:breeze_2.10:0.11.2'

     compile group: 'org.apache.flink', name: 'flink-clients', version: '0.9-SNAPSHOT'
     compile group: 'org.apache.flink', name: 'flink-scala', version: '0.9-SNAPSHOT'
     compile group: 'org.apache.flink', name: 'flink-ml', version: '0.9-SNAPSHOT'
 }

 And I get the following error:

 alber@alberTU:/media/alber/datadisk/tmp/flink/code/test$ gradle compileScala
 Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar

 FAILURE: Build failed with an exception.

 * What went wrong:
 Could not resolve all dependencies for configuration ':compile'.
 > Could not resolve org.scalanlp:breeze_${scala.binary.version}:0.11.2.
   Required by:
       :test:1.0 > org.apache.flink:flink-ml:0.9-SNAPSHOT
    > Illegal character in path at index 51:
 http://repo1.maven.org/maven2/org/scalanlp/breeze_${scala.binary.version}/0.11.2/breeze_${scala.binary.version}-0.11.2.pom

 * Try:
 Run with --stacktrace option to get the stack trace. Run with --info or
 --debug option to get more log output.

 BUILD FAILED

 Total time: 7.113 secs


 I'm thankful for any ideas!

 Cheers,
 Max


Re: Flink-ML as Dependency

2015-06-10 Thread Till Rohrmann
Hi Max,

I think the reason is that the flink-ml pom contains as a dependency an
artifact with artifactId breeze_${scala.binary.version}. The variable
scala.binary.version is defined in the parent pom and not substituted when
flink-ml is installed. Therefore gradle tries to find a dependency with the
name breeze_${scala.binary.version}.

I'll try to find a solution for this problem. As a quick workaround, you
should be able to define the variable manually and set it to 2.10.

Cheers,
Till



Re: Flink-ML as Dependency

2015-06-11 Thread Till Rohrmann
Hmm, then I assume that Gradle version 2 can properly handle Maven property
variables.

On Thu, Jun 11, 2015 at 3:05 PM Maximilian Alber alber.maximil...@gmail.com
wrote:

 Hi Till,

 I use the standard one for Ubuntu 15.04, which is 1.5.

 That did not make any difference.

 Thanks and Cheers,
 Max






Re: Monitoring memory usage of a Flink Job

2015-06-15 Thread Till Rohrmann
Hi Tamara,

you can instruct Flink to write the current memory statistics to the log by
setting taskmanager.debug.memory.startLogThread: true in the Flink
configuration. Furthermore, you can control the logging interval with
taskmanager.debug.memory.logIntervalMs, where the interval is specified in
milliseconds.
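
For comparing jobs run locally, a similar periodic memory log can be approximated with the JDK's management API. The sketch below is plain Java and is not Flink's implementation; the class name, the method names and the log line format are assumptions made for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

class MemoryLogger {

    // Builds one log line from the JVM's own heap and non-heap counters.
    static String describeMemory() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = bean.getHeapMemoryUsage();
        MemoryUsage nonHeap = bean.getNonHeapMemoryUsage();
        return String.format(
                "Memory usage: heap %d/%d MB, non-heap %d/%d MB (used/committed)",
                heap.getUsed() >> 20, heap.getCommitted() >> 20,
                nonHeap.getUsed() >> 20, nonHeap.getCommitted() >> 20);
    }

    // Starts a daemon thread that prints the statistics every intervalMs milliseconds
    // until it is interrupted.
    static Thread startLogThread(long intervalMs) {
        Thread thread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println(describeMemory());
                    Thread.sleep(intervalMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "memory-logger");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread logger = startLogThread(1000);
        Thread.sleep(3000); // the job under test would run here
        logger.interrupt();
    }
}
```

These numbers cover the whole JVM, so they only separate jobs cleanly when each job runs in its own process.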

Cheers,
Till

On Mon, Jun 15, 2015 at 2:46 PM Fabian Hueske fhue...@gmail.com wrote:

Hi Tamara,

 what kind of information do you need? Something like, size and usage of
 in-memory sort buffers or hash tables?
 Some information might be written to the DEBUG logs, but I'm not sure about that.
 Besides logs, I doubt that Flink monitors memory usage.

 Cheers, Fabian

 2015-06-15 14:34 GMT+02:00 Tamara Mendt tammyme...@gmail.com:

 Hi,

 I am running some experiments on Flink and was wondering if there is some
 way to monitor the memory usage of a Flink Job (running locally and on a
 cluster). I need to run multiple jobs and compare their memory usage.

 Cheers,

 Tamara


Re: Random Selection

2015-06-15 Thread Till Rohrmann
Hi Max,

the problem is that you’re trying to serialize the companion object of
scala.util.Random. Try to create an instance of the scala.util.Random class
and use this instance within your RichFilterFunction to generate the random
numbers.
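
The underlying rule can be shown in plain Java, without Flink or Scala: a function object can only be shipped if everything it references is serializable. In the sketch below, all class names are hypothetical; NotShippable stands in for any captured reference that cannot be serialized, and GoodFilter owns its own java.util.Random instance, which is serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Random;

class SerializableRandomFilter {

    // Stand-in for an object that cannot be shipped to the workers.
    static class NotShippable { }

    // Drags a non-serializable reference along, so serialization fails.
    static class BadFilter implements Serializable {
        private final NotShippable captured = new NotShippable();
        private final int n;

        BadFilter(int n) {
            this.n = n;
        }

        boolean filter(int id) {
            return id == new Random().nextInt(n);
        }
    }

    // Owns its own Random instance; java.util.Random implements Serializable.
    static class GoodFilter implements Serializable {
        private final Random random = new Random();
        private final int n;

        GoodFilter(int n) {
            this.n = n;
        }

        boolean filter(int id) {
            return id == random.nextInt(n);
        }
    }

    // Returns true if Java serialization accepts the object graph.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodFilter serializable: " + canSerialize(new GoodFilter(10)));
        System.out.println("BadFilter serializable: " + canSerialize(new BadFilter(10)));
    }
}
```

A check like canSerialize can be run on a function object before submitting the job to find out which captured field is the culprit.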

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber alber.maximil...@gmail.com
wrote:

Hi Flinksters,

 I would like to randomly choose an element of my data set. But somehow I
 cannot use scala.util inside my filter functions:

 val sample_x = X filter(new RichFilterFunction[Vector]() {
   var i: Int = -1

   override def open(config: Configuration) = {
     i = scala.util.Random.nextInt(N)
   }
   def filter(a: Vector) = a.id == i
 })
 val sample_y = Y filter(new RichFilterFunction[Vector]() {
   def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
 })

 That's the error I get:

 Exception in thread "main" org.apache.flink.optimizer.CompilerException:
 An error occurred while translating the optimized plan to a nephele
 JobGraph: Error translating node 'Filter Filter at
 Test$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties
 [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
 grouped=null, unique=null] ]]': Could not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
 at
 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
 at
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
 at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
 at Test$delayedInit$body.apply(test.scala:304)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at
 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at Test$.main(test.scala:45)
 at Test.main(test.scala)
 Caused by: org.apache.flink.optimizer.CompilerException: Error translating
 node 'Filter Filter at Test$anonfun$10.apply(test.scala:276) : FLAT_MAP
 [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': Could not write the user
 code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
 at
 org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
 ... 21 more
 Caused by:
 org.apache.flink.runtime.operators.util.CorruptConfigurationException:
 Could not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
 ... 26 more
 Caused by: java.io.NotSerializableException:
 org.apache.flink.api.scala.DataSet
 

Re: Random Shuffling

2015-06-15 Thread Till Rohrmann
Hi Max,

you can always shuffle your elements using the rebalance method. What Flink
does here is distribute the elements of each partition among all
available TaskManagers. This happens in a round-robin fashion and is thus
not completely random.

Another option is the partitionCustom method, which allows you to specify
for each element the partition it is sent to. You would have to
specify a Partitioner to do this.

For the splitting there is at the moment no syntactic sugar. What you can do,
though, is to assign each item a split ID and then use a filter operation
to filter the individual splits. Depending on your split ID distribution, you
will have differently sized splits.
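
The split-ID approach can be sketched in plain Java, without the Flink API, as follows. The class RatioSplit and its methods are hypothetical names; the sketch assumes a two-way split where each element independently lands in split 0 with probability `ratio`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class RatioSplit {
    // Tags one element: split 0 with probability `ratio`, split 1 otherwise.
    static int splitId(Random rnd, double ratio) {
        return rnd.nextDouble() < ratio ? 0 : 1;
    }

    // Returns two lists: index 0 holds split 0, index 1 holds split 1.
    static <T> List<List<T>> split(List<T> items, double ratio, long seed) {
        Random rnd = new Random(seed);
        List<List<T>> splits = new ArrayList<>();
        splits.add(new ArrayList<>());
        splits.add(new ArrayList<>());
        for (T item : items) {
            // In a data flow this would be a map (assign the id) followed by two filters.
            splits.get(splitId(rnd, ratio)).add(item);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add(i);
        }
        List<List<Integer>> splits = split(ids, 0.8, 7L);
        // The sizes only approximate 800 and 200, because each id is drawn independently.
        System.out.println(splits.get(0).size() + " / " + splits.get(1).size());
    }
}
```

Since every element is tagged independently, the split sizes follow the ratio only on average, which matches the remark about differently sized splits.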

Cheers,
Till

On Mon, Jun 15, 2015 at 1:50 PM Maximilian Alber alber.maximil...@gmail.com wrote:

Hi Flinksters,

 I would like to shuffle the elements in my data set and then split it in
 two according to some ratio. Each element in the data set has a unique id.
 Is there a nice way to do it with the Flink API?
 (It would be nice to have guaranteed random shuffling.)
 Thanks!

 Cheers,
 Max



Re: Choosing random element

2015-06-16 Thread Till Rohrmann
This might help you [1].

Cheers,
Till

[1]
http://stackoverflow.com/questions/2514061/how-to-pick-random-small-data-samples-using-map-reduce
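
The linked answer is not reproduced here. One common single-pass technique for this kind of problem is reservoir sampling with a reservoir of size one; the sketch below shows it in plain Java. The class name ReservoirOne is hypothetical, and whether this matches the linked answer exactly is an assumption.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;

final class ReservoirOne {
    // Returns one element chosen uniformly at random, or null for an empty input.
    static <T> T pick(Iterator<T> elements, Random rnd) {
        T chosen = null;
        int seen = 0;
        while (elements.hasNext()) {
            T candidate = elements.next();
            seen++;
            // Replace the current choice with probability 1/seen.
            if (rnd.nextInt(seen) == 0) {
                chosen = candidate;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(pick(Arrays.asList("a", "b", "c", "d").iterator(), new Random()));
    }
}
```

The input is read once and never reordered. In a parallel setting, each partition could return its pick together with its element count, and a final step would choose among the picks with probability proportional to those counts.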

On Tue, Jun 16, 2015 at 10:16 AM Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Hi Flinksters,

 again a similar problem. I would like to choose ONE random element out of
 a data set, without shuffling the whole set. Again I would like to have the
 element (mathematically) randomly chosen.

 Thanks!
 Cheers,
 Max



Re: Help with Flink experimental Table API

2015-06-11 Thread Till Rohrmann
Hi Shiti,

here is the issue [1].

Cheers,
Till

[1] https://issues.apache.org/jira/browse/FLINK-2203

On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena ssaxena@gmail.com wrote:

 Hi Aljoscha,

 Could you please point me to the JIRA tickets? If you could provide some
 guidance on how to resolve these, I will work on them and raise a
 pull-request.

 Thanks,
 Shiti

 On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek aljos...@apache.org
 wrote:

 Hi,
 yes, I think the problem is that the RowSerializer does not support
 null-values. I think we can add support for this, I will open a Jira issue.
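
A common way to make a field serializer null-aware is to write a null flag in front of every field. The plain-Java sketch below illustrates that idea for a row of nullable integers. The class NullableIntRow is hypothetical, and this is not the actual change made to the RowSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class NullableIntRow {
    // Writes each field as a null flag, followed by the value only if it is present.
    static byte[] serialize(Integer[] row) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(row.length);
            for (Integer field : row) {
                out.writeBoolean(field == null);
                if (field != null) {
                    out.writeInt(field);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the flags back and restores null for every field that was marked as missing.
    static Integer[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Integer[] row = new Integer[in.readInt()];
            for (int i = 0; i < row.length; i++) {
                boolean isNull = in.readBoolean();
                row[i] = isNull ? null : Integer.valueOf(in.readInt());
            }
            return row;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The cost is one extra byte per field, in exchange for never handing a null to the primitive value serializer.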

 Another problem I then see is that the aggregations can not properly deal
 with null-values. This would need separate support.

 Regards,
 Aljoscha

 On Thu, 11 Jun 2015 at 06:41 Shiti Saxena ssaxena@gmail.com wrote:

 Hi,

 In our project, we are using the Flink Table API and are facing the
 following issues,

 We load data from a CSV file and create a DataSet[Row]. The CSV file can
 also have invalid entries in some of the fields which we replace with null
 when building the DataSet[Row].

 This DataSet[Row] is later transformed to a Table whenever required, and
 specific operations such as select or aggregate are performed.

 When a null value is encountered, we get a null pointer exception and
 the whole job fails. (We can see this by calling collect on the resulting
 DataSet).

 The error message is similar to,

 Job execution failed.
 org.apache.flink.runtime.client.JobExecutionException: Job execution
 failed.
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.NullPointerException
 at
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 at
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 at
 org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
 at
 org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
 at
 org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
 at
 org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
 at
 org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
 at
 org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
 at
 org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
 at
 org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
 at
 org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 at java.lang.Thread.run(Thread.java:724)

 Could this be because the RowSerializer does not support null values?
 (Similar to FLINK-629: https://issues.apache.org/jira/browse/FLINK-629)

 Currently, to overcome this issue, we are ignoring all the rows which
 may have null values. For example, we have a method cleanData defined as,

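 def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
   // Build a filter expression such as "a.isNotNull && b.isNotNull".
   val whereClause: String = relevantColumns.map { cName =>
     s"$cName.isNotNull"
   }.mkString(" && ")

   // Project to the relevant columns, then drop every row that contains a null.
   val result: Table =
     table.select(relevantColumns.mkString(",")).where(whereClause)
   result
 }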

 Before operating on any Table, we use this method and then continue with
 task.

 Is this the right way to handle this? If not please let me know how to
 go about it.


Re: Building master branch is failed

2015-05-29 Thread Till Rohrmann
Yes, this is another error. Seems to be related to the new scala shell.

On Fri, May 29, 2015 at 11:00 AM, Chiwan Park chiwanp...@icloud.com wrote:

 I fetched master branch and ran again. But I got the same error.
 It seems that the problem is related to javadoc. Till’s fix is related to
 renaming in flink-ml package.

 Regards,
 Chiwan Park


 On May 29, 2015, at 5:39 PM, Stephan Ewen se...@apache.org wrote:

 A bug sneaked in...

 I think Till just pushed a fix for that, so if you pull now, it should
 work again...



 On Fri, May 29, 2015 at 10:28 AM, Chiwan Park chiwanp...@icloud.com
 wrote:

 Hi :)

 I tried building current master branch with `mvn clean package
 -DskipTests` command.
 But I got the following error:

 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 08:11 min
 [INFO] Finished at: 2015-05-29T17:21:45+09:00
 [INFO] Final Memory: 151M/1524M
 [INFO]
 
 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:jar (attach-javadocs)
 on project flink-scala-shell: MavenReportException: Error while creating
 archive:
 [ERROR] Exit code: 1 - javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: error - No public or protected classes found to document.
 [ERROR]
 [ERROR] Command line was:
 /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/javadoc
 @options @packages
 [ERROR]
 [ERROR] Refer to the generated Javadoc files in
 '/Users/chiwanpark/IdeaProjects/flink/flink-staging/flink-scala-shell/target/apidocs'
 dir.
 [ERROR] -> [Help 1]
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with the
 -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
 [ERROR]
 [ERROR] After correcting the problems, you can resume the build with the
 command
 [ERROR]   mvn <goals> -rf :flink-scala-shell

 I ran the command in OS X 10.10.3, Oracle JDK 1.8.0_45, Maven 3.3.1.
 How can I solve this problem?


 Regards,
 Chiwan Park








Re: OutOfMemoryException: unable to create native thread

2015-07-01 Thread Till Rohrmann
Hi Chan,

if you feel up to implementing such an input format, then you can also
contribute it. You simply have to open a JIRA issue and take ownership of
it.

Cheers,
Till

On Wed, Jul 1, 2015 at 10:08 AM, chan fentes chanfen...@gmail.com wrote:

 Thank you all for your help and for pointing out different possibilities.
 It would be nice to have an input format that takes a directory and a
 regex pattern (for file names) to create one data source instead of 1500.
 This would have helped me to avoid the problem. Maybe this can be included
 in one of the future releases. ;)
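
The file-selection part of such an input format could look roughly like the plain-Java sketch below, which lists the files of one directory whose names match a regular expression. FileSelector and both of its methods are hypothetical names; wiring the result into a Flink input format is not shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FileSelector {
    // Lists the regular files directly inside `dir` whose names match `regex`.
    static List<Path> matching(Path dir, String regex) {
        Pattern pattern = Pattern.compile(regex);
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                .filter(Files::isRegularFile)
                .filter(p -> pattern.matcher(p.getFileName().toString()).matches())
                .sorted()
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo helper: creates a temporary directory containing empty files with the given names.
    static Path tempDirWith(String... names) {
        try {
            Path dir = Files.createTempDirectory("file-selector");
            for (String name : names) {
                Files.createFile(dir.resolve(name));
            }
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = tempDirWith("part-1.csv", "part-2.csv", "notes.txt");
        System.out.println(matching(dir, "part-\\d+\\.csv"));
    }
}
```

The matching files would then feed a single parallel source instead of one source per file.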

 2015-06-30 19:02 GMT+02:00 Stephan Ewen se...@apache.org:

 I agree with Aljoscha and Ufuk.

 As said, it will be hard for the system (currently) to handle 1500
 sources, but handling a parallel source with 1500 files will be very
 efficient.
 This is possible, if all sources (files) deliver the same data type and
 would be unioned.

 If that is true, you can

  - Specify the input as a directory.

  - If you cannot do that, because there is no common parent directory,
 you can union the files into one data source with a simple trick, as
 described here:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/open-multiple-file-from-list-of-uri-tp1804p1807.html



 On Tue, Jun 30, 2015 at 5:36 PM, Aljoscha Krettek aljos...@apache.org
 wrote:

 Hi Chan,
 Flink sources support giving a directory as an input path in a source.
 If you do this, it will read each of the files in that directory. The way
 you do it leads to a very big plan, because the plan is replicated
 1500 times; this could lead to the OutOfMemoryException.

 Is there a specific reason why you create 1500 separate sources?

 Regards,
 Aljoscha

 On Tue, 30 Jun 2015 at 17:17 chan fentes chanfen...@gmail.com wrote:

 Hello,

 how many data sources can I use in one Flink plan? Is there any limit?
 I get an
 java.lang.OutOfMemoryException: unable to create native thread
 when having approx. 1500 files. What I basically do is the following:
 DataSource -> Map -> Map -> GroupBy -> GroupReduce per file
 and then
 Union -> GroupBy -> Sum in a tree-like reduction.

 I have checked the workflow. It runs on a cluster without any problem,
 if I only use few files. Does Flink use a thread per operator? It seems as
 if I am limited in the amount of threads I can use. How can I avoid the
 exception mentioned above?

 Best regards
 Chan






Re: time measured for each iteration in KMeans

2015-07-01 Thread Till Rohrmann
Do you also have the rest of the code? It would help us find
out why it's not working.

Cheers,
Till

On Wed, Jul 1, 2015 at 1:31 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 I have now implemented a time logger in the open and close methods, and it
 works fine. However, I am trying to initialize the Flink class with a
 parameter (the benchmark round counter), but it is always initialized with 0.
 I get no exception. What am I doing wrong?

 my benchmark class:

 public class FlinkBenchmarkLauncher {

     private static List<Long> times = new ArrayList<Long>();

     public static void main(String[] args) throws Exception {
         Properties pro = new Properties();
         pro.load(FlinkBenchmarkLauncher.class.getResourceAsStream("/config.properties"));
         int benchRounds = Integer.parseInt(pro.getProperty("benchmark.rounds"));

         FileSystem fs = LocalFileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                 new org.apache.hadoop.conf.Configuration());
         String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");

         for (int i = 0; i < benchRounds; i++) {
             FlinkMain fm = new FlinkMain((i + 1));

             long start = System.nanoTime();
             fm.run();
             long end = System.nanoTime();

             times.add(end - start);
             // Clean up the output between rounds, but keep the result of the last round.
             if (i != benchRounds - 1) {
                 fs.delete(new Path(outputPath + "/points"), false);
                 fs.delete(new Path(outputPath + "/centers"), false);
             }
         }
         BenchmarkHelper.writeTimekeepingToFile(times, "flink_benchmark");
     }
 }

 my flink main class.

 public class FlinkMain implements Runnable {

     private static int benchmarkCounter;
     private static int iterationCounter = 1;

     FlinkMain(int benchmarkCounter) {
         this.benchmarkCounter = benchmarkCounter;
     }

     public void run() {
         // my execute
...
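
The thread does not show how the counter is read inside the Flink functions, so the following is only one possible explanation, not a confirmed diagnosis. Static fields are not part of Java serialization. If a function object is serialized and later deserialized in a JVM that loaded the class freshly, a static counter reads its default value there, while an instance field keeps the value it was given. The plain-Java sketch below simulates this inside a single JVM; CounterHolder is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class CounterHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // Belongs to the class, so it is never written by Java serialization.
    static int staticCounter;
    // Belongs to the object, so it is serialized together with it.
    private final int instanceCounter;

    CounterHolder(int counter) {
        staticCounter = counter;
        this.instanceCounter = counter;
    }

    int instanceCounter() {
        return instanceCounter;
    }

    // Returns {static value, instance value} as observed after a serialization round trip.
    static int[] afterRoundTrip(int counter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new CounterHolder(counter));
            }
            // Simulate a JVM that loaded the class freshly: the static is at its default again.
            staticCounter = 0;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                CounterHolder copy = (CounterHolder) in.readObject();
                return new int[] {staticCounter, copy.instanceCounter()};
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

If this is indeed the cause, passing the counter to the function through a non-static field or a constructor argument would carry it along.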


 2015-06-26 12:34 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 Thank you for your quick response.
 I just do not understand quite what you mean. I need to define a method
 in Java or do you mean that I should use the monitoring in the browser?

 2015-06-26 10:09 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,

 The TaskManager which is running the Sync task logs when it is
 starting the next iteration. I know it's not very convenient.
 You can also log the time and iteration ID (from the
 IterationRuntimeContext) in the open() method.

 On Fri, Jun 26, 2015 at 9:57 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hello flink community,

 I have written a k-means app for clustering temporal geo data. Now I want
 to know how much time Flink needs to compute one iteration. Is it possible to
 measure that, given how Flink's execution engine works?

 best regards,
 paul







Re: The slot in which the task was scheduled has been killed (probably loss of TaskManager)

2015-06-30 Thread Till Rohrmann
Do you have the JobManager and TaskManager logs of the corresponding TM, by
any chance?

On Mon, Jun 29, 2015 at 8:12 PM, Andra Lungu lungu.an...@gmail.com wrote:

 Something similar in flink-0.10-SNAPSHOT:

 06/29/2015 10:33:46 CHAIN Join(Join at main(TriangleCount.java:79)) -
 Combine (Reduce at main(TriangleCount.java:79))(222/224) switched to FAILED
 java.lang.Exception: The slot in which the task was executed has been
 released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @
 wally025 - 8 slots - URL: akka.tcp://
 flink@130.149.249.35:56135/user/taskmanager
 at
 org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
 at
 org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
 at
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
 at
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
 at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 06/29/2015 10:33:46 Job execution switched to status FAILING.


 On Mon, Jun 29, 2015 at 1:08 PM, Alexander Alexandrov 
 alexander.s.alexand...@gmail.com wrote:

 I witnessed a similar issue yesterday on a simple job (single task chain,
 no shuffles) with a release-0.9 based fork.

 2015-04-15 14:59 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Yes, sorry for that. I found it somewhere in the logs. The problem was
 that the program didn't die immediately but was somehow hanging, and I
 discovered the source of the problem only by running the program on a subset
 of the data.

 Thanks for the support,
 Flavio

 On Wed, Apr 15, 2015 at 2:56 PM, Stephan Ewen se...@apache.org wrote:

 This means that the TaskManager was lost. The JobManager can no longer
 reach the TaskManager and considers all tasks executing on the TaskManager
 as failed.

 Have a look at the TaskManager log, it should describe why the
 TaskManager failed.
 On 15.04.2015 at 14:45, Flavio Pompermaier pomperma...@okkam.it wrote:

 Hi to all,

 I have this strange error in my job and I don't know what's going on.
 What can I do?

 The full exception is:

 The slot in which the task was scheduled has been killed (probably
 loss of TaskManager).
 at
 org.apache.flink.runtime.instance.SimpleSlot.cancel(SimpleSlot.java:98)
 at
 org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment.releaseSimpleSlot(SlotSharingGroupAssignment.java:335)
 at
 org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:319)
 at
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:106)
 at
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:151)
 at
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:435)
 at
 

Re: k means - waiting for dataset

2015-05-21 Thread Till Rohrmann
Concerning your first problem that you only see one resulting centroid,
your code looks good modulo the parts you haven't posted.

However, your problem could simply be caused by a bad selection of initial
centroids. If, for example, all centroids except for one don't get any
points assigned, then only one centroid will survive the iteration step.
How do you do it?
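
A minimal plain-Java sketch of this effect, using one-dimensional points for brevity: with badly placed initial centroids, every point is assigned to the same centroid, so only that one receives points. CentroidAssignment and its methods are hypothetical names and are not taken from the KMeans example.

```java
final class CentroidAssignment {
    // Index of the centroid closest to point x.
    static int nearest(double x, double[] centroids) {
        int best = 0;
        for (int i = 1; i < centroids.length; i++) {
            if (Math.abs(x - centroids[i]) < Math.abs(x - centroids[best])) {
                best = i;
            }
        }
        return best;
    }

    // Number of centroids that receive at least one point in one assignment step.
    static int nonEmptyClusters(double[] points, double[] centroids) {
        boolean[] used = new boolean[centroids.length];
        for (double p : points) {
            used[nearest(p, centroids)] = true;
        }
        int count = 0;
        for (boolean u : used) {
            if (u) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        // All three centroids lie on one side of the data; only the first one attracts points.
        System.out.println(nonEmptyClusters(points, new double[] {0, 100, 200}));
        // Two centroids placed inside the data both attract points.
        System.out.println(nonEmptyClusters(points, new double[] {2, 11, 200}));
    }
}
```

A centroid that attracts no points has nothing to average in the next step, which is how it can drop out of the result.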

To check that all centroids are read you can print the contents of the
centroids DataSet. Furthermore, you can simply println the new centroids
after each iteration step. In local mode you can then observe the
computation.

Cheers,
Till

On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen se...@apache.org wrote:

 Hi!

 This problem should not depend on any user code. There are no user-code
 dependent actors in Flink.

 Is there more stack trace that you can send us? It looks like
 the core exception that is causing the issue is not part of the stack trace.

 Greetings,
 Stephan



 On Thu, May 21, 2015 at 11:11 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi flink community,

 I have implemented k-means for clustering temporal geo data. I use the
 following GitHub project and my own data structure:

 https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

 Now I have the problem that Flink reads the centroids from a file and keeps
 working in parallel. Looking at the results, I have the feeling that the
 program loads only one centroid point.

 I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
 following exception:
 ERROR actor.OneForOneStrategy: exception during creation
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
 at akka.actor.ActorCell.create(ActorCell.scala:578)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at akka.util.Reflect$.instantiate(Reflect.scala:65)
 at akka.actor.Props.newActor(Props.scala:337)
 at akka.actor.ActorCell.newActor(ActorCell.scala:534)
 at akka.actor.ActorCell.create(ActorCell.scala:560)
 ... 9 more

 How can I tell Flink to wait until the data set is loaded, and what
 does this exception mean?

 best regards,
 paul





Re: SLIDES: Overview of Apache Flink: Next-Gen Big Data Analytics Framework

2015-07-07 Thread Till Rohrmann
Nice, thanks for sharing Slim.

Cheers,
Till

On Tue, Jul 7, 2015 at 6:19 AM, Slim Baltagi sbalt...@gmail.com wrote:

 Hi

 This is the link *http://goo.gl/gVOSp8* to the slides of my talk on June
 30,
 2015 at the Chicago Apache Flink meetup.

 Although most of the current buzz is about Apache Spark, the talk shows how
 Apache Flink offers the only hybrid open source (Real-Time Streaming +
 Batch) distributed data processing engine supporting many use cases:
 Real-Time stream processing, machine learning at scale, graph analytics and
 batch processing.

 Many slides are also dedicated to showing why Apache Flink is an
 alternative
 to Apache Hadoop MapReduce, Apache Storm and Apache Spark!

 Thanks

 Slim Baltagi







Re: Tuple model project

2015-07-30 Thread Till Rohrmann
You could try to use the TypeSerializerInputFormat.

On Thu, Jul 30, 2015 at 2:08 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 How can I create a Flink dataset given a directory path that contains a
 set of java objects serialized with kryo (one file per object)?

 On Thu, Jul 30, 2015 at 1:41 PM, Till Rohrmann trohrm...@apache.org
 wrote:

 Hi Flavio,

 in order to use the Kryo serializer for a given type you can use the
 registerTypeWithKryoSerializer of the ExecutionEnvironment object. What
 you provide to the method is the type you want to be serialized with kryo
 and an implementation of the com.esotericsoftware.kryo.Serializer class.
 If the given type is not supported by Flink’s own serialization framework,
 then this custom serializer should be used. You register the types at the
 beginning of your Flink program:

 def main(args: Array[String]): Unit = {
   val env = ExecutionEnvironment.getExecutionEnvironment

   env.registerTypeWithKryoSerializer(classOf[MyType], classOf[MyTypeSerializer])

   ...

   env.execute()
 }

 Cheers,
 Till

 On Thu, Jul 30, 2015 at 12:45 PM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 I have a project that produces RDF quads, and I have to store them so that
 I can read them with Flink afterwards.
 I could use Thrift/Protobuf/Avro, but this means adding a lot of
 transitive dependencies to my project.
 Maybe I could use Kryo to store those objects. Is there any example that
 creates a dataset of objects serialized with Kryo?

 On Thu, Jul 30, 2015 at 11:10 AM, Stephan Ewen se...@apache.org wrote:

 Quick response: I am not opposed to that, but there are tuple libraries
 around already.

 Do you need specifically the Flink tuples, for interoperability between
 Flink and other projects?

 On Thu, Jul 30, 2015 at 11:07 AM, Stephan Ewen se...@apache.org
 wrote:

 Should we move this to the dev list?

 On Thu, Jul 30, 2015 at 10:43 AM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 Any thoughts about this (moving the tuple classes into a separate
 self-contained project with no transitive dependencies, so that they can be
 easily used in other external projects)?

 On Mon, Jul 6, 2015 at 11:09 AM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 Do you think it could be a good idea to extract Flink tuples into a
 separate project to allow simpler dependency management in
 Flink-compatible projects?

 On Mon, Jul 6, 2015 at 11:06 AM, Fabian Hueske fhue...@gmail.com
 wrote:

 Hi,

 at the moment, Tuples are more efficient than POJOs, because POJO
 fields are accessed via Java reflection whereas Tuple fields are
 accessed directly.
 This performance penalty could be overcome by code-generated
 serializers and comparators, but I am not aware of any work in that
 direction.
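
To illustrate the two access paths in plain Java: the sketch below reads the same field once directly and once through java.lang.reflect. FieldAccessDemo and its nested Quad class are hypothetical names; the sketch shows only the mechanics and measures nothing.

```java
import java.lang.reflect.Field;

final class FieldAccessDemo {
    // Hypothetical POJO with one public field.
    static final class Quad {
        public String subject;

        Quad(String subject) {
            this.subject = subject;
        }
    }

    // Direct access: the field is resolved at compile time.
    static String direct(Quad q) {
        return q.subject;
    }

    // Reflective access: the field is looked up by name and read through a Field object.
    static String reflective(Quad q) {
        try {
            Field field = Quad.class.getField("subject");
            return (String) field.get(q);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("field lookup failed", e);
        }
    }
}
```

Both calls return the same value; the reflective path goes through an additional lookup and access check, which is the overhead referred to above.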

 Best, Fabian

 2015-07-06 11:01 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it
 :

 Hi to all,
 I was thinking to write my own flink-compatible library and I need
 basically a Tuple5.

 Is there any performance loss in using a POJO with 5 String fields
 vs a Tuple5?
 If yes, wouldn't it be a good idea to extract Flink tuples into a
 separate simple project (e.g. flink-java-tuples) that has no other
 dependency, to enable other libs to write their Flink-compatible logic
 without the need to exclude all the transitive dependencies of
 flink-java?

 Best,
 Flavio













Re: java.lang.ClassNotFoundException when deploying streaming jar locally

2015-08-06 Thread Till Rohrmann
Hi Michael,

in the flink-test-0.1.jar the class DaoJoin$1.class is located at
com/davengo/rfidcloud/flink but Flink tries to load
com.otter.ist.flink.DaoJoin$1. This might be the problem. This is somehow
odd because in the source code you’ve specified the correct package
com.otter.ist.flink.
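
The mismatch can be checked mechanically: a class loader looks for a class under the jar entry obtained by replacing the dots in its binary name with slashes and appending .class. The plain-Java sketch below applies that rule to entries copied from the jar listing in this thread. JarEntryCheck is a hypothetical helper, not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

final class JarEntryCheck {
    // Maps a binary class name to the entry path a class loader looks for in a jar.
    static String entryFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    static boolean contains(List<String> jarEntries, String className) {
        return jarEntries.contains(entryFor(className));
    }

    public static void main(String[] args) {
        // Entries copied from the `jar tf` listing in the thread.
        List<String> entries = Arrays.asList(
            "com/davengo/rfidcloud/flink/DaoJoin$1.class",
            "com/davengo/rfidcloud/flink/DaoJoin.class",
            "com/davengo/rfidcloud/flink/streampojos/EpcTuple.class");
        // The class Flink tries to load versus the class that is actually packaged.
        System.out.println(contains(entries, "com.otter.ist.flink.DaoJoin$1"));
        System.out.println(contains(entries, "com.davengo.rfidcloud.flink.DaoJoin$1"));
    }
}
```

The requested name maps to com/otter/ist/flink/DaoJoin$1.class, which is not among the listed entries.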

Cheers,
Till

On Thu, Aug 6, 2015 at 11:27 AM, Michael Huelfenhaus 
m.huelfenh...@davengo.com wrote:

 hi,

 how did you build the jar file?


 mvn clean install -Pbuild-jar

 Have you checked whether your classes are in the jar file?


 yes, this seems alright for me

  jar tf target/flink-test-0.1.jar
 META-INF/MANIFEST.MF
 META-INF/
 com/
 com/davengo/
 com/davengo/rfidcloud/
 com/davengo/rfidcloud/flink/
 com/davengo/rfidcloud/flink/DaoJoin$1.class
 com/davengo/rfidcloud/flink/DaoJoin.class
 com/davengo/rfidcloud/flink/streampojos/
 com/davengo/rfidcloud/flink/streampojos/EpcTuple.class
 log4j.properties
 META-INF/maven/
 META-INF/maven/com.davengo.rfidcloud.flink/
 META-INF/maven/com.davengo.rfidcloud.flink/flink-test/
 META-INF/maven/com.davengo.rfidcloud.flink/flink-test/pom.xml
 META-INF/maven/com.davengo.rfidcloud.flink/flink-test/pom.properties

 On 06.08.2015 at 11:21, Robert Metzger rmetz...@apache.org wrote:

 Hi,

 how did you build the jar file?
 Have you checked whether your classes are in the jar file?

 On Thu, Aug 6, 2015 at 11:08 AM, Michael Huelfenhaus 
 m.huelfenh...@davengo.com wrote:

 Hello everybody

 I am trying to build a very simple streaming application with the nightly
 build of Flink 0.10. My code runs fine in Eclipse.

 But when I build and deploy the jar locally I always get
 java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1

 There is also no plan visible in the web interface.

 I start the local flink 0.10 with start-local-streaming.sh  after
 building it from the git code

 Below you find the complete error, my code and the pom.xml any help is
 appreciated.

 Cheers Michael


 error log from web interface:
 An error occurred while invoking the program:

 The main method caused an error.


 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:364)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:40)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
 at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
 at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
 at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:173)
 at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:159)
 at org.apache.flink.streaming.runtime.tasks.OutputHandler.<init>(OutputHandler.java:107)
 at
 

Re: Pass not serializable objects to Flink transformation functions

2015-07-27 Thread Till Rohrmann
Hi Flavio,

for the user code logic Flink uses exclusively Java serialization. What you
can do, though, is to override the readObject and writeObject methods which
are used by Java serialization. Within the methods you can serialize the
other object you’re referencing.
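As a minimal sketch of that idea in plain Java (no Flink classes involved): the holder below is Serializable, keeps the non-serializable object in a transient field, and rebuilds it in readObject. LegacyParser, ParserHolder and roundTrip are hypothetical names used only for this illustration; the assumption is that the referenced object can be reconstructed from a few serializable values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ParserHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for a third-party class that does not implement Serializable.
    static final class LegacyParser {
        final String pattern;
        LegacyParser(String pattern) { this.pattern = pattern; }
    }

    // transient: default Java serialization skips this field.
    private transient LegacyParser parser;

    ParserHolder(LegacyParser parser) { this.parser = parser; }

    LegacyParser getParser() { return parser; }

    // Called by Java serialization: write the state needed to rebuild the parser.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(parser.pattern);
    }

    // Called by Java serialization: rebuild the parser from the written state.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        parser = new LegacyParser(in.readUTF());
    }

    // Serialize and deserialize in memory, as a stand-in for shipping the
    // function object to another JVM.
    static ParserHolder roundTrip(ParserHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ParserHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ParserHolder copy = roundTrip(new ParserHolder(new LegacyParser("[0-9]+")));
        System.out.println(copy.getParser().pattern); // prints [0-9]+
    }
}
```

In a Flink job the same two private methods would sit in the function class (e.g. the map function) that holds the object.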

Cheers,
Till
​

On Mon, Jul 27, 2015 at 10:05 AM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 Hi to all,

 in my Flink job I initialize a Java object that doesn't implement
 Serializable so that I can use it within a Flink function (e.g. map or
 flatMap). At the moment the only way to achieve that is to keep those
 operators as private classes in the main class and reference static fields,
 or to implement them as inner classes and reference final parameters of the
 containing function.

 Is there any way to move those classes to a normal class at the moment
 (e.g. forcing kryo serialization)?

 Best,
 Flavio



Re: filter as termination condition

2015-07-22 Thread Till Rohrmann
Sachin is right that the filter has to be inverted. Furthermore, the join
operation is not right here. You have to do a kind of a left outer join
where you only keep the elements which join with NULL. Here is an example
of how one could do it [1].
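The logic of such a termination criterion can be sketched with plain Java collections standing in for the data sets; this is not Flink API code, and TerminationCriterion and changed are names invented for the sketch. It keeps exactly those new elements that find no equal partner among the old ones, which is what the "join with NULL" rows of a left outer join would be.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TerminationCriterion {
    // Elements of `current` without an equal partner in `previous`,
    // i.e. the rows of a left outer join whose right side would be NULL.
    static <T> List<T> changed(List<T> current, List<T> previous) {
        Set<T> seen = new HashSet<>(previous);
        List<T> result = new ArrayList<>();
        for (T element : current) {
            if (!seen.contains(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> oldCentroids = Arrays.asList(1, 2, 3);

        // One centroid moved: the criterion is non-empty, so iteration continues.
        List<Integer> stillMoving = changed(Arrays.asList(1, 2, 4), oldCentroids);
        System.out.println(stillMoving); // prints [4]

        // Nothing moved: the criterion is empty, so iteration terminates.
        List<Integer> converged = changed(Arrays.asList(1, 2, 3), oldCentroids);
        System.out.println(converged.isEmpty()); // prints true
    }
}
```

An inner join followed by a filter cannot produce this result, because elements without a partner never reach the filter.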

Cheers,
Till

[1]
http://stackoverflow.com/questions/31558326/apache-flink-filter-as-termination-condition/31559947#31559947
​

On Wed, Jul 22, 2015 at 2:23 PM, Sachin Goel sachingoel0...@gmail.com
wrote:

 It appears that you're returning true when the previous and current
 solution are the same. You should instead return false in that case,
 because this is when the iteration should terminate.
 Further, instead of joining, it would be a good idea to broadcast the new
 solution to the old solution [or the other way around] and have some
 tolerance value instead of an exact equality check.

 Cheers!
 Sachin

 -- Sachin Goel
 Computer Science, IIT Delhi
 m. +91-9871457685
 On Jul 22, 2015 5:46 PM, Stephan Ewen se...@apache.org wrote:

 Termination happens if the termination criterion data set is empty.

 Maybe your filter is too aggressive and filters out everything, or the
 join is wrong and nothing joins...

 On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hello,

 I have defined a filter as the termination condition for k-means.
 If I run my app, it always computes only one iteration.

 I think the problem is here:
 DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
     newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
 or maybe the filter function:
 public static final class MyFilter implements
         FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

     private static final long serialVersionUID = 5868635346889117617L;

     public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
             throws Exception {
         if(tuple.f0.equals(tuple.f1)) {
             return true;
         }
         else {
             return false;
         }
     }
 }

 best regards,
 paul

 my full code here:

 public void run() {
     // load properties
     Properties pro = new Properties();
     FileSystem fs = null;
     try {
         pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
         fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
             new org.apache.hadoop.conf.Configuration());
     } catch (Exception e) {
         e.printStackTrace();
     }

     int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
     String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
     // set up execution environment
     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     // get input points
     DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
     DataSet<GeoTimeDataCenter> centroids = null;
     try {
         centroids = getCentroidDataSet(env);
     } catch (Exception e1) {
         e1.printStackTrace();
     }
     // set number of bulk iterations for KMeans algorithm
     IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
     DataSet<GeoTimeDataCenter> newCentroids = points
         // compute closest centroid for each point
         .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
         // count and sum point coordinates for each centroid
         .groupBy(0).reduceGroup(new CentroidAccumulator())
         // compute new centroids from point counts and coordinate sums
         .map(new CentroidAverager(this.getBenchmarkCounter()));
     // feed new centroids back into next iteration with termination condition
     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
         newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
     DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
         // assign points to final clusters
         .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
     // emit result
     clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
     finalCentroids.writeAsText(outputPath + "/centers"); //print();
     // execute program
     try {
         env.execute("KMeans Flink");
     } catch (Exception e) {
         e.printStackTrace();
     }
 }

 public static final class MyFilter implements
         FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

     private static final long serialVersionUID = 5868635346889117617L;

     public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
             throws Exception {
         if(tuple.f0.equals(tuple.f1)) {
             return true;
         }
         else {
             return false;
         }
     }
 }





Re: Scala: registerAggregationConvergenceCriterion

2015-07-17 Thread Till Rohrmann
Hi Max,

I’d recommend using the DataSet[T].iterateWithTermination method instead.
It has the following signature: iterateWithTermination(maxIterations: Int)(stepFunction:
(DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T]

There you see that your step function has to return a tuple of data sets.
The first tuple value is the result for the next iteration. The second data
set defines the convergence criterion. If the DataSet is empty, then the
iteration will be terminated. If not and if the maximum number of
iterations has not been exceeded, then the next iteration is started.
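The control flow described above can be simulated with ordinary Java lists. This is only a sketch of the semantics, not Flink code: BulkIterationSketch, Step and halve are invented for the illustration, and it assumes that the result of the iteration is the output of the last executed step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class BulkIterationSketch {
    // What one step returns: the data for the next iteration and the
    // termination criterion (an empty criterion stops the loop).
    static final class Step<T> {
        final List<T> next;
        final List<?> criterion;
        Step(List<T> next, List<?> criterion) {
            this.next = next;
            this.criterion = criterion;
        }
    }

    static <T> List<T> iterateWithTermination(List<T> initial, int maxIterations,
                                              Function<List<T>, Step<T>> stepFunction) {
        List<T> current = initial;
        for (int i = 0; i < maxIterations; i++) {
            Step<T> step = stepFunction.apply(current);
            current = step.next;
            if (step.criterion.isEmpty()) {
                break; // converged before reaching maxIterations
            }
        }
        return current;
    }

    // Example step: halve every value; the criterion holds the values still above 1.
    static Step<Integer> halve(List<Integer> values) {
        List<Integer> next = new ArrayList<>();
        List<Integer> stillLarge = new ArrayList<>();
        for (int value : values) {
            int half = value / 2;
            next.add(half);
            if (half > 1) {
                stillLarge.add(half);
            }
        }
        return new Step<>(next, stillLarge);
    }

    public static void main(String[] args) {
        List<Integer> converged =
            iterateWithTermination(Arrays.asList(64, 8), 10, BulkIterationSketch::halve);
        System.out.println(converged); // prints [1, 0] (stopped after 6 steps)

        List<Integer> capped =
            iterateWithTermination(Arrays.asList(64, 8), 3, BulkIterationSketch::halve);
        System.out.println(capped); // prints [8, 1] (stopped by maxIterations)
    }
}
```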

Cheers,
Till
​

On Fri, Jul 17, 2015 at 3:43 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Hi Flinksters,

 I try to use BulkIterations with a convergence criterion. Unfortunately,
 I'm not sure how to use them and I couldn't find a nice example.

 Here are two code snippets and the resulting error, maybe someone can help.
 I'm working on the current branch.

 Example1:

   if(true){
     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

     val agg = new LongSumAggregator;

     val ds2 = ds.iterate(10)({
       x =>
         x map { y => y*2 }
     }).registerAggregator("test", agg)
     println(ds2)
     //.registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)

     println(ds2.collect)
   }

 Error:

 Exception in thread main java.lang.UnsupportedOperationException:
 Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255
 cannot have aggregators.
 at
 org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
 at Test$delayedInit$body.apply(test.scala:386)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at
 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at Test$.main(test.scala:47)
 at Test.main(test.scala)
 :run FAILED



 Example 2:


   if(true){
     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

     val agg = new LongSumAggregator;

     val ds2 = ds.iterate(10)({
       x =>
         x map { y => y*2 }
     }).registerAggregator("test", agg)
       .registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)

     println(ds2.collect)
   }


 Error:

 :compileScala
 [ant:scalac]
 /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386:
 error: value registerAggregationConvergenceCriterion is not a member of
 org.apache.flink.api.scala.DataSet[Int]
 [ant:scalac] }).registerAggregator(test,
 agg).registerAggregationConvergenceCriterion(test, agg, new
 LongZeroConvergence)
 [ant:scalac]^
 [ant:scalac] one error found
 :compileScala FAILED



 Thanks!

 Cheers,
 Max



Re: UI for flink

2015-07-13 Thread Till Rohrmann
Hi Hermann,

when you start a Flink cluster, the web interface is started as well. It
is reachable at http://jobManagerURL:8081. The web interface tells you
a lot about the current state of your cluster and the currently executing
Flink jobs.

Additionally, you can start the web client via ./start-webclient.sh, which
you can find in the bin directory. The web client, which is reachable on
port 8080, allows you to submit Flink jobs to your cluster via a browser.

Cheers,
Till
​

On Mon, Jul 13, 2015 at 2:07 PM, Hermann Azong hermann.az...@gmail.com
wrote:

 Hello Flinkers,

 I'm wondering if a UI solution for Flink already exists when starting

 Sincerely,

 Hermann



Re: flink on yarn configuration

2015-07-14 Thread Till Rohrmann
Hi Paul,

when you run your Flink cluster with YARN then we cannot give the full
amount of the allocated container memory to Flink. The reason is that YARN
itself needs some of the memory as well. Since YARN is quite strict with
containers which exceed their memory limit (the container is instantly
killed), we assign 0.25 of the container’s memory to YARN by default.

I cannot tell why in your case it is 0.5. Maybe you’re using an old version
of Flink. But you can control the memory fraction which is given to YARN
using the configuration parameter yarn.heap-cutoff-ratio.
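As a rough worked example of that arithmetic (a simplification: it applies only the ratio and ignores any minimum cutoff or other deductions Flink may make; YarnHeapCutoff and heapMb are hypothetical names), here is what is left for Flink from the 10240 MB container in your yarn-session.sh call:

```java
class YarnHeapCutoff {
    // Memory left for Flink after reserving cutoffRatio of the container for YARN.
    static long heapMb(long containerMb, double cutoffRatio) {
        return containerMb - (long) (containerMb * cutoffRatio);
    }

    public static void main(String[] args) {
        // -tm 10240 with the default ratio of 0.25
        System.out.println(heapMb(10240, 0.25)); // prints 7680
        // the same container with a ratio of 0.5
        System.out.println(heapMb(10240, 0.5));  // prints 5120
    }
}
```

A ratio of 0.5 would give roughly the 5 GB you see in the dashboard.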

Cheers,
Till
​

On Tue, Jul 14, 2015 at 10:47 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 hello community,

 I want to run my Flink app on a cluster (Cloudera 5.4.4) with 3 nodes (one PC
 has an 8-core i7 with 16 GB RAM). Now I want to submit my Spark job on YARN
 (20 GB RAM).

 My script to deploy the Flink cluster on YARN:

 export HADOOP_CONF_DIR=/etc/hadoop/conf/
 ./flink-0.9.0/bin/yarn-session.sh -n 1 -jm 10240 -tm 10240

 My script to submit the job is currently the following:

 ./flink-0.9.0/bin/flink run /home/marcel/Desktop/ma-flink.jar

 The Flink dashboard shows that only 5 GB of memory are used for computing my job.

 Maybe my configuration is not optimal?

 best regards,
 paul



Re: Submitting jobs from within Scala code

2015-07-16 Thread Till Rohrmann
Hi Philipp,

could you post the complete log output. This might help to get to the
bottom of the problem.

Cheers,
Till

On Thu, Jul 16, 2015 at 11:01 AM, Philipp Goetze 
philipp.goe...@tu-ilmenau.de wrote:

  Hi community,

 in our project we try to submit built Flink programs to the jobmanager
 from within Scala code. The test program is executed correctly when
 submitted via the wrapper script bin/flink run ... and also with the
 webclient. But when executed from within the Scala code nothing seems to
 happen, but the following warning is found in the log:

 10:47:18,153 WARN  akka.remote.ReliableDeliverySupervisor 
- Association with remote system [akka.tcp://flink@127.0.0.1:34074] has 
 failed, address is now gated for [5000] ms. Reason is: 
 [org.apache.flink.runtime.jobgraph.AbstractJobVertex]


 Our submit method looks like that:

  def submitJar(master: String, path: String, className: String, args: String*) = {
    val file = new File(path)
    val parallelism = 1
    val wait = true
    try {
      val program = new PackagedProgram(file, className, args:_*)
      val jobManagerAddress = getInetFromHostport(master)
      val client = new Client(jobManagerAddress, new Configuration(),
        program.getUserCodeClassLoader(), 1)
      println("Executing " + path);
      client.run(program, parallelism, wait);
    } catch {
      case e: ProgramInvocationException => e.printStackTrace()
    }
  }


 I took this as a reference:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-submit-flink-jars-from-plain-Java-programs-td656.html

 I hope you can help.

 Best Regards,
 Philipp Götze



Re: Submitting jobs from within Scala code

2015-07-16 Thread Till Rohrmann
)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 Best Regards,
 Philipp



 On 16.07.2015 11:45, Till Rohrmann wrote:

  When you run your program from the IDE, then you can specify a
 log4j.properties file. There you can configure where and what to log. It
 should be enough to place the log4j.properties file in the resource
 folder of your project. An example properties file could look like:

 log4j.rootLogger=INFO, testlogger

 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 Alternatively, you can specify it via a JVM option:
 -Dlog4j.configuration=<path to properties file>

 Cheers,
 Till
 ​

 On Thu, Jul 16, 2015 at 11:23 AM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:

  Hi Till,

 the problem is that this is the only output :( Or is it possible to get a
 more verbose log output?

 Maybe it is important to note, that both Flink and our project is built
 with Scala 2.11.

 Best Regards,
 Philipp


 On 16.07.2015 11:12, Till Rohrmann wrote:

 Hi Philipp,

  could you post the complete log output. This might help to get to the
 bottom of the problem.

  Cheers,
 Till

 On Thu, Jul 16, 2015 at 11:01 AM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:

  Hi community,

 in our project we try to submit built Flink programs to the jobmanager
 from within Scala code. The test program is executed correctly when
 submitted via the wrapper script bin/flink run ... and also with the
 webclient. But when executed from within the Scala code nothing seems to
 happen, but the following warning is found in the log:

 10:47:18,153 WARN  akka.remote.ReliableDeliverySupervisor   
  - Association with remote system [akka.tcp://flink@127.0.0.1:34074] 
 has failed, address is now gated for [5000] ms. Reason is: 
 [org.apache.flink.runtime.jobgraph.AbstractJobVertex]


 Our submit method looks like that:

  def submitJar(master: String, path: String, className: String, args: String*) = {
    val file = new File(path)
    val parallelism = 1
    val wait = true
    try {
      val program = new PackagedProgram(file, className, args:_*)
      val jobManagerAddress = getInetFromHostport(master)
      val client = new Client(jobManagerAddress, new Configuration(),
        program.getUserCodeClassLoader(), 1)
      println("Executing " + path);
      client.run(program, parallelism, wait);
    } catch {
      case e: ProgramInvocationException => e.printStackTrace()
    }
  }


 I took this as a reference:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-submit-flink-jars-from-plain-Java-programs-td656.html

 I hope you can help.

 Best Regards,
 Philipp Götze








Re: Submitting jobs from within Scala code

2015-07-16 Thread Till Rohrmann
Good to hear that your problem is solved :-)

Cheers,
Till

On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze 
philipp.goe...@tu-ilmenau.de wrote:

  Hi Till,

 many thanks for your effort. I finally got it working.

 I'm a bit embarrassed because the issue was solved by using the same
 flink-dist-JAR from the locally running Flink version. So to say I used an
 older Snapshot version for compiling than for running :-[

 Best Regards,
 Philipp


 On 16.07.2015 17:35, Till Rohrmann wrote:

  Hi Philipp,

 To run a Flink program on a cluster from within my IDE, I usually create a
 RemoteExecutionEnvironment. Since I have one UDF (the map function which
 doubles the values) defined, I also need to specify the jar containing this
 class. In my case, the jar is called test-1.0-SNAPSHOT.jar.

 def main(args: Array[String]) {
   // set up the execution environment
   val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
     "target/test-1.0-SNAPSHOT.jar")

   val elements = env.fromElements(1,2,3,4,5)

   val doubled = elements.map(x => 2*x)

   doubled.printOnTaskManager("TaskManager")

   // execute program
   env.execute("Flink Scala API Skeleton")
 }

 But I also tried your approach of how to submit jobs to Flink and it
 worked for me as well. Therefore, I guess that there is something wrong
 with your job. What happens in PigStorage().load?

 Cheers,
 Till
 ​

 On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:

  Hey Tim,

 I think my previous mail was intercepted or something similar. However
 you can find my reply below. I already tried a simpler job which just does
 a env.fromElements... but still the same stack.

 How do you normally submit jobs (jars) from within the code?

 Best Regards,
 Philipp


  Forwarded Message
  Subject: Re: Submitting jobs from within Scala code
  Date: Thu, 16 Jul 2015 14:31:01 +0200
  From: Philipp Goetze philipp.goe...@tu-ilmenau.de
  To: user@flink.apache.org

 Hey,

 from the JobManager I do not get any more hints:

 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist   
 - Received message RequestJobCounts at akka://flink/user/archive from 
 Actor[akka://flink/temp/$gc].
 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist   
 - Handled message RequestJobCounts in 0 ms from 
 Actor[akka://flink/temp/$gc].
 13:36:06,674 DEBUG org.eclipse.jetty.util.log
 - RESPONSE /jobsInfo  200
 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager
 - Received message RequestBlobManagerPort at 
 akka://flink/user/jobmanager from 
 Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager
 - Handled message RequestBlobManagerPort in 0 ms from 
 Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection
 - Received PUT request for content addressable BLOB
 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager
 - Received message 
 SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at 
 akka://flink/user/jobmanager from 
 Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
 13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager
 - Running initialization on master for job 
 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager
 - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
 org.apache.flink.runtime.client.JobSubmissionException: The vertex null 
 (null) has no invokable class.
  at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
  at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
  at scala.collection.Iterator$class.foreach(Iterator.scala:743)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at 
 org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
  at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
  at 
 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
  at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply

Re: Flink Kafka example in Scala

2015-07-17 Thread Till Rohrmann
These two links [1, 2] might help to get your job running. The first link
describes how to set up a job using Flink's machine learning library, but
it works also for the flink-connector-kafka library.

Cheers,
Till

[1] http://stackoverflow.com/a/31455068/4815083
[2]
https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution

On Fri, Jul 17, 2015 at 8:42 AM, Aljoscha Krettek aljos...@apache.org
wrote:

 Hi, this looks like the flink-connector-kafka jar is not available where
 the job is running? Did you put it in the library folder of flink on all
 the machines or did you submit it with the job?

 On Thu, Jul 16, 2015, 21:05 Wendong wendong@gmail.com wrote:

 Hi Gyula,

 Cool. I removed .print and the error was gone.

 However, env.execute failed with errors:

 .
 Caused by: java.lang.Exception: Call to registerInputOutput() of invokable
 failed
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
 ...
 Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
 Cannot instantiate user function.
 at

 org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
 ...
 Caused by: java.lang.ClassNotFoundException:
 org.apache.flink.streaming.connectors.kafka.api.KafkaSink
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)


 In the following code:

 val stream = env
   .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
   .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

 Anything wrong? I already did  import
 org.apache.flink.streaming.connectors.kafka.api._. Class
 SimpleStringSchema
 was modified (see previous post).

 Thanks,

 Wendong







Re: Submitting jobs from within Scala code

2015-07-17 Thread Till Rohrmann
This should be rather easy to add with the latest addition of the
ActorGateway and the message decoration.
​

On Fri, Jul 17, 2015 at 5:04 PM, Stephan Ewen se...@apache.org wrote:

 Seems that version mismatches are one of the most common sources of
 issues...

 Maybe we should think about putting a version number into the messages (at
 least between client and JobManager) and fail fast on version mismatches...

 On Thu, Jul 16, 2015 at 5:56 PM, Till Rohrmann trohrm...@apache.org
 wrote:

 Good to hear that your problem is solved :-)

 Cheers,
 Till

 On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:

  Hi Till,

 many thanks for your effort. I finally got it working.

 I'm a bit embarrassed because the issue was solved by using the same
 flink-dist-JAR from the locally running Flink version. So to say I used an
 older Snapshot version for compiling than for running :-[

 Best Regards,
 Philipp


 On 16.07.2015 17:35, Till Rohrmann wrote:

  Hi Philipp,

 To run a Flink program on a cluster from within my IDE, I usually create a
 RemoteExecutionEnvironment. Since I have one UDF (the map function which
 doubles the values) defined, I also need to specify the jar containing this
 class. In my case, the jar is called test-1.0-SNAPSHOT.jar.

 def main(args: Array[String]) {
   // set up the execution environment
   val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
     "target/test-1.0-SNAPSHOT.jar")

   val elements = env.fromElements(1,2,3,4,5)

   val doubled = elements.map(x => 2*x)

   doubled.printOnTaskManager("TaskManager")

   // execute program
   env.execute("Flink Scala API Skeleton")
 }

 But I also tried your approach of how to submit jobs to Flink and it
 worked for me as well. Therefore, I guess that there is something wrong
 with your job. What happens in PigStorage().load?

 Cheers,
 Till
 ​

 On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:

  Hey Tim,

 I think my previous mail was intercepted or something similar. However
 you can find my reply below. I already tried a simpler job which just does
 a env.fromElements... but still the same stack.

 How do you normally submit jobs (jars) from within the code?

 Best Regards,
 Philipp


  Forwarded Message
  Subject: Re: Submitting jobs from within Scala code
  Date: Thu, 16 Jul 2015 14:31:01 +0200
  From: Philipp Goetze philipp.goe...@tu-ilmenau.de
  To: user@flink.apache.org

 Hey,

 from the JobManager I do not get any more hints:

 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist 
   - Received message RequestJobCounts at akka://flink/user/archive 
 from Actor[akka://flink/temp/$gc].
 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist 
   - Handled message RequestJobCounts in 0 ms from 
 Actor[akka://flink/temp/$gc].
 13:36:06,674 DEBUG org.eclipse.jetty.util.log  
   - RESPONSE /jobsInfo  200
 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager  
   - Received message RequestBlobManagerPort at 
 akka://flink/user/jobmanager from 
 Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager  
   - Handled message RequestBlobManagerPort in 0 ms from 
 Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection  
   - Received PUT request for content addressable BLOB
 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager  
   - Received message 
 SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at 
 akka://flink/user/jobmanager from 
 Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
 13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager  
   - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager  
   - Running initialization on master for job 
 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager  
   - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting 
 Query)
 org.apache.flink.runtime.client.JobSubmissionException: The vertex null 
 (null) has no invokable class.
at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
at scala.collection.Iterator$class.foreach(Iterator.scala:743)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72

Re: Flink deadLetters

2015-07-17 Thread Till Rohrmann
That is usually nothing to worry about. This just means that the message
was sent without specifying a sender. What Akka then does is to use the
`/deadLetters` actor as the sender.

What kind of job is it?

Cheers,
Till

On Fri, Jul 17, 2015 at 6:30 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 Hi to all,

 my job seems to be stuck and nothing is logged, even in debug mode.
 The only strange thing is a

 Received message SendHeartbeat at akka://flink/user/taskmanager_1 from
 Actor[akka://flink/deadLetters].

 Could it be a symptom of a problem?

 Best,
 Flavio



Re: Flink Kafka example in Scala

2015-07-20 Thread Till Rohrmann
Hi Wendong,

why do you exclude the kafka dependency from the `flink-connector-kafka`?
Do you want to use your own kafka version?

I'd recommend you to build a fat jar instead of trying to put the right
dependencies in `/lib`. Here [1] you can see how to build a fat jar with
sbt.

Cheers,
Till

[1]
http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea

On Sat, Jul 18, 2015 at 12:40 AM, Wendong wendong@gmail.com wrote:

 Hi Till,

 Thanks for the information. I'm using sbt and I have the following line in
 build.sbt:

 libraryDependencies += "org.apache.flink" % "flink-connector-kafka" %
 "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

 Also, I copied flink-connector-kafka-0.9.0.jar under flink_root_dir/lib/,
 but there is still ClassNotFoundException for KafkaSink.

 I appreciate it if you have any suggestion.

 Wendong






Re: Too few memory segments provided exception

2015-07-20 Thread Till Rohrmann
You can also set taskmanager.memory.fraction from within the IDE by passing
the corresponding configuration object to the LocalEnvironment using the
setConfiguration method. However, taskmanager.heap.mb is basically the -Xmx
value with which you start your JVM. Usually, you can set this in your
program's run settings.
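To get a feeling for how the two settings interact, here is a back-of-the-envelope sketch in plain Java. It is not how Flink computes its memory internally: the 32 KB segment size is an assumption, network buffers and other deductions are ignored, and ManagedMemorySketch with its two helpers is invented for the illustration.

```java
class ManagedMemorySketch {
    // Share of the heap handed to managed memory for a given fraction.
    static long managedBytes(long heapBytes, double fraction) {
        return (long) (heapBytes * fraction);
    }

    // Number of whole memory segments that fit into the managed memory.
    static long segments(long managedBytes, int segmentSizeBytes) {
        return managedBytes / segmentSizeBytes;
    }

    public static void main(String[] args) {
        // Maximum heap of this JVM, i.e. what -Xmx (or the JVM default) allows.
        long heap = Runtime.getRuntime().maxMemory();
        long managed = managedBytes(heap, 0.7);
        System.out.println("max heap bytes:    " + heap);
        System.out.println("managed bytes:     " + managed);
        System.out.println("approx. segments:  " + segments(managed, 32 * 1024));
    }
}
```

The available segments are shared among all memory-consuming operators of a job, so a small IDE heap can run short even when the total looks large.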

Cheers,
Till
​

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels m...@apache.org wrote:

 Hi Shivani,

 Flink doesn't have enough memory to perform a hash join. You need to
 provide Flink with more memory. You can either increase the
 taskmanager.heap.mb config variable or set taskmanager.memory.fraction
 to some value greater than 0.7 and smaller than 1.0. The first config
 variable allocates more overall memory for Flink; the latter changes the
 ratio between Flink managed memory (e.g. for hash join) and user memory
 (for your functions and Gelly's code).

 If you run this inside an IDE, the memory is configured automatically and
 you don't have control over that at the moment. You could, however, start a
 local cluster (./bin/start-local) after you adjusted your flink-conf.yaml
 and run your programs against that configured cluster. You can do that
 either through your IDE using a RemoteEnvironment or by submitting the
 packaged JAR to the local cluster using the command-line tool (./bin/flink).

 Hope that helps.

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge shgha...@gmail.com
 wrote:

 Hello,
  I am working on a problem which implements Adamic Adar Algorithm using
 Gelly.
 I am running into this exception for all the Joins (including the one
 that are part of the reduceOnNeighbors function)

 Too few memory segments provided. Hash Join needs at least 33 memory
 segments.


 The problem persists even when I comment out some of the joins.

 Even after using edg = edg.join(graph.getEdges(),
 JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new
 JoinEdge());

 as suggested by @AndraLungu the problem persists.

 The code is


 DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

 // get the neighbors of each vertex in the HashSet for its value
 computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);

 // get vertices with updated values for the final Graph, which will
 // be used to get the Adamic Adar edges
 Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
     .where(0).equalTo(0).with(new JoinNeighborDegrees());

 Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
     Graph.fromDataSet(Vertices, edges, env);

 // configure Vertex Centric Iteration
 VertexCentricConfiguration parameters = new VertexCentricConfiguration();

 parameters.setName("Find Adamic Adar Edge Weights");

 parameters.setDirection(EdgeDirection.ALL);

 // run Vertex Centric Iteration to get the Adamic Adar edges into
 // the vertex value
 updatedGraph = updatedGraph.runVertexCentricIteration(
     new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);

 // extract the vertices of the updated graph
 DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices =
     updatedGraph.getVertices();

 // extract the list of edges from the vertex values
 DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());

 // partial weights for the edges are added
 edg = edg.groupBy(0,1).reduce(new AdamGroup());

 // graph is updated with the Adamic Adar edges
 edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
     .where(0,1).equalTo(0,1).with(new JoinEdge());

 Any idea how I could tackle this Exception?





Re: Flink Kafka example in Scala

2015-07-21 Thread Till Rohrmann
Glad to hear that it finally worked :-)

On Tue, Jul 21, 2015 at 2:21 AM, Wendong wendong@gmail.com wrote:

 Hi Till,

 Thanks for your suggestion! I did a fat jar and the runtime error of
 ClassNotFoundException was finally gone. I wish I had tried fat jar earlier
 and it would have saved me 4 days.

 Wendong



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2198.html
 Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Flink Data Stream Union

2015-10-21 Thread Till Rohrmann
Can it be that you forgot to call unionMessageStreams in your main method?

Cheers,
Till

On Wed, Oct 21, 2015 at 3:02 PM, flinkuser  wrote:

> Here is the strange behavior.
>
> The code below works on one box but not on the other. I had it working on my
> laptop the whole of yesterday, but strangely today it doesn't work on my
> desktop.
>
> Can anyone please let me know what the issue is?
>
>
> public static void main(String[] args) throws Exception {
>     try {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream msgDataStream1 =
>                 env.addSource(new SocketSource(hostName1, port, '\n', -1))
>                         .filter(new MessageFilter()).setParallelism(1);
>         DataStream msgDataStream2 =
>                 env.addSource(new SocketSource(hostName2, port, '\n', -1))
>                         .filter(new MessageFilter()).setParallelism(1);
>
>         env.execute("Stock stream");
>
>     } catch (Exception e) {
>         System.err.println("Exception  = > " + e.getMessage());
>         e.printStackTrace();
>     }
> }
>
> private static void unionMessageStreams(DataStream msgDataStream1,
>         DataStream msgDataStream2) {
>     try {
>         DataStream ds = msgDataStream1.union(msgDataStream2);
>         ds.print();
>     } catch (Exception e) {
>         System.err.println("Exception in union Message Streams () = > " + e.getMessage());
>     }
> }
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: Zeppelin Integration

2015-10-21 Thread Till Rohrmann
Hi Trevor,

in order to use Zeppelin with a different Flink version in local mode,
meaning that Zeppelin starts a LocalFlinkMiniCluster when executing your
jobs, you have to build Zeppelin and change the flink.version property in
the zeppelin/flink/pom.xml file to the version you want to use.

If you want to let Zeppelin submit jobs to a remote cluster, you should
build Zeppelin with the version of your cluster. That’s because internally
Zeppelin will use this version to construct a JobGraph which is then
submitted to the cluster. In order to configure the remote cluster, you
have to go to the *Interpreter* page and scroll down to the *flink* section.
There you have to specify the address of your cluster under *host* and the
port under *port*. This should then be used to submit jobs to the Flink
cluster.

I hope this answers your question.

Btw: If you want to use Zeppelin with the latest Flink 0.10-SNAPSHOT
version, you should checkout my branch
https://github.com/tillrohrmann/incubator-zeppelin/tree/flink-0.10-SNAPSHOT
where I’ve made the necessary changes.

Cheers,
Till

On Wed, Oct 21, 2015 at 5:00 PM, Trevor Grant 
wrote:

> I'm setting up some Flink/Spark/Zeppelin at work.  Spark+Zeppelin seems to
> be relatively well supported and configurable but the Flink is not so much.
>
> I want Zeppelin to run against my 0.10 build instead of the 0.6 build that
> ships with Zeppelin.  My best guess at the moment on how to accomplish this
> is to create a symbolic link from the /opt/zepplin/flink folder to
> /opt/flink-0.10, but this feels dirty and wrong.
>
> Does anyone out there have any experience connecting Zeppelin to a
> non-prepackaged Flink build?
>
> I feel like there is a great opportunity for a HOWTO write-up if none
> currently exists.
>
> I'm asking on the Zeppelin user mailing list too as soon as I am added.
>
> Thanks for any help
>
> tg
>
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Till Rohrmann
I fear that the filter operations are not chained because there are at
least two of them which have the same DataSet as input. However, it's true
that the intermediate results are not materialized.

It is also correct that the filter operators are deployed colocated with the
data sources. Thus, there is no network traffic. However, the data will
still be serialized/deserialized between the non-chained operators (even if
they reside on the same machine).



On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay  wrote:

> Hello!
>
> > I have thought about a workaround where the InputFormat would return
> > Tuple2s and the first field is the name of the dataset to which a record
> > belongs. This would however require me to filter the read data once for
> > each dataset or to do a groupReduce which is some overhead I'm
> > looking to prevent.
>
> I think that those two filters might not have that much overhead,
> because of several optimizations Flink does under the hood:
> - The dataset of Tuple2s won't be materialized, but instead will be
> streamed directly to the two filter operators.
> - The input format and the two filters will probably end up on the
> same machine, because of chaining, so there won't be
> serialization/deserialization between them.
>
> Best,
> Gabor
>
>
>
> 2015-10-22 11:38 GMT+02:00 Pieter Hameete :
> > Good morning!
> >
> > I have the following usecase:
> >
> > My program reads nested data (in this specific case XML) based on
> > projections (path expressions) of this data. Often multiple paths are
> > projected onto the same input. I would like each path to result in its
> > own dataset.
> >
> > Is it possible to generate more than 1 dataset using a readFile
> > operation to prevent reading the input twice?
> >
> > I have thought about a workaround where the InputFormat would return
> > Tuple2s and the first field is the name of the dataset to which a record
> > belongs. This would however require me to filter the read data once for
> > each dataset or to do a groupReduce which is some overhead I'm
> > looking to prevent.
> >
> > Is there a better (less overhead) workaround for doing this? Or is there
> > some mechanism in Flink that would allow me to do this?
> >
> > Cheers!
> >
> > - Pieter
>


Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Till Rohrmann
If not, could you provide us with the program and test data to reproduce
the error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
wrote:

> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is whether you enabled object-reuse-mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
> > On 22 Oct 2015, at 12:31, Stephan Ewen  wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> with "equals()" ?
> >
> > The key objects will most certainly be different in each record (as they
> are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> wrote:
> > Hello,
> >
> >
> >
> > Trying to understand why my code was giving strange results, I’ve ended
> up adding “useless” controls in my code and came with what seems to me a
> bug. I group my dataset according to a key, but in the reduceGroup function
> I am passed values with different keys.
> >
> >
> >
> > My code has the following pattern (mix of java & pseudo-code in []) :
> >
> >
> >
> > inputDataSet [of InputRecord]
> >   .joinWithTiny(referencesDataSet [of Reference])
> >   .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
> >   .groupBy([PrimaryKeySelector : Tuple2<InputRecord, Reference> -> value.f0.getPrimaryKey()])
> >   .sortGroup([DateKeySelector], Order.ASCENDING)
> >   .reduceGroup(new ReduceFunction<InputRecord, OutputRecord>() {
> >     @Override
> >     public void reduce(Iterable<Tuple2<InputRecord, Reference>> values, Collector out) throws Exception {
> >       // Issue: the values do not all share the same key
> >       final List<Tuple2<InputRecord, Reference>> listValues = new ArrayList<Tuple2<InputRecord, Reference>>();
> >       for (final Tuple2<InputRecord, Reference> value : values) { listValues.add(value); }
> >
> >       final long primkey = listValues.get(0).f0.getPrimaryKey();
> >       for (int i = 1; i < listValues.size(); i++) {
> >         if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >           // => This exception is fired!
> >           throw new IllegalStateException(primkey + " != " + listValues.get(i).f0.getPrimaryKey());
> >         }
> >       }
> >     }
> >   });
> >
> >
> >
> > I use the current 0.10 snapshot. The issue appears in local cluster mode
> unit tests as well as in yarn mode (however it’s ok when I test it with
> very few elements).
> >
> >
> >
> > The sortGroup is not the cause of the problem, as I do get the same
> error without it.
> >
> >
> >
> > Have I misunderstood the grouping concept or is it really an awful bug?
> >
> >
> >
> > Best regards,
> >
> > Arnaud
> >
>
>


Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Till Rohrmann
You don’t modify the objects, however, the ReusingKeyGroupedIterator, which
is the iterator you have in your reduce function, does. Internally it uses
two objects, in your case of type Tuple2<InputRecord, Reference>, to
deserialize the input records. These two objects are alternately returned
when you call next on the iterator. Since you only store references to
these two objects in your ArrayList, you will see any changes made to these
two objects.

However, this only explains why the values of your elements change and not
the key. To understand why you observe different keys in your group you
have to know that the ReusingKeyGroupedIterator does a look ahead to see
whether the next element has the same key value. The look ahead is stored
in one of the two objects. When the iterator detects that the next element
has a new key, then it will finish the iterator. However, you will see
the key value of the next group in half of your elements.

If you want to accumulate input data while using reuse object mode you
should copy the input elements.
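
To make the effect easier to see outside of Flink, here is a minimal plain-Java
sketch. The ReusingGroupIterator below is a simplified, hypothetical stand-in
written only for this illustration; it is not Flink's actual
ReusingKeyGroupedIterator, and the Record class and the input keys are made up.
It only mimics the two behaviours described above: alternating between two
reused objects and reading one element ahead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ObjectReuseDemo {

    /** Mutable record, standing in for a deserialized input element. */
    static final class Record {
        long key;

        Record copy() {
            Record r = new Record();
            r.key = key;
            return r;
        }
    }

    /**
     * Hypothetical group iterator: every element is "deserialized" into one of
     * two reused Record instances, and the next element is read ahead in order
     * to detect the end of the current key group.
     */
    static final class ReusingGroupIterator implements Iterator<Record> {
        private final long[] input;
        private final Record[] reuse = { new Record(), new Record() };
        private final long groupKey;
        private int pos = 0;       // index of the next input element to read
        private Record lookAhead;  // element that was read ahead, or null

        ReusingGroupIterator(long... input) {
            this.input = input;
            this.lookAhead = read();
            this.groupKey = (lookAhead == null) ? 0L : lookAhead.key;
        }

        private Record read() {
            if (pos >= input.length) {
                return null;
            }
            Record target = reuse[pos % 2]; // alternate between the two objects
            target.key = input[pos];
            pos++;
            return target;
        }

        @Override
        public boolean hasNext() {
            return lookAhead != null && lookAhead.key == groupKey;
        }

        @Override
        public Record next() {
            Record current = lookAhead;
            lookAhead = read(); // overwrites the *other* reused object
            return current;
        }
    }

    /** Buffers the whole group in a list and then reports the keys it sees. */
    static List<Long> collectKeys(Iterator<Record> group, boolean copy) {
        List<Record> buffered = new ArrayList<>();
        while (group.hasNext()) {
            Record r = group.next();
            buffered.add(copy ? r.copy() : r);
        }
        List<Long> keys = new ArrayList<>();
        for (Record r : buffered) {
            keys.add(r.key);
        }
        return keys;
    }

    public static void main(String[] args) {
        // Three elements with key 1, followed by the first element of group 2.
        System.out.println(collectKeys(new ReusingGroupIterator(1, 1, 1, 2), false));
        System.out.println(collectKeys(new ReusingGroupIterator(1, 1, 1, 2), true));
    }
}
```

Without the copy, the list holds only references to the two reused objects, one
of which has already been overwritten by the look-ahead with the key of the
next group. With the copy, every buffered element keeps the key it had when it
was returned.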
​

On Thu, Oct 22, 2015 at 1:30 PM, LINZ, Arnaud <al...@bouyguestelecom.fr>
wrote:

> Hi,
>
>
>
> I was using primitive types, and EnableObjectReuse was turned on.  My next
> move was to turn it off, and it did solve the problem.
>
> It also increased execution time by 10%, but it’s hard to say if this
> overhead is due to the copy or to the change of behavior of the reduceGroup
> algorithm once it gets the right data.
>
>
>
> Since I never modify my objects, why isn’t object reuse working?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Thursday, October 22, 2015 12:36
> *To:* user@flink.apache.org
> *Subject:* Re: Multiple keys in reduceGroup ?
>
>
>
> If not, could you provide us with the program and test data to reproduce
> the error?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is whether you enabled object-reuse-mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
>
> > On 22 Oct 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> with "equals()" ?
> >
> > The key objects will most certainly be different in each record (as they
> are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
> > Hello,
> >
> >
> >
> > Trying to understand why my code was giving strange results, I’ve ended
> up adding “useless” controls in my code and came with what seems to me a
> bug. I group my dataset according to a key, but in the reduceGroup function
> I am passed values with different keys.
> >
> >
> >
> > My code has the following pattern (mix of java & pseudo-code in []) :
> >
> >
> >
> > inputDataSet [of InputRecord]
> >
> > .joinWithTiny(referencesDataSet [of Reference])
> >
> > .where([InputRecord SecondaryKeySelector]).equalTo([Reference
> KeySelector])
> >
> >
> > .groupBy([PrimaryKeySelector : Tuple2<InputRecord, Reference> ->
> value.f0.getPrimaryKey()])
> >
> > .sortGroup([DateKeySelector], Order.ASCENDING)
> >
> > .reduceGroup(new ReduceFunction<InputRecord, OutputRecord>() {
> >
> > @Override
> >
> >public void reduce(Iterable< Tuple2<InputRecord, Reference>>
> values,  Collector out) throws Exception {
> >
> >  // Issue : all values do not share the same key
> >
> >   final List<Tuple2<InputRecord, Reference>> listValues = new
> ArrayList<Tuple2<InputRecord, Reference>>();
> >
> >  for (final Tuple2<InputRecord, Reference>value : values) {
> listValues.add(value); }
> >
> >
> >
> > final long primkey = listValues.get(0).f0.getPrimaryKey();
> >
> >for (int i = 1; i < listValues.size(); i++) {
> >
> > if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >
> >   throw new IllegalStateException(primkey + " != " +
> listValues.get(i).f0.getPrimaryKey());
> >
> > => This exception is fired!
> >
> >}
> >

Re: Zeppelin Integration

2015-11-04 Thread Till Rohrmann
Really cool tutorial Trevor :-)

On Wed, Nov 4, 2015 at 3:26 PM, Robert Metzger <rmetz...@apache.org> wrote:

> For those interested, Trevor wrote a blog post describing how to set up
> Spark, Flink and Zeppelin, both locally and on clusters:
> http://trevorgrant.org/2015/11/03/apache-casserole-a-delicious-big-data-recipe-for-the-whole-family/
> Thanks Trevor for the great tutorial!
>
> On Thu, Oct 22, 2015 at 4:23 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Trevor,
>>
>> that’s actually my bad since I only tested my branch against a remote
>> cluster. I fixed the problem (not properly starting the
>> LocalFlinkMiniCluster) so that you can now use Zeppelin also in local
>> mode. Just check out my branch again.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 21, 2015 at 10:00 PM, Trevor Grant <trevor.d.gr...@gmail.com>
>> wrote:
>>
>>> Hey Till,
>>>
>>> I cloned your branch of Zeppelin and while it will compile, it fails tests
>>> on timeout, which incidentally was the same issue I was having when trying
>>> to use Zeppelin.
>>>
>>> Ideas?
>>>
>>>
>>> ---
>>> Test set: org.apache.zeppelin.flink.FlinkInterpreterTest
>>>
>>> ---
>>> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 100.347
>>> sec <<< FAILURE! - in org.apache.zeppelin.flink.FlinkInterpreterTest
>>> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347
>>> sec  <<< ERROR!
>>> java.util.concurrent.TimeoutException: Futures timed out after [10
>>> milliseconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>> at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>> at
>>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.getLeaderIndex(FlinkMiniCluster.scala:171)
>>> at
>>> org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.getLeaderRPCPort(LocalFlinkMiniCluster.scala:132)
>>> at
>>> org.apache.zeppelin.flink.FlinkInterpreter.getPort(FlinkInterpreter.java:136)
>>> at
>>> org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:98)
>>> at
>>> org.apache.zeppelin.flink.FlinkInterpreterTest.setUp(FlinkInterpreterTest.java:42)
>>>
>>> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347
>>> sec  <<< ERROR!
>>> java.lang.NullPointerException: null
>>> at
>>> org.apache.zeppelin.flink.FlinkInterpreter.close(FlinkInterpreter.java:221)
>>> at
>>> org.apache.zeppelin.flink.FlinkInterpreterTest.tearDown(FlinkInterpreterTest.java:48)
>>>
>>>
>>>
>>> Trevor Grant
>>> Data Scientist
>>> https://github.com/rawkintrevo
>>> http://stackexchange.com/users/3002022/rawkintrevo
>>>
>>> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>>>
>>>
>>> On Wed, Oct 21, 2015 at 11:57 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Trevor,
>>>>
>>>> in order to use Zeppelin with a different Flink version in local mode,
>>>> meaning that Zeppelin starts a LocalFlinkMiniCluster when executing
>>>> your jobs, you have to build Zeppelin and change the flink.version
>>>> property in the zeppelin/flink/pom.xml file to the version you want to
>>>> use.
>>>>
>>>> If you want to let Zeppelin submit jobs to a remote cluster, you should
>>>> build Zeppelin with the version of your cluster. That’s because internally
>>>> Zeppelin will use this version to construct a JobGraph which is then
>>>> submitted to the cluster. In order to configure the remote cluster, you
>>>> have to go to the *Interpreter* page and scroll down to the *flink*
>>>> section. There you have to specify the address of your cluster under
>>>> *host* and the port under *port*. This should then be used to submit
>>>> jobs to the Flink cluster.
>>>>
>>>> I hope this answers your question.
>>

Re: Published test artifacts for flink streaming

2015-11-06 Thread Till Rohrmann
No, that is not possible, since you cannot access DataSets from inside UDFs,
and select and where operations are translated into a filter operation on a
DataSet.

On Fri, Nov 6, 2015 at 6:03 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Promising observation, Till. Is it possible to access Table API's
> select and where operators from within such a flatMap?
>
> -n
>
> On Fri, Nov 6, 2015 at 6:19 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
> > Hi Nick,
> >
> > I think a flatMap operation which is instantiated with your list of
> > predicates should do the job. Thus, there shouldn’t be a need to dig
> deeper
> > than the DataStream for the first version.
> >
> > Cheers,
> > Till
> >
> >
> > On Fri, Nov 6, 2015 at 3:58 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
> >>
> >> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
> >> would be great to have some single-jvm example tests for those of us
> >> getting started. Following the example of WindowingIntegrationTest is
> >> mostly working, though reusing my single sink instance with its static
> >> collection results in non-deterministic results; there appears to be a race
> >> between instances clearing the collection in their open method and the
> >> runtime returning the collection to my test harness.
> >>
> >> I'd also appreciate some guidance on stream composition. It's nice to use
> >> the fluent API when exploring data in a shell, but it seems to me like that
> >> API is cumbersome when composing data pipelines of reusable partials. Or
> >> maybe I'm doing it all wrong... Hence the request for more examples :)
> >>
> >> While I'm asking, how might you model this: I have a set of predicates I'd
> >> like to flatMap over a stream. An input item should be compared vs every
> >> predicate (basically, I want a Clojure juxt of predicates over each stream
> >> element). Imagine those predicates expressed as where clauses via the Table
> >> API. Say I have hundreds of thousands of these predicates to run over every
> >> stream event. Is the java client API rich enough to express such a flow, or
> >> should I examine something lower than DataStream?
> >> Thanks a lot, and sorry for all the newb questions.
> >> -n
> >>
> >>
> >> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>> Hey!
> >>>
> >>> There is also a collect() sink in the "flink-streaming-contrib" project,
> >>> see here:
> >>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
> >>>
> >>> It should work well locally for testing. In that case you can write a
> >>> program as usual and use "DataStreamUtils.collect(stream)", so you need
> >>> to stop reading it once you know the stream is exhausted...
> >>>
> >>> Stephan
> >>>
> >>>
> >>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com>
> wrote:
> >>>>
> >>>> Hi Robert,
> >>>>
> >>>> It seems "type" was what I needed. It also looks like the test
> >>>> jar has an undeclared dependency. In the end, the following allowed me
> >>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
> >>>>
> >>>> -n
> >>>>
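> >>>> <dependency>
> >>>>   <groupId>org.apache.flink</groupId>
> >>>>   <artifactId>flink-streaming-core</artifactId>
> >>>>   <version>${flink.version}</version>
> >>>>   <type>test-jar</type>
> >>>>   <scope>test</scope>
> >>>> </dependency>
> >>>> <dependency>
> >>>>   <groupId>org.apache.flink</groupId>
> >>>>   <artifactId>flink-test-utils</artifactId>
> >>>>   <version>${flink.version}</version>
> >>>>   <scope>test</scope>
> >>>> </dependency>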
> >>>>
> >>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org>
> >>>> wrote:
> >>>> > Hi Nick,
> >>>> >
> >>>> > we are usually publishing the test  artifacts. Can you try and
> replace
> >>>> > the
> >>>> >  tag by test-jar:
> >>>> >
> >>>> > 
> >>>> >org.apache.flink
> >>>&

Re: Published test artifacts for flink streaming

2015-11-06 Thread Till Rohrmann
Hi Nick,

I think a flatMap operation which is instantiated with your list of
predicates should do the job. Thus, there shouldn’t be a need to dig deeper
than the DataStream for the first version.
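
As a rough illustration of that idea, here is a plain-Java sketch that does not
use any Flink classes. The Collector interface, the Match record and the
MatchAllPredicates class are hypothetical names made up for this example; in a
real job the function would implement Flink's FlatMapFunction and, like every
user function, would have to be serializable together with its predicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PredicateFlatMapDemo {

    /** Minimal stand-in for a collector; not the real Flink interface. */
    interface Collector<O> {
        void collect(O record);
    }

    /** One output record: which predicate matched which input element. */
    static final class Match<T> {
        final int predicateIndex;
        final T element;

        Match(int predicateIndex, T element) {
            this.predicateIndex = predicateIndex;
            this.element = element;
        }

        @Override
        public String toString() {
            return "p" + predicateIndex + ":" + element;
        }
    }

    /** flatMap-style function that is instantiated with the full predicate list. */
    static final class MatchAllPredicates<T> {
        private final List<Predicate<T>> predicates;

        MatchAllPredicates(List<Predicate<T>> predicates) {
            this.predicates = predicates;
        }

        /** Emits one Match per predicate that accepts the value (zero or more). */
        void flatMap(T value, Collector<Match<T>> out) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    out.collect(new Match<>(i, value));
                }
            }
        }
    }

    /** Applies three example predicates to every input element. */
    static List<String> run(List<Integer> input) {
        List<Predicate<Integer>> predicates = new ArrayList<>();
        predicates.add(x -> x % 2 == 0); // p0: even
        predicates.add(x -> x > 10);     // p1: greater than ten
        predicates.add(x -> x < 0);      // p2: negative

        MatchAllPredicates<Integer> fn = new MatchAllPredicates<>(predicates);
        List<String> out = new ArrayList<>();
        for (Integer value : input) {
            fn.flatMap(value, match -> out.add(match.toString()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        input.add(4);
        input.add(15);
        input.add(-2);
        input.add(7);
        System.out.println(run(input));
    }
}
```

Each input element is tested against every predicate in a single pass, so the
cost per element grows linearly with the number of predicates. With hundreds of
thousands of predicates, some form of indexing or pre-filtering may be worth
considering.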

Cheers,
Till

On Fri, Nov 6, 2015 at 3:58 AM, Nick Dimiduk  wrote:

> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
> would be great to have some single-jvm example tests for those of us
> getting started. Following the example of WindowingIntegrationTest is
> mostly working, though reusing my single sink instance with its static
> collection results in non-deterministic results; there appears to be a race
> between instances clearing the collection in their open method and the
> runtime returning the collection to my test harness.
>
> I'd also appreciate some guidance on stream composition. It's nice to use
> the fluent API when exploring data in a shell, but it seems to me like that
> API is cumbersome when composing data pipelines of reusable partials. Or
> maybe I'm doing it all wrong... Hence the request for more examples :)
>
> While I'm asking, how might you model this: I have a set of predicates I'd
> like to flatMap over a stream. An input item should be compared vs every
> predicate (basically, I want a Clojure juxt of predicates over each stream
> element). Imagine those predicates expressed as where clauses via the Table
> API. Say I have hundreds of thousands of these predicates to run over every
> stream event. Is the java client API rich enough to express such a flow, or
> should I examine something lower than DataStream?
>
> Thanks a lot, and sorry for all the newb questions.
> -n
>
>
> On Thursday, November 5, 2015, Stephan Ewen  wrote:
>
>> Hey!
>>
>> There is also a collect() sink in the "flink-streaming-contrib" project,
>> see here:
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>
>> It should work well locally for testing. In that case you can write a
>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>> stop reading it once you know the stream is exhausted...
>>
>> Stephan
>>
>>
>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk  wrote:
>>
>>> Hi Robert,
>>>
>>> It seems "type" was what I needed. It also looks like the test
>>> jar has an undeclared dependency. In the end, the following allowed me
>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>
>>> -n
>>>
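>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-core</artifactId>
>>>   <version>${flink.version}</version>
>>>   <type>test-jar</type>
>>>   <scope>test</scope>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-test-utils</artifactId>
>>>   <version>${flink.version}</version>
>>>   <scope>test</scope>
>>> </dependency>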
>>>
>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger 
>>> wrote:
>>> > Hi Nick,
>>> >
>>> > we are usually publishing the test  artifacts. Can you try and replace
>>> the
>>> >  tag by test-jar:
>>> >
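>>> > <dependency>
>>> >    <groupId>org.apache.flink</groupId>
>>> >    <artifactId>flink-streaming-core</artifactId>
>>> >    <version>${flink.version}</version>
>>> >    <type>test-jar</type>
>>> >    <scope>test</scope>
>>> > </dependency>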
>>> >
>>> >
>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk 
>>> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>> >> produce an input stream of java objects and sink the results into a
>>> >> collection for verification via JUnit asserts.
>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>> >> how to achieve the latter is not evident based on my internet
>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>> >> which is not published to maven.
>>> >>
>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>> >> let me know if I've got it wrong.
>>> >>
>>> >> Thanks,
>>> >> Nick
>>> >>
>>> >> 
>>> >>   org.apache.flink
>>> >>   flink-streaming-core
>>> >>   ${flink.version}
>>> >>   test
>>> >>   test
>>> >> 
>>> >
>>> >
>>>
>>
>>


Re: data sink stops method

2015-10-15 Thread Till Rohrmann
Could you post a minimal example of your code where the problem is
reproducible? I assume that there has to be another problem because
env.execute should actually trigger the execution.

Cheers,

Till

On Thu, Oct 8, 2015 at 8:58 PM, Florian Heyl  wrote:

> Hey Stephan and Pieter,
> That is what I thought as well, so I simply changed the code like this:
>
> original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
> env.execute()
>
> But it still does not execute the two commands.
> Thank you for your time.
>
> Flo
>
>
> On 08.10.2015 at 17:41, Stephan Ewen wrote:
>
> Yes, sinks in Flink are lazy and do not trigger execution automatically.
> We made this choice to allow multiple concurrent sinks (splitting the
> streams and writing to many outputs concurrently). That requires explicit
> execution triggers (env.execute()).
>
> The exceptions are, as mentioned, the "eager" methods "collect()",
> "count()" and "print()". They need to be eager, because the driver program
> needs for example the "count()" value before it can possibly progress...
>
> Stephan
>
>
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete  wrote:
>
>> Hi Florian,
>>
>> I believe that when you call *JoinPredictionAndOriginal.collect* the
>> environment will execute your program up until that point. The Csv writes
>> are after this point, so in order to execute these steps I think you would
>> have to call *<env>.execute()* after the Csv writes to trigger the
>> execution (where <env> is the name of the variable pointing to your
>> ExecutionEnvironment).
>>
>> I hope this helps :-)
>>
>> - Pieter
>>
>> 2015-10-08 14:54 GMT+02:00 Florian Heyl :
>>
>>> Hi,
>>> I need some help to figure out why one method of mine in a pipeline
>>> stops the execution on the hdfs.
>>> I am working with the 10.0-SNAPSHOT and the code is the following (see
>>> below). The method stops on the hdfs by calling the collect method (
>>> JoinPredictionAndOriginal.collect) creating a data sink, which is why
>>> the program stops before the two output files at the ends can be created.
>>> What am I missing?
>>> Thank you for your time.
>>>
>>> Best wishes,
>>> Flo
>>>
>>> // method calculates the prediction error
>>> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>>>     outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {
>>>
>>>   var iter = 0
>>>
>>>   val transformPred = predictions
>>>     .map { tuple =>
>>>       iter = iter + 1
>>>       LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>>>     }
>>>
>>>   iter = 0
>>>
>>>   val tranformOrg = original
>>>     .map { tuple =>
>>>       iter = iter + 1
>>>       LabeledVector(iter, DenseVector(tuple.label))
>>>     }
>>>
>>>
>>>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
>>>     (l, r) => (l.vector.head._2, r.vector.head._2)
>>>   }
>>>
>>>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>>>
>>>   val N = list_JoinPredictionAndOriginal.length
>>>
>>>   val residualSum = list_JoinPredictionAndOriginal.map {
>>>     num => pow((num._1 - num._2), 2)
>>>   }.sum
>>>
>>>   val predictionError = sqrt(residualSum / N)
>>>
>>>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>>>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>>>
>>>   (predictions,predictionError)
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
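
The lazy-sink behaviour Stephan describes in this thread can be illustrated with a small toy model in plain Java. This is only a sketch: LazySinkSketch, writeAs and collect are hypothetical names and not Flink's API, and the model assumes that an eager call simply runs whatever sinks have been registered up to that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of lazy sinks with an explicit trigger (hypothetical, not Flink API). */
final class LazySinkSketch {
    private final List<Runnable> pendingSinks = new ArrayList<>();
    final List<String> written = new ArrayList<>();

    /** Lazy: only records the sink, nothing is written yet. */
    void writeAs(String path, Supplier<String> data) {
        pendingSinks.add(() -> written.add(path + ":" + data.get()));
    }

    /** Explicit trigger: runs every sink registered since the last trigger. */
    void execute() {
        for (Runnable sink : pendingSinks) {
            sink.run();
        }
        pendingSinks.clear();
    }

    /** Eager: triggers execution right away because the caller needs the value (model assumption). */
    List<String> collect(List<String> data) {
        execute();
        return new ArrayList<>(data);
    }

    public static void main(String[] args) {
        LazySinkSketch env = new LazySinkSketch();
        env.collect(new ArrayList<>());      // eager call happens first
        env.writeAs("out1", () -> "a");      // registered after the eager call
        env.writeAs("out2", () -> "b");
        System.out.println(env.written);     // still empty: no trigger since registration
        env.execute();                       // one trigger runs both sinks
        System.out.println(env.written);
    }
}
```

In the model, sinks registered after the eager call stay pending until execute() is called once more, which mirrors the situation in the original program where the Csv writes come after collect.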


Re: reduce error

2015-10-20 Thread Till Rohrmann
Hi Michele, I will look into the problem. As Ufuk said, it would be really
helpful, if you could provide us with the data set. If it's problematic to
share the data via the mailing list, then you could also send me the data
privately.

Thanks a lot for your help.

Cheers,
Till

On Fri, Oct 16, 2015 at 9:59 PM, Ufuk Celebi  wrote:

> Hey Michele! Sorry for the long delay on this. Can you share the code and
> data? The easiest thing to reproduce is to actually run the code with the
> data. It will also help a lot when trying to fix the problem. If you can't
> share publicly feel free to mail me at uce at apache org.
>
> – Ufuk
>
> On Thu, Oct 8, 2015 at 9:53 AM, Michele Bertoni <
> michele1.bert...@mail.polimi.it> wrote:
>
>> Hi everybody, I am facing a very strange problem.
>> Sometimes when I run my program, one line of the result is totally wrong,
>> and if I repeat the execution with the same input the result can change.
>>
>>
>> The algorithm takes two datasets and executes something like a left outer
>> join where, if there is a match,
>>  - it increments a counter on the left tuple
>>  - it adds some values of the right tuple to an array of the left tuple
>> In the end it outputs the left value with the final counter and the list of
>> values (printer lines A and B).
>>
>> Then there is a group reduce phase that merges elements from different
>> groups (there may be some, but not always).
>>
>> Finally there is a map that applies some custom functions (printer lines
>> C and D).
>>
>>
>>
>>
>> My problem is that between printers B and C the data actually changes for no
>> reason.
>>
>> For example, in line B the counter is 7 and in line C it is 1.82E9; it always
>> goes to very high numbers, from E9 to E18, while it is expected to be smaller
>> than 20.
>> Moreover, the array of extra data becomes null.
>>
>>
>> What I found is that
>> it never happens if the reduce is actually executed (when there are many
>> elements in a group), but only when there is one element per group (so the
>> reduce should not change the data in a tuple).
>> I am running on a small dataset of 60k lines, and if it happens, it always
>> happens on the same line of the input.
>> If I change the dataset it happens on another line, but always on the same
>> line for that dataset.
>>
>> So far it has always happened on only one line for each dataset (I am using a
>> local environment with parallelism 1).
>>
>>
>>
>>
>> I tried replacing the reduce with a reduceGroup and, inside the
>> reduceGroup, a pure Scala reduce; this has worked perfectly so far.
>> I also added two groupBy and reduceGroup blocks (that take the line and put
>> it in the output) before and after the groupBy-reduce; in both cases I faced
>> no problem at all.
>> It only seems to appear when using the groupBy and reduce.
>>
>> Any idea what the problem could be? I am going mad over it.
>>
>>
>> Thanks for the help.
>>
>>
>>
>> this is the code I use (there are a lot of println for debugging)
>>
>> groupedRef // expID, bin, chromosome
>>   .coGroup(groupedExp).where(1,3,5).equalTo(2,1,3){
>>   (references : Iterator[(Long, Long, Int, Int, Long, String, Long, Long, Char, Array[GValue], Long)],
>>    experiments : Iterator[(Int, Int, Long, String, Long, Long, Char, Array[GValue])],
>>    out : Collector[(Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long)]) => {
>>     val refCollected : List[PartialResult] = references.map((r) => (new PartialResult(r, 0, extraData ))).toList
>>     for(e <- experiments){
>>       for(r <- refCollected){
>>         if( /* complex clause not important */ ) {
>>           r.count += 1
>>           r.extra = r.extra :+ e._8.foldLeft(new Array[List[GValue]](0))((z : Array[List[GValue]], v : GValue) => z :+ List(v))
>>         }
>>       }
>>     }
>>
>>     refCollected.map((pr) => {
>>
>>       if(pr.binnedRegion._1.equals(7773719163500064339L) && pr.binnedRegion._7.equals(4860L) && pr.binnedRegion._8.equals(5324L)){
>>         println("-A--- " + pr.toString())
>>       }
>>       val res = (pr.binnedRegion._1, pr.binnedRegion._6, pr.binnedRegion._7,
>>         pr.binnedRegion._8, pr.binnedRegion._9, pr.binnedRegion._10,
>>         pr.extra.reduce((a,b) => a.zip(b).map((p) => p._1 ++ p._2)), pr.count,
>>         pr.binnedRegion._11)
>>
>>       if(res._1.equals(7773719163500064339L) && res._3.equals(4860L) && res._4.equals(5324L)){
>>         println("-B--- " + (res._1, res._2, res._3, res._4, res._5,
>>           res._6.mkString((" - ")), res._7.mkString(" - "), res._8, res._9 ))
>>       }
>>
>>       out.collect(res)
>>     })
>>
>>   }
>> }
>>
>> val aggregationResult : DataSet[(Long, String, Long, Long, Char, 
>> Array[GValue])] =
>> coGroupResult
>>
>> //-
>>
>> .groupBy(8)
>> //reduce phase
>> //concatenation of extra data
>> .reduce(
>>   (r1,r2) => {
>> val out = (r1._1, r1._2, r1._3, r1._4, r1._5, r1._6,
>>   r1._7
>> 

Re: kryo exception due to race condition

2015-10-06 Thread Till Rohrmann
Hi Stefano,

we'll definitely look into it once Flink Forward is over and we've finished
the current release work. Thanks for reporting the issue.

Cheers,
Till

On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli  wrote:

> Hi guys, I managed to complete the process by crossing byte arrays that I
> deserialize within the group function. However, I think this workaround is
> feasible only for relatively simple processes. Is there any idea or plan to
> fix the serialization problem?
>
> saluti,
> Stefano
>
> 2015-10-02 12:05 GMT+02:00 Stefano Bortoli :
>
>> I don't know whether it is the same issue, but after switching from my
>> POJOs to BSONObject I have got a race condition issue with kryo
>> serialization.
>> I could complete the process using the byte[], but at this point I
>> actually need the POJO. I truly believe it is related to the reuse of the
>> Kryo instance, which is not thread safe.
>>
>>
>> --
>> 2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
>> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
>> to FAILED
>> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
>> at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>> at java.util.ArrayList.get(ArrayList.java:411)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>> at
>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>> at
>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>> at
>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>> at
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>> at
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli :
>>
>>> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen :
>>>
 This looks to me like a bug where type registrations are not properly
 forwarded to all Serializers.

 Can you open a JIRA ticket for this?

 On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli 
 wrote:

> Hi guys,
>
> I hit a Kryo exception while running a process 'crossing' POJOs
> datasets. I am using the 0.10-milestone-1.
> Checking the serializer:
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>
> I have noticed that the Kryo instance is reused across serialization
> calls (e.g. line 187). However, Kryo is not thread safe, and therefore I
> think it may cause the problem due to a possible race condition. We had
> these types of issues solved with a KryoFactory implementing a pool. Perhaps it
> should just be a matter of calling the
>
> what should I do? Open a ticket?
>
> Thanks a lot guys for the great job!
>
> saluti,
> Stefano
>
> -
> com.esotericsoftware.kryo.KryoException: Encountered unregistered
> class ID: 114
> 
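
Stefano's point about sharing an instance that is not thread safe can be sketched in plain Java. The thread does not establish that a race is the actual cause here (Stephan suspects type registrations that are not forwarded), so this only illustrates the general "one instance per thread" idea behind a pool. UnsafeSerializer is a hypothetical stand-in and not Kryo.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a serializer with shared mutable state (not Kryo). */
final class UnsafeSerializer {
    private final StringBuilder scratch = new StringBuilder(); // not safe to share across threads

    String encode(String value) {
        scratch.setLength(0);
        scratch.append('[').append(value).append(']');
        return scratch.toString();
    }
}

final class PerThreadSerializerSketch {
    // One instance per thread, created lazily on first use in that thread.
    private static final ThreadLocal<UnsafeSerializer> LOCAL =
            ThreadLocal.withInitial(UnsafeSerializer::new);

    static UnsafeSerializer current() {
        return LOCAL.get();
    }

    /** Starts the given number of threads and counts how many distinct instances they used. */
    static int distinctInstances(int threads) {
        Set<UnsafeSerializer> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                UnsafeSerializer s = current();
                s.encode("x");
                seen.add(s);
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances(4));
        System.out.println(current().encode("poi"));
    }
}
```

Each worker thread gets its own instance, so no two threads ever touch the same scratch buffer, while repeated calls within one thread reuse the same object.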

Re: Multiple restarts of Local Cluster

2015-09-02 Thread Till Rohrmann
Oh sorry, then I got the wrong context. I somehow thought it was about test
cases because I read `MultipleProgramsTestBase` etc. Sorry, my bad.

On Wed, Sep 2, 2015 at 6:00 PM, Sachin Goel <sachingoel0...@gmail.com>
wrote:

> I was under the impression that the @AfterClass annotation can only be
> used in test classes.
> Even so, the idea is that a user program running in the IDE should not be
> starting up the cluster several times [my primary concern is the addition
> of the persist operator], and we certainly cannot ask the user to terminate
> the cluster after execution, while in local mode.
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Wed, Sep 2, 2015 at 9:19 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Why is it not possible to shut down the local cluster? Can’t you shut it
>> down in the @AfterClass method?
>>
>> On Wed, Sep 2, 2015 at 4:56 PM, Sachin Goel <sachingoel0...@gmail.com>
>> wrote:
>>
>>> Yes. That will work too. However, then it isn't possible to shut down
>>> the local cluster. [Is it necessary to do so or does it shut down
>>> automatically when the program exits? I'm not entirely sure.]
>>>
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>>
>>> On Wed, Sep 2, 2015 at 7:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Have a look at some other tests, like the checkpointing tests. They
>>>> start one cluster manually and keep it running. They connect against it
>>>> using the remote environment ("localhost",
>>>> miniCluster.getJobManagerRpcPort()).
>>>>
>>>> That works nicely...
>>>>
>>>> On Wed, Sep 2, 2015 at 4:23 PM, Sachin Goel <sachingoel0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all
>>>>> While using LocalEnvironment, in case the program triggers execution
>>>>> several times, the {{LocalFlinkMiniCluster}} is started as many times.
>>>>> This can consume a lot of time in setting up and tearing down the cluster.
>>>>> Further, this interferes with a new functionality I'm working on based on
>>>>> persisted results.
>>>>> One potential solution could be to follow the methodology in
>>>>> `MultipleProgramsTestBase`. The user code then would have to reside in a
>>>>> fixed function name, instead of the main method. Or is that too
>>>>> cumbersome?
>>>>>
>>>>> Regards
>>>>> Sachin
>>>>> -- Sachin Goel
>>>>> Computer Science, IIT Delhi
>>>>> m. +91-9871457685
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Multiple restarts of Local Cluster

2015-09-02 Thread Till Rohrmann
Why is it not possible to shut down the local cluster? Can’t you shut it
down in the @AfterClass method?

On Wed, Sep 2, 2015 at 4:56 PM, Sachin Goel 
wrote:

> Yes. That will work too. However, then it isn't possible to shut down the
> local cluster. [Is it necessary to do so or does it shut down automatically
> when the program exits? I'm not entirely sure.]
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Wed, Sep 2, 2015 at 7:59 PM, Stephan Ewen  wrote:
>
>> Have a look at some other tests, like the checkpointing tests. They start
>> one cluster manually and keep it running. They connect against it using the
>> remote environment ("localhost", miniCluster.getJobManagerRpcPort()).
>>
>> That works nicely...
>>
>> On Wed, Sep 2, 2015 at 4:23 PM, Sachin Goel 
>> wrote:
>>
>>> Hi all
>>> While using LocalEnvironment, in case the program triggers execution
>>> several times, the {{LocalFlinkMiniCluster}} is started as many times. This
>>> can consume a lot of time in setting up and tearing down the cluster.
>>> Further, this interferes with a new functionality I'm working on based on
>>> persisted results.
>>> One potential solution could be to follow the methodology in
>>> `MultipleProgramsTestBase`. The user code then would have to reside in a
>>> fixed function name, instead of the main method. Or is that too cumbersome?
>>>
>>> Regards
>>> Sachin
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>>
>>
>>
>


Re: Multiple restarts of Local Cluster

2015-09-02 Thread Till Rohrmann
Maybe we can create a single PlanExecutor for the LocalEnvironment which is
used when calling execute. This of course entails that we don’t call stop
on the LocalCluster. For cases where the program exits after calling
execute, this should be fine because all resources will then be released
anyway. It might matter for the test execution where maven reuses the JVMs
and where the LocalFlinkMiniCluster won’t be garbage collected right away.
You could try it out and see what happens.
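
The idea of keeping one executor around instead of starting a cluster per execute call could be sketched roughly as follows. This is a toy model under stated assumptions: SharedExecutorSketch and MiniCluster are hypothetical names and do not correspond to Flink's LocalEnvironment, PlanExecutor or LocalFlinkMiniCluster classes.

```java
/** Toy model of reusing one executor across several execute() calls (hypothetical names). */
final class SharedExecutorSketch {
    /** Hypothetical stand-in for a cluster that is expensive to start. */
    static final class MiniCluster {
        String run(String job) {
            return "ran " + job;
        }
    }

    private MiniCluster cluster;   // created on the first execute(), then reused
    int clusterStarts = 0;         // how often a cluster had to be started

    String execute(String job) {
        if (cluster == null) {
            cluster = new MiniCluster();
            clusterStarts++;
        }
        return cluster.run(job);
    }

    public static void main(String[] args) {
        SharedExecutorSketch env = new SharedExecutorSketch();
        env.execute("job-1");
        env.execute("job-2");
        env.execute("job-3");
        System.out.println(env.clusterStarts);
    }
}
```

The sketch never stops the cluster, which matches the trade-off described above: resources are only released when the process ends.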

Cheers,
Till

On Wed, Sep 2, 2015 at 6:03 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Oh sorry, then I got the wrong context. I somehow thought it was about
> test cases because I read `MultipleProgramsTestBase` etc. Sorry, my bad.
>
> On Wed, Sep 2, 2015 at 6:00 PM, Sachin Goel <sachingoel0...@gmail.com>
> wrote:
>
>> I was under the impression that the @AfterClass annotation can only be
>> used in test classes.
>> Even so, the idea is that a user program running in the IDE should not be
>> starting up the cluster several times [my primary concern is the addition
>> of the persist operator], and we certainly cannot ask the user to terminate
>> the cluster after execution, while in local mode.
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>>
>> On Wed, Sep 2, 2015 at 9:19 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Why is it not possible to shut down the local cluster? Can’t you shut it
>>> down in the @AfterClass method?
>>>
>>> On Wed, Sep 2, 2015 at 4:56 PM, Sachin Goel <sachingoel0...@gmail.com>
>>> wrote:
>>>
>>>> Yes. That will work too. However, then it isn't possible to shut down
>>>> the local cluster. [Is it necessary to do so or does it shut down
>>>> automatically when the program exits? I'm not entirely sure.]
>>>>
>>>> -- Sachin Goel
>>>> Computer Science, IIT Delhi
>>>> m. +91-9871457685
>>>>
>>>> On Wed, Sep 2, 2015 at 7:59 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Have a look at some other tests, like the checkpointing tests. They
>>>>> start one cluster manually and keep it running. They connect against it
>>>>> using the remote environment ("localhost",
>>>>> miniCluster.getJobManagerRpcPort()).
>>>>>
>>>>> That works nicely...
>>>>>
>>>>> On Wed, Sep 2, 2015 at 4:23 PM, Sachin Goel <sachingoel0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all
>>>>>> While using LocalEnvironment, in case the program triggers execution
>>>>>> several times, the {{LocalFlinkMiniCluster}} is started as many times.
>>>>>> This can consume a lot of time in setting up and tearing down the cluster.
>>>>>> Further, this interferes with a new functionality I'm working on based on
>>>>>> persisted results.
>>>>>> One potential solution could be to follow the methodology in
>>>>>> `MultipleProgramsTestBase`. The user code then would have to reside in a
>>>>>> fixed function name, instead of the main method. Or is that too
>>>>>> cumbersome?
>>>>>>
>>>>>> Regards
>>>>>> Sachin
>>>>>> -- Sachin Goel
>>>>>> Computer Science, IIT Delhi
>>>>>> m. +91-9871457685
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Broadcasting sets in Flink Streaming

2015-08-25 Thread Till Rohrmann
Hi Tamara,

I think this is not officially supported by Flink yet. However, I think
that Gyula once had an example where he did something comparable. Maybe he
can chime in here.

Cheers,
Till

On Tue, Aug 25, 2015 at 11:15 AM, Tamara Mendt tammyme...@gmail.com wrote:

 Hello,

 I have been trying to use the function withBroadcastSet on a
 SingleOutputStreamOperator (map) the same way I would on a MapOperator for
 a DataSet. From what I see, this cannot be done. I wonder if there is some
 way to broadcast a DataSet to the tasks that are performing transformations
 on a DataStream?

 I am basically pre-calculating some things with Flink which I later need
 for the transformations on the incoming data from the stream. So I want to
 broadcast the resulting datasets from the pre-calculations.

 Any ideas on how to best approach this?

 Thanks, cheers

 Tamara.



Re: Flink HA mode

2015-09-09 Thread Till Rohrmann
The only necessary information for the JobManager and TaskManager is to
know where to find the ZooKeeper quorum to do leader election and retrieve
the leader address from. This will be configured via the config parameter
`ha.zookeeper.quorum`.

On Wed, Sep 9, 2015 at 10:15 AM, Stephan Ewen  wrote:

> TL;DR is that you are right, it is only the initial list. If a JobManager
> comes back with a new IP address, it will be available.
>
> On Wed, Sep 9, 2015 at 8:35 AM, Ufuk Celebi  wrote:
>
>>
>> > On 09 Sep 2015, at 04:48, Emmanuel  wrote:
>> >
>> > my question is: how critical is the bootstrap ip list in masters?
>>
>> Hey Emmanuel,
>>
>> good questions. I read over the docs for this again [1] and you are right
>> that we should make this clearer.
>>
>> The “masters” file is only relevant for the start/stop cluster scripts
>> (Flink standalone mode).
>>
>> If you specify hosts in the “masters” file the start-cluster scripts will
>> use these hosts to start job managers. After that all coordination happens
>> via ZooKeeper via a leader election and retrieval service. All job managers
>> elect a single leader and task managers and clients (submitting programs)
>> retrieve this leader via ZooKeeper. If a job manager fails and becomes
>> available again, it will publish itself via this mechanism (if it becomes
>> leader at some point again). There was a recent PR [2] which introduced
>> this. You can read over the very good PR description for more info for now.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html
>>
>> [2] https://github.com/apache/flink/pull/1016
>>
>>
>> > does this get updated or does it have to be updated by some other
>> > service?
>>
>> If you start a new cluster on GCE with different hosts and use Flink’s
>> standalone mode you have to set this up again. This is the same for the
>> “slaves” file.
>>
>>
>> Does this answer your question? If anything is unclear, please post here.
>> :)
>>
>> – Ufuk
>>
>>
>


Re: Issue with parallelism when CoGrouping custom nested data type

2015-09-16 Thread Till Rohrmann
Hi Pieter,

your code doesn't look suspicious at first glance. Would it be possible
for you to post a complete example with data (also possible to include it
in the code) to reproduce your problem?

Cheers,
Till

On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete  wrote:

> Dear fellow Flinkers,
>
> I am implementing queries from the XMark (
> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom
> nested data type. Reading the XML data generated by the XMark generator
> into my custom nested datatype works perfectly, and the queries that I have
> implemented so far using mostly map, reduce and filter produce correct
> results.
>
> For the next query I wish to cogroup a dataset containing person data with
> a dataset containing auction data, joined by the *personid* of the person
> and the *personid* of the buyer of an auction, so that I can count the
> number of purchases of a person. I select this *personid* as key from the
> custom nested data type in the *where* and *equalTo* functions of the
> *coGroup*. The XML2DawnInputFormat is my custom input format that reads
> XML into my custom nested datatype *DawnData*. The 'inputGraph' and
> 'auctionInput' are a projection on the XML input to prevent reading
> unnecessary data.
>
> def env = ExecutionEnvironment.getExecutionEnvironment
> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def result = persons.coGroup(auctions)
>   .where(person => { person.select("2/@id/2") })
>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>     // my cogroup function here that outputs the name of the person and the number of auctions
>   }).setParallelism(1)
>
> This code works fine with parallelism set to 1 as above. My issue is that
> if I raise the parallelism of the coGroup above 1 the data will get mixed
> up. Often the auctions Iterator will be empty, and sometimes there are
> non-empty auction iterators passed to the cogroup function where the
> persons iterator is empty, but this is impossible because all buyers exist
> in the persons database!
>
> If anyone has some pointers for me why this code starts producing strange
> results when parallelism is set above 1 this would be greatly appreciated
> :-)
>
> Kind regards.
>
> Pieter Hameete
>
>
>


Re: data flow example on cluster

2015-09-30 Thread Till Rohrmann
It's described here:
https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html#run-example

Cheers,
Till

On Wed, Sep 30, 2015 at 8:24 AM, Lydia Ickler 
wrote:

> Hi all,
>
> I want to run the data-flow Wordcount example on a Flink Cluster.
> The local execution with "mvn exec:exec -Dinput=kinglear.txt
> -Doutput=wordcounts.txt" is already working.
> What is the command to execute it on the cluster?
>
> Best regards,
> Lydia
>


Re: Flink program compiled with Janino fails

2015-10-05 Thread Till Rohrmann
I’m not a Janino expert but it might be related to the fact that Janino does
not fully support generic types (see http://unkrig.de/w/Janino under
limitations). Maybe it works if you use the untyped MapFunction type.
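
As an illustration of what "untyped" means here, the following plain-Java sketch implements a generic interface without type arguments, so the implemented method takes and returns Object and the cast is written by hand. Mapper is a hypothetical stand-in interface, not Flink's MapFunction, and whether this resolves the Janino error is an assumption that would need to be tried.

```java
/** Hypothetical stand-in for a generic single-method interface (not Flink's MapFunction). */
interface Mapper<T, O> {
    O map(T value);
}

final class RawTypeSketch {
    /** Raw implementation: no type arguments, so the method signature is map(Object). */
    @SuppressWarnings("rawtypes")
    static Mapper rawLength() {
        return new Mapper() {
            @Override
            public Object map(Object value) {
                // The explicit cast takes over the job of the type parameter.
                return ((String) value).length();
            }
        };
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        Mapper m = rawLength();
        System.out.println(m.map("rain"));
    }
}
```

The price of the raw form is that type errors only show up at run time as a ClassCastException, so it is best kept to the generated code that has to go through the restricted compiler.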

Cheers,
Till

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari 
wrote:

> Hi guys,
> I'm developing a dynamic Flink program composer, which receives a dataflow
> from a client and converts it into Flink code.
>
> I have tried to compile a test Flink program with Janino, but it fails,
> the error I receive is:
> org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
> Non-abstract class "FlinkExecutor$1" must implement method "public abstract
> java.lang.Object
> org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
> throws java.lang.Exception"
>
> It seems Janino doesn't recognize the MapFunction.
>
> If I put this code into a Java file and execute it with Eclipse,
> everything works fine.
>
> Here is the code I used:
>
> package Test;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import com.Flink.Operators.Source;
>
> public class FlinkExecutor {
> public static class RainPOJO {
> private String altitude;
> private String city_name;
> private String latitude;
> private String longitude;
> private String rainfall;
> private String station_name;
> private String time;
> public String getAltitude() {
> return altitude;
> }
> public void setAltitude(String Altitude) {
> this.altitude = Altitude;
> }
> public String getCity_name() {
> return city_name;
> }
> public void setCity_name(String City_name) {
> this.city_name = City_name;
> }
> public String getLatitude() {
> return latitude;
> }
> public void setLatitude(String Latitude) {
> this.latitude = Latitude;
> }
> public String getLongitude() {
> return longitude;
> }
> public void setLongitude(String Longitude) {
> this.longitude = Longitude;
> }
> public String getRainfall() {
> return rainfall;
> }
> public void setRainfall(String Rainfall) {
> this.rainfall = Rainfall;
> }
> public String getStation_name() {
> return station_name;
> }
> public void setStation_name(String Station_name) {
> this.station_name = Station_name;
> }
> public String getTime() {
> return time;
> }
> public void setTime(String Time) {
> this.time = Time;
> }
> }
> public FlinkExecutor() {}
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(1);
> Source Source = new Source("sensor", "rain");
> String path_Source = Source.getCSVPath();
> DataSet < RainPOJO > ds_s1 = env.readCsvFile("file://" + path_Source)
> .ignoreFirstLine()
> .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
> "longitude", "rainfall", "station_name", "time");
> long size = ds_s1.count();
> long startTime = System.currentTimeMillis();
> ds_s1.map(new MapFunction < RainPOJO, String > () {
> int count = 0;
> @Override
> public String map(RainPOJO obj) throws Exception {
> count += 1;
> long endTime = System.currentTimeMillis();
> double elapsed_time = endTime - startTime;
> if (count == size) {
> double d_seconds = elapsed_time / 1000;
> return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + " seconds";
> }
> return " " + count;
> }
> })
> .print();
> }
> }
>


Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Till Rohrmann
Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the
cluster. Therefore, the closures must only contain Serializable objects.
The serializer registration only applies to the data which is processed by
the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo
object in your closure.
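
The effect can be reproduced with plain Java serialization, independent of Flink. In this sketch ColumnMeta is a hypothetical stand-in for a class that does not implement Serializable (it is not Phoenix's ColumnInfo), and SerFunction is an illustrative serializable function type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class ClosureSerializationSketch {
    /** Hypothetical stand-in for a class that does not implement Serializable. */
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    /** A function type that Java serialization is able to ship. */
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    /** Returns true if plain Java serialization accepts the object. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The lambda captures the whole non-serializable object. */
    static SerFunction<String, String> capturingObject() {
        ColumnMeta meta = new ColumnMeta("id");
        return s -> meta.name + "=" + s;
    }

    /** The lambda captures only the String that it actually needs. */
    static SerFunction<String, String> capturingString() {
        String name = new ColumnMeta("id").name;
        return s -> name + "=" + s;
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(capturingObject()));
        System.out.println(isJavaSerializable(capturingString()));
    }
}
```

Pulling the needed values out into serializable locals before the lambda is one way to keep the offending object out of the closure; registering a Kryo serializer does not help in this model, since only Java serialization is involved.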

Cheers,
Till

On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk  wrote:

> Hello,
>
> I've implemented a (streaming) flow using the Java API and Java8 Lambdas
> for various map functions. When I try to run the flow, job submission fails
> because of an unserializable type. This is not a type of data used within
> the flow, but rather a small collection of objects captured in the closure
> context over one of my Lambdas. I've implemented and registered a Kryo
> Serializer for this type with this environment, however, it's apparently
> not used when serializing the lambdas. Seems like the same serialization
> configuration and tools of the environment should be used when preparing
> the job for submission. Am I missing something?
>
> Thanks,
> Nick
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
> at
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
> at ImportFlow.assembleImportFlow(ImportFlow.java:111)
> at ImportFlow.main(ImportFlow.java:178)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> ... 6 more
> Caused by: java.io.NotSerializableException:
> org.apache.phoenix.util.ColumnInfo
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.ArrayList.writeObject(ArrayList.java:762)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
> ... 17 more
>


Re: Including option for starting job and task managers in the foreground

2015-12-02 Thread Till Rohrmann
Hi Brian,

as far as I know this is at the moment not possible with our scripts.
However it should be relatively easy to add by simply executing the Java
command in flink-daemon.sh in the foreground. Do you want to add this?

Cheers,
Till
On Dec 1, 2015 9:40 PM, "Brian Chhun"  wrote:

> Hi All,
>
> Is it possible to include a command line flag for starting job and task
> managers in the foreground? Currently, `bin/jobmanager.sh` and
> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
> things in the background. I'd like to execute these commands inside a
> docker container, but it's expected that the process is running in the
> foreground. I think it might be useful to have it run in the foreground so
> that it can be hooked into some process supervisors. Any suggestions are
> appreciated.
>
>
> Thanks,
> Brian
>


Re: Question about flink message processing guarantee

2015-12-02 Thread Till Rohrmann
Just a small addition: your sources have to be replayable to some extent.
By replayable I mean that they can resume reading from some kind of offset.
Otherwise checkpointing won't help you. The Kafka source supports that,
for example.
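
To illustrate what replayable means here, a minimal sketch in plain Java
(not Flink code; OffsetSource and runWithFailure are hypothetical names made
up for this example). The source exposes the offset of the next record, a
checkpoint stores that offset together with the state, and after a failure a
new source instance resumes reading from the stored offset:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; none of these names are Flink API.
class ReplayableSourceSketch {

    /** A source that can start reading from an arbitrary offset. */
    static class OffsetSource {
        private final List<String> log; // stands in for a durable log, e.g. a Kafka partition
        private int offset;             // position of the next record to emit

        OffsetSource(List<String> log, int startOffset) {
            this.log = log;
            this.offset = startOffset;
        }

        boolean hasNext() { return offset < log.size(); }

        String next() { return log.get(offset++); }

        /** The value a checkpoint would store for this source. */
        int snapshotOffset() { return offset; }
    }

    /** Reads three records, checkpoints, reads one more, "fails", then recovers. */
    static List<String> runWithFailure(List<String> log) {
        List<String> processed = new ArrayList<>();
        OffsetSource source = new OffsetSource(log, 0);
        for (int i = 0; i < 3 && source.hasNext(); i++) {
            processed.add(source.next());
        }

        // Checkpoint: remember the source offset and the state built so far.
        int checkpointedOffset = source.snapshotOffset();
        List<String> stateAtCheckpoint = new ArrayList<>(processed);

        // Work done after the checkpoint is lost when the failure happens.
        if (source.hasNext()) {
            processed.add(source.next());
        }

        // Recovery: restore the state and rewind the source to the stored offset.
        List<String> recovered = new ArrayList<>(stateAtCheckpoint);
        OffsetSource restored = new OffsetSource(log, checkpointedOffset);
        while (restored.hasNext()) {
            recovered.add(restored.next());
        }
        return recovered;
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

A source that cannot rewind like this would have no way to re-emit the
record that was read after the last checkpoint.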

Cheers,
Till
On Dec 1, 2015 11:55 PM, "Márton Balassi"  wrote:

> Dear Jerry,
>
> If you do not enable checkpointing you get the at-most-once processing
> guarantee (some might call that no guarantee at all). When you enable
> checkpointing you can choose between exactly-once and at-least-once
> semantics. The latter provides better latency.
>
> Best,
>
> Marton
>
> On Tue, Dec 1, 2015 at 11:04 PM, Jerry Peng 
> wrote:
>
>> Hello,
>>
>> I have a question regarding Flink streaming. I know that if you enable
>> checkpointing you can have an exactly-once processing guarantee. If I do
>> not enable checkpointing, what are the semantics of the processing? At
>> least once?
>>
>> Best,
>>
>> Jerry
>>
>
>


Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Till Rohrmann
Hi Arnaud,

as long as you don't have HA activated for your batch jobs, HA shouldn't
have an influence on the batch execution. If it interferes, then you should
see additional task managers connected to the streaming cluster when you
execute the batch job. Could you check that? Furthermore, could you check
that a second YARN application is actually started when you run the batch
jobs?

Cheers,
Till

On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> I have both streaming applications & batch applications. Since the memory
> needs are not the same, I was using a long-living container for my
> streaming apps and new short-lived containers for hosting each batch
> execution.
>
>
>
> For that, I submit streaming jobs with "*flink run*"  and batch jobs with
> "*flink run -m yarn-cluster*"
>
>
>
> This was working fine until I turned zookeeper HA mode on for my streaming
> applications.
>
> Even if I don't set it up in the yaml flink configuration file, but with
> -D options on the yarn-session.sh command line, my batch jobs now try to
> run in the streaming container, and fail because of the lack of resources.
>
>
>
> My HA options are :
>
> -Dyarn.application-attempts=10 -Drecovery.mode=zookeeper
> -Drecovery.zookeeper.quorum=h1r1en01:2181
> -Drecovery.zookeeper.path.root=/flink  -Dstate.backend=filesystem
> -Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints
> -Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/
>
>
>
> Am I missing something ?
>
>
>
> Best regards,
>
> Arnaud
>
> --
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: Using memory logging in Flink

2015-12-09 Thread Till Rohrmann
I assume you're looking in the taskmanager log file for the memory usage
logging statements, right?

Cheers,
Till

On Wed, Dec 9, 2015 at 12:15 AM, Filip Łęczycki 
wrote:

> Hi,
>
> Thank you for your reply!
>
> I have made sure I restarted the TaskManager after changing the config, but
> it didn't resolve the issue. The config is loaded, as I can see the
> following line in the log:
> 09:12:2015 00:00:21,894 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.debug.memory.startLogThread, true
>
> I am running a job on local standalone flink instance and my
> log4j.properties look like this:
> log4j.rootLogger=DEBUG, file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{dd:MM:YYY HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
>
> Maybe I can set some property in the job's code that would force
> such verbose logging? I need those logs to run some Flink performance
> tests, but maybe I can somehow extract the benchmark results without them.
> Do you know any other way to monitor a Flink job's memory usage and GC time,
> other than looking at the web interface?
>
> Best regards,
> Filip Łęczycki
>
> 2015-12-08 20:48 GMT+01:00 Stephan Ewen :
>
>> Hi!
>>
>> That is exactly the right way to do it. Logging has to be at least INFO
>> and the parameter "taskmanager.debug.memory.startLogThread" set to true.
>> The log output should be under
>> "org.apache.flink.runtime.taskmanager.TaskManager".
>>
>> Do you see other outputs for that class in the log?
>>
>> Make sure you restarted the TaskManager processes after you changed the
>> config file.
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Tue, Dec 8, 2015 at 6:56 PM, Filip Łęczycki 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to enable logging of memory usage on flink 0.10.0 by adding:
>>>
>>> taskmanager.debug.memory.startLogThread: true
>>>
>>> to config.yaml and setting log4j level to DEBUG. However, in the logs after
>>> running the job I cannot see any info regarding memory usage. My job lasted
>>> 30s so it should catch a few intervals. Should I change something else in the
>>> configuration?
>>>
>>>
>>> Best regards/Pozdrawiam,
>>>
>>> Filip Łęczycki
>>>
>>
>>
>


Re: Problems with using ZipWithIndex

2015-12-14 Thread Till Rohrmann
I just tested the zipWithIndex method with Flink 0.10.1 and it worked. I
used the following code:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

object Job {
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(1,2,3,4,5,6,7)

    val result = input.zipWithIndex

    result.print()
  }
}

Cheers,
Till

On Sat, Dec 12, 2015 at 8:48 PM, Filip Łęczycki 
wrote:

> Hi Marton,
>
> Thank you for your answer. I wasn't able to use zipWithIndex in the way
> you stated, as I got a "cannot resolve" error. However, it worked when I
> used it like this:
>
> val utils = new DataSetUtils[AlignmentRecord](data)
> val index = utils.zipWithIndex
>
> Regards,
> Filip Łęczycki
>
> 2015-12-12 19:56 GMT+01:00 Márton Balassi :
>
>> Hey Filip,
>>
>> As you are using the scala API it is easier to use the Scala DataSet
>> utils, which are accessible after the following import:
>>
>> import org.apache.flink.api.scala.utils._
>>
>> Then you can do the following:
>>
>> val indexed = data.zipWithIndex
>>
>> Best,
>>
>> Marton
>>
>>
>> On Sat, Dec 12, 2015 at 7:48 PM, Filip Łęczycki 
>> wrote:
>>
>>> Hi all,
>>>
>>> I tried to use the zipWithIndex functionality, according to the Scala
>>> examples posted here:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/zip_elements_guide.html
>>>
>>> However, I am not able to call the mentioned function because it cannot
>>> be resolved. I checked the Flink code
>>> for org.apache.flink.api.scala.DataSet and there is no such function. I am
>>> using the latest version, 0.10.1. Was it removed or moved to a different
>>> module? Is there any way to use it?
>>>
>>> When I try to use the function from the DataSetUtils Java module:
>>>
>>> data is of type DataSet[AlignmentRecord]
>>> val indexed = DataSetUtils.zipWithIndex[AlignmentRecord](data)
>>>
>>> I receive following error:
>>> Type mismatch: expected: DataSet[AlignmentRecord], actual:
>>> DataSet[AlignmentRecord]
>>>
>>> Could you please guide me how to use this function?
>>>
>>> Regards,
>>> Filip Łęczycki
>>>
>>
>>
>


Re: Problem to show logs in task managers

2015-12-17 Thread Till Rohrmann
Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory.
It should be automatically included in the Yarn application.

Concerning your logging problem, it might be that you have set the logging
level too high. Could you share the code with us?

Cheers,
Till

On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez  wrote:

> Hi flink community,
>
> I am trying to show log messages using log4j.
> It works fine overall except for the messages I want to show in an inner
> class that implements
> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
> I am very new to this, but it seems that I’m having problems to show the
> messages included in the isConverged function, as it runs in the task
> managers?
> E.g. the log messages in the outer class (before map-reduce operations)
> are properly shown.
>
> I am also interested in providing my own log4j.properties file. I am using
> the ./bin/flink run -m yarn-cluster on Amazon clusters.
>
> Thanks,
> Ana


Re: global watermark across multiple kafka consumers

2015-12-16 Thread Till Rohrmann
Hi Andrew,

as far as I know, there is no prescribed way of handling this kind of
situation. If you want to synchronize the watermark generation
given a set of KafkaConsumers you need some kind of ground truth.

This could be, for example, a central registry such as ZooKeeper in which
you collect the current watermarks of the different consumers. You could
access ZooKeeper from inside the TimestampExtractor.

Alternatively, though a bit more hacky, you could exploit that the
consumer tasks are usually colocated with consumer tasks from different
topics. This means that you'll have multiple subtasks reading from the
different Kafka topics running in the same JVM. You could then use class
variables to synchronize the watermarks. But this assumes that each subtask
reading the topic t from Kafka is colocated with at least one other subtask
reading the topic t' from Kafka with t' in T \ {t} and T being the set of
Kafka topics. Per default this should be the case.
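
A minimal sketch of the class-variable idea in plain Java (not Flink code;
SharedWatermarkSketch, report and globalWatermark are hypothetical names).
It assumes that the global watermark should be the minimum of the per-topic
watermarks, which is the conservative choice, and it only works for subtasks
that really share one JVM:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink.
class SharedWatermarkSketch {

    // Class variable: shared by all subtasks that run in the same JVM.
    private static final Map<String, Long> WATERMARK_PER_TOPIC = new ConcurrentHashMap<>();

    /** Records the latest watermark of one topic; it never moves backwards. */
    static void report(String topic, long watermark) {
        WATERMARK_PER_TOPIC.merge(topic, watermark, Long::max);
    }

    /** Global watermark: the minimum over all topics that have reported so far. */
    static long globalWatermark() {
        return WATERMARK_PER_TOPIC.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        report("topicA", 1000L);
        report("topicB", 700L);
        report("topicA", 1200L);
        // topicB is the slowest topic, so it determines the global watermark.
        System.out.println(globalWatermark());
    }
}
```

Each timestamp extractor would call report(...) with its own topic and emit
globalWatermark() instead of its local value. Subtasks in other JVMs do not
see these updates, which is why the colocation assumption above matters.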

I'm wondering why you need a global watermark for your Kafka topics. Isn't
it enough that you have individual watermarks for each topic?

Cheers,
Till

On Tue, Dec 15, 2015 at 4:45 PM, Griess, Andrew 
wrote:

> Hi guys,
>
> I have a question related to utilizing watermarks with multiple
> FlinkKafkaConsumer082 instances. The aim is to have a global watermark
> across multiple Kafka consumers where any message from any Kafka partition
> would update the same watermark. When testing a simple TimestampExtractor
> implementation it seems each consumer results in a separate watermark. Is
> there a prescribed way of handling such a thing that anyone has any
> experience with?
>
> Thanks for your help,
>
> Andrew Griess
>
>


Re: Scala API and sources with timestamp

2016-01-04 Thread Till Rohrmann
Hi Don,

yes that's exactly how you use an anonymous function as a source function.

Cheers,
Till

On Tue, Dec 22, 2015 at 3:16 PM, Don Frascuchon 
wrote:

> Hello,
>
> Is there a way to define an EventTimeSourceFunction with anonymous
> functions from the Scala API? Like this:
>
> env.addSource[Int] {
>   ctx => {
>     ...
>     ctx.collectWithTimestamp(i, System.currentTimeMillis())
>     ...
>   }
> }
>
> Thanks in advance!
>


Re: flink kafka scala error

2016-01-06 Thread Till Rohrmann
Hi Madhukar,

could you check whether your Flink installation contains the
flink-dist-0.10.1.jar in the lib folder? This file contains the necessary
scala-library.jar which you are missing. You can also remove the exclude
line for org.scala-lang:scala-library, which prevents the
scala-library dependency from being included in the fat jar of your job.

Cheers,
Till

On Wed, Jan 6, 2016 at 5:54 AM, Madhukar Thota 
wrote:

> Hi
>
> I am seeing the following error when I am trying to run the jar in the Flink
> cluster. I am not sure what dependency is missing.
>
>  /opt/DataHUB/flink-0.10.1/bin/flink  run datahub-heka-1.0-SNAPSHOT.jar
> flink.properties
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
> at kafka.utils.Pool.<init>(Pool.scala:28)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
> at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
> at
> kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:691)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:281)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
> at com.lmig.datahub.heka.Main.main(Main.java:39)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
> Caused by: java.lang.ClassNotFoundException:
> scala.collection.GenTraversableOnce$class
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 20 more
>
> The exception above occurred while trying to run your command.
>
>
> Here is my pom.xml:
>
> 
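> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>com.datahub</groupId>
>     <artifactId>datahub-heka</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <version>0.10.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-java</artifactId>
>             <version>0.10.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients</artifactId>
>             <version>0.10.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.10</artifactId>
>             <version>0.8.2.2</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka</artifactId>
>             <version>0.10.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-elasticsearch</artifactId>
>             <version>0.10.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.elasticsearch</groupId>
>             <artifactId>elasticsearch</artifactId>
>             <version>1.7.2</version>
>         </dependency>
>         <dependency>
>             <groupId>org.elasticsearch</groupId>
>             <artifactId>elasticsearch-shield</artifactId>
>             <version>1.3.3</version>
>         </dependency>
>         <dependency>
>             <groupId>org.elasticsearch</groupId>
>             <artifactId>elasticsearch-license-plugin</artifactId>
>             <version>1.0.0</version>
>         </dependency>
>         <dependency>
>             <groupId>com.fasterxml.jackson.core</groupId>
>             <artifactId>jackson-core</artifactId>
>             <version>2.6.4</version>
>         </dependency>
>         <dependency>
>             <groupId>com.fasterxml.jackson.core</groupId>
>             <artifactId>jackson-databind</artifactId>
>             <version>2.6.4</version>
>         </dependency>
>     </dependencies>
>     <repositories>
>         <repository>
>             <id>elasticsearch-releases</id>
>             <url>http://maven.elasticsearch.org/releases</url>
>             <releases>
>                 <enabled>true</enabled>
>             </releases>
>             <snapshots>
>                 <enabled>false</enabled>
>             </snapshots>
>         </repository>
>     </repositories>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-shade-plugin</artifactId>
>                 <version>2.4.1</version>
>                 <executions>
>                     <execution>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>shade</goal>
>                         </goals>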

Re: Problem to show logs in task managers

2015-12-18 Thread Till Rohrmann
In which log file exactly are you looking for the logging statements? And
on what machine? You have to look on the machines on which the YARN
containers were started. Alternatively, if you have log aggregation
activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till

On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:

> Hi Till,
>
> Many thanks for your quick response.
>
> I have modified the WordCountExample to reproduce my problem in a
> simple example.
>
> I run the code below with the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
> mypackage.WordCountExample ../flinklink.jar
>
> And if I check the log file I see all logger messages except the one in
> the flatMap function of the inner LineSplitter class, which is actually the
> one I am most interested in.
>
> Is that an expected behaviour?
>
> Thanks,
> Ana
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
>
> public class WordCountExample {
>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
>
>         logger.info("Entering application.");
>
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>
>         List<Integer> elements = new ArrayList<Integer>();
>         elements.add(0);
>
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>
>         wordCounts.print();
>     }
>
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>
>         static Logger loggerLineSplitter =
>                 LoggerFactory.getLogger(LineSplitter.class);
>
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>
>         static Logger loggerTestClass =
>                 LoggerFactory.getLogger("WordCountExample.TestClass");
>
>         List<Integer> integerList;
>
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>             loggerTestClass.info("Logger in TestClass");
>         }
>     }
> }
>
>
>
>
> On 17 Dec 2015, at 16:08, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Ana,
>
> you can simply modify the `log4j.properties` file in the `conf` directory.
> It should be automatically included in the Yarn application.
>
> Concerning your logging problem, it might be that you have set the logging
> level too high. Could you share the code with us?
>
> Cheers,
> Till
>
> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>
>> Hi flink community,
>>
>> I am trying to show log messages using log4j.
>> It works fine overall except for the messages I want to show in an inner
>> class that implements
>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>> I am very new to this, but it seems that I’m having problems to show the
>> messages included in the isConverged function, as it runs in the task
>> managers?
>> E.g. the log messages in the outer class (before map-reduce operations)
>> are properly shown.
>>
>> I am also interested in providing my own log4j.properties file. I am
>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>
>> Thanks,
>> Ana
>
>
>
>


Re: Configure log4j with XML files

2015-12-21 Thread Till Rohrmann
Hi Gwenhaël,

as far as I know, there is no direct way to do so. You can either adapt the
flink-daemon.sh script in line 68 to use a different configuration or you
can test whether the dynamic property -Dlog4j.configurationFile:CONFIG_FILE
overrides the -Dlog4j.configuration property. You can set the dynamic
property using Flink’s env.java.opts configuration parameter.

Cheers,
Till

On Mon, Dec 21, 2015 at 3:34 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hi everybody,
>
>
>
> Could it be possible to have a way to configure log4j with xml files ?
>
>
>
> I’ve looked into the code and it looks like the properties files names are
> hardcoded. However we have the need to use xml :
>
> -  We log everything into ELK (Elasticsearch / Logstash / Kibana)
> using SocketAppender
>
> -  SocketAppender is synchronous by default and slows down the whole app
> if anything goes wrong with the ELK
>
> -  We usually add an AsyncAppender on top of the SocketAppender,
> but this sort of configuration is only possible using an XML config file…
>
>
>
> We’ve already run into the issue. Everything was almost paused because the
> ELK was overloaded and extremely slow.
>
>
>
> B.R.
>
>
>
> Gwenhaël PASQUIERS
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Till Rohrmann
Hi Welly,

at the moment Flink only supports HA via ZooKeeper. However, nothing
prevents the use of another system. The only requirement is that this system
allows you to reach consensus among multiple participants and to retrieve
the agreed-upon decision. If this is possible, then it can be integrated into
Flink to serve as an alternative HA backend.

Cheers,
Till

On Wed, Nov 25, 2015 at 10:30 AM, Maximilian Michels  wrote:

> Hi Welly,
>
> > However YARN is still tightly coupled to HDFS; doesn't it seem wasteful
> > to use only YARN without Hadoop?
>
> I wouldn't say tightly coupled. You can use YARN without HDFS. To work
> with YARN properly, you would have to setup another distributed file
> system like xtreemfs. Or use the one provided with the AWS or Google
> Cloud Platform. You can tell Hadoop which file system to use by
> modifying "fs.default.name" in the Hadoop config.
>
> Cheers,
> Max
>
> On Wed, Nov 25, 2015 at 10:06 AM, Welly Tambunan 
> wrote:
> > Hi Fabian,
> >
> > Interesting !
> >
> > However YARN is still tightly coupled to HDFS; doesn't it seem wasteful
> > to use only YARN without Hadoop?
> >
> > Currently we are using Cassandra and CFS ( cass file system )
> >
> >
> > Cheers
> >
> > On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske 
> wrote:
> >>
> >> A strong argument for YARN mode can be the isolation of multiple users
> >> and jobs. You can easily start a new Flink cluster for each job or user.
> >> However, this comes at the price of resource (memory) fragmentation.
> >> YARN mode does not use memory as effectively as cluster mode.
> >>
> >> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi :
> >>>
> >>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> >>> >
> >>> > Hi All,
> >>> >
> >>> > I would like to know if there any feature differences between using
> >>> > Standalone Cluster vs YARN ?
> >>> >
> >>> > Until now we are using Standalone cluster for our jobs.
> >>> > Is there any added value for using YARN ?
> >>> >
> >>> > We don't have any hadoop infrastructure in place right now but we can
> >>> > provide that if there's some value to that.
> >>>
> >>> There are no features, which only work on YARN or in standalone
> clusters.
> >>> YARN mode is essentially starting a standalone cluster in YARN
> containers.
> >>>
> >>> In failure cases I find YARN more convenient, because it takes care of
> >>> restarting failed task manager processes/containers for you.
> >>>
> >>> – Ufuk
> >>>
> >>
> >
> >
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>


Re: key

2015-11-30 Thread Till Rohrmann
Hi Radu,

if you want to use custom types as keys, then these custom types have to
implement the Key interface.
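
Whichever interface the key type ends up implementing, grouping by a custom
type relies on value-based equality, a matching hash code and an ordering.
A minimal sketch of that contract in plain Java (MyEventKey and its field
names are hypothetical; the class deliberately implements only
java.lang.Comparable and references no Flink classes, so any further methods
that Flink's Key interface may require are not shown):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key type with two longs and an int, as in the question.
final class MyEventKey implements Comparable<MyEventKey> {
    private final long first;
    private final long second;
    private final int third;

    MyEventKey(long first, long second, int third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    // Two keys are equal when all fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyEventKey)) return false;
        MyEventKey other = (MyEventKey) o;
        return first == other.first && second == other.second && third == other.third;
    }

    // Equal keys must produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(first, second, third);
    }

    // Field-by-field ordering, consistent with equals.
    @Override
    public int compareTo(MyEventKey other) {
        int c = Long.compare(first, other.first);
        if (c != 0) return c;
        c = Long.compare(second, other.second);
        if (c != 0) return c;
        return Integer.compare(third, other.third);
    }

    public static void main(String[] args) {
        // Group three events by key and count the distinct groups.
        Map<MyEventKey, Integer> counts = new HashMap<>();
        counts.merge(new MyEventKey(1L, 2L, 3), 1, Integer::sum);
        counts.merge(new MyEventKey(1L, 2L, 3), 1, Integer::sum);
        counts.merge(new MyEventKey(1L, 2L, 4), 1, Integer::sum);
        System.out.println(counts.size());
    }
}
```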

Cheers,
Till

On Mon, Nov 30, 2015 at 5:28 PM, Radu Tudoran 
wrote:

> Hi,
>
>
>
> I want to apply a “keyBy” operator on a stream.
>
> The stream is of type MyEvent. This is a simple type that contains 2 longs
> and an int or string
>
>
>
> However, when applying this I get
>
>
>
> Exception in thread "main"
> *org.apache.flink.api.common.InvalidProgramException*: This type
> (GenericType) cannot be used as key.
>
>
>
> Can you give me a hint about a solution to this?
>
>
>
> Thanks
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R Division
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>


Re: Compiler Exception

2015-11-19 Thread Till Rohrmann
Hi Kien Truong,

could you share the problematic code with us?

Cheers,
Till
On Nov 18, 2015 9:54 PM, "Truong Duc Kien"  wrote:

> Hi,
>
> I'm hitting Compiler Exception with some of my data set, but not all of
> them.
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> No plan meeting the requirements could be created @ Bulk Iteration (Bulk
> Iteration) (1:null). Most likely reason: Too restrictive plan hints.
>
> Can I have some hints on how to troubleshoot this ?
>
> Thanks,
> Kien Truong
>
>


Re: YARN High Availability

2015-11-19 Thread Till Rohrmann
You mean an additional start-up parameter for the `start-cluster.sh` script
for the HA case? That could work.

On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Maybe we could add a user parameter to specify a cluster name that is used
> to make the paths unique.
>
> On Thu, Nov 19, 2015, 11:24 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> I agree that this would make the configuration easier. However, it
>> entails also that the user has to retrieve the randomized path from the
>> logs if he wants to restart jobs after the cluster has crashed or
>> intentionally restarted. Furthermore, the system won't be able to clean up
>> old checkpoint and job handles in case that the cluster stop was
>> intentional.
>>
>> Thus, the question is how do we define the behaviour in order to retrieve
>> handles and to clean up old handles so that ZooKeeper won't be cluttered
>> with old handles?
>>
>> There are basically two modes:
>>
>> 1. Keep state handles when shutting down the cluster. Provide a means to
>> define a fixed path when starting the cluster and also a means to purge old
>> state handles. Furthermore, add a shutdown mode where the handles under the
>> current path are directly removed. This mode would guarantee to always have
>> the state handles available if not explicitly told differently. However,
>> the downside is that ZooKeeper will be cluttered most certainly.
>>
>> 2. Remove the state handles when shutting down the cluster. Provide a
>> shutdown mode where we keep the state handles. This will keep ZooKeeper
>> clean but will give you also the possibility to keep a checkpoint around if
>> necessary. However, the user is more likely to lose his state when shutting
>> down the cluster.
>>
>> On Thu, Nov 19, 2015 at 10:55 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> I agree with Aljoscha. Many companies install Flink (and its config) in
>>> a central directory and users share that installation.
>>>
>>> On Thu, Nov 19, 2015 at 10:45 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> I think we should find a way to randomize the paths where the HA stuff
>>>> stores data. If users don’t realize that they store data in the same paths
>>>> this could lead to problems.
>>>>
>>>> > On 19 Nov 2015, at 08:50, Till Rohrmann <trohrm...@apache.org> wrote:
>>>> >
>>>> > Hi Gwenhaël,
>>>> >
>>>> > good to hear that you could resolve the problem.
>>>> >
>>>> > When you run multiple HA flink jobs in the same cluster, then you
>>>> don’t have to adjust the configuration of Flink. It should work out of the
>>>> box.
>>>> >
>>>> > However, if you run multiple HA Flink clusters, then you have to set
>>>> for each cluster a distinct ZooKeeper root path via the option
>>>> recovery.zookeeper.path.root in the Flink configuration. This is necessary
>>>> because otherwise all JobManagers (the ones of the different clusters) will
>>>> compete for a single leadership. Furthermore, all TaskManagers will only
>>>> see the one and only leader and connect to it. The reason is that the
>>>> TaskManagers will look up their leader at a ZNode below the ZooKeeper root
>>>> path.
>>>> >
>>>> > If you have other questions then don’t hesitate asking me.
>>>> >
>>>> > Cheers,
>>>> > Till
>>>> >
>>>> >
>>>> > On Wed, Nov 18, 2015 at 6:37 PM, Gwenhael Pasquiers <
>>>> gwenhael.pasqui...@ericsson.com> wrote:
>>>> > Nevermind,
>>>> >
>>>> >
>>>> >
>>>> > Looking at the logs I saw that it was having issues trying to connect
>>>> to ZK.
>>>> >
>>>> > To make it short, it had the wrong port.
>>>> >
>>>> >
>>>> >
>>>> > It is now starting.
>>>> >
>>>> >
>>>> >
>>>> > Tomorrow I’ll try to kill some JobManagers *evil*.
>>>> >
>>>> >
>>>> >
>>>> > Another question : if I have multiple HA flink jobs, are there some
>>>> points to check in order to be sure that they won’t collide on hdfs or ZK ?
>>>> >
>>>> >
>>>> >
>>>> > B.R.
&

Re: YARN High Availability

2015-11-19 Thread Till Rohrmann
I agree that this would make the configuration easier. However, it also
entails that the user has to retrieve the randomized path from the logs if he
wants to restart jobs after the cluster has crashed or been intentionally
restarted. Furthermore, the system won't be able to clean up old checkpoint
and job handles in case the cluster stop was intentional.

Thus, the question is how do we define the behaviour in order to retrieve
handles and to clean up old handles so that ZooKeeper won't be cluttered
with old handles?

There are basically two modes:

1. Keep state handles when shutting down the cluster. Provide a means to
define a fixed path when starting the cluster and also a means to purge old
state handles. Furthermore, add a shutdown mode where the handles under the
current path are directly removed. This mode would guarantee to always have
the state handles available if not explicitly told differently. However,
the downside is that ZooKeeper will be cluttered most certainly.

2. Remove the state handles when shutting down the cluster. Provide a
shutdown mode where we keep the state handles. This will keep ZooKeeper
clean but will give you also the possibility to keep a checkpoint around if
necessary. However, the user is more likely to lose his state when shutting
down the cluster.

On Thu, Nov 19, 2015 at 10:55 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> I agree with Aljoscha. Many companies install Flink (and its config) in a
> central directory and users share that installation.
>
> On Thu, Nov 19, 2015 at 10:45 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> I think we should find a way to randomize the paths where the HA stuff
>> stores data. If users don’t realize that they store data in the same paths
>> this could lead to problems.
>>
>> > On 19 Nov 2015, at 08:50, Till Rohrmann <trohrm...@apache.org> wrote:
>> >
>> > Hi Gwenhaël,
>> >
>> > good to hear that you could resolve the problem.
>> >
>> > When you run multiple HA Flink jobs in the same cluster, you don’t
>> have to adjust the configuration of Flink. It should work out of the box.
>> >
>> > However, if you run multiple HA Flink clusters, then you have to set a
>> distinct ZooKeeper root path for each cluster via the option
>> recovery.zookeeper.path.root in the Flink configuration. This is necessary
>> because otherwise all JobManagers (the ones of the different clusters) will
>> compete for a single leadership. Furthermore, all TaskManagers will only
>> see the one and only leader and connect to it. The reason is that the
>> TaskManagers will look up their leader at a ZNode below the ZooKeeper root
>> path.
>> >
>> > If you have other questions then don’t hesitate asking me.
>> >
>> > Cheers,
>> > Till
>> >
>> >
>> > On Wed, Nov 18, 2015 at 6:37 PM, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>> > Nevermind,
>> >
>> >
>> >
>> > Looking at the logs I saw that it was having issues trying to connect
>> to ZK.
>> >
>> > To make it short I had the wrong port.
>> >
>> >
>> >
>> > It is now starting.
>> >
>> >
>> >
>> > Tomorrow I’ll try to kill some JobManagers *evil*.
>> >
>> >
>> >
>> > Another question : if I have multiple HA flink jobs, are there some
>> points to check in order to be sure that they won’t collide on hdfs or ZK ?
>> >
>> >
>> >
>> > B.R.
>> >
>> >
>> >
>> > Gwenhaël PASQUIERS
>> >
>> >
>> >
>> > From: Till Rohrmann [mailto:till.rohrm...@gmail.com]
>> > Sent: Wednesday, 18 November 2015 18:01
>> > To: user@flink.apache.org
>> > Subject: Re: YARN High Availability
>> >
>> >
>> >
>> > Hi Gwenhaël,
>> >
>> >
>> >
>> > do you have access to the yarn logs?
>> >
>> >
>> >
>> > Cheers,
>> >
>> > Till
>> >
>> >
>> >
>> > On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>> >
>> > Hello,
>> >
>> >
>> >
>> > We’re trying to set up high availability using an existing zookeeper
>> quorum already running in our Cloudera cluster.
>> >
>> >
>> >
>> > So, as per the doc we’ve changed the max attempt in yarn’s config as
>> well as the flink.yaml.
>> >
>> >
>> >
>> > recovery.mode: zookeeper
>> >
>> > recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
>> >
>> > state.backend: filesystem
>> >
>> > state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
>> >
>> > recovery.zookeeper.storageDir: hdfs:///flink/recovery/
>> >
>> > yarn.application-attempts: 1000
>> >
>> >
>> >
>> > Everything is ok as long as recovery.mode is commented.
>> >
>> > As soon as I uncomment recovery.mode the deployment on yarn is stuck on
>> :
>> >
>> >
>> >
>> > “Deploying cluster, current state ACCEPTED”.
>> >
>> > “Deployment took more than 60 seconds….”
>> >
>> > Every second.
>> >
>> >
>> >
>> > And I have more than enough resources available on my yarn cluster.
>> >
>> >
>> >
>> > Do you have any idea of what could cause this, and/or what logs I
>> should look for in order to understand ?
>> >
>> >
>> >
>> > B.R.
>> >
>> >
>> >
>> > Gwenhaël PASQUIERS
>> >
>> >
>> >
>> >
>>
>>
>


Re: YARN High Availability

2015-11-23 Thread Till Rohrmann
The problem is the execution graph handle which is stored in ZooKeeper. You
can manually remove it via the ZooKeeper shell by simply deleting
everything below your `recovery.zookeeper.path.root` ZNode. But make sure
that the cluster has been stopped before you do so.

Do you start the different clusters with different
`recovery.zookeeper.path.root` values? If not, then you will run into
trouble when running multiple clusters at the same time. The reason is
that all clusters will then think that they belong together.

Cheers,
Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> OK, I understand.
>
> Maybe we are not really using flink as you intended. The way we are using
> it, one cluster equals one job. That way we are sure to isolate the
> different jobs as much as possible and in case of crashes / bugs / (etc)
> can completely kill one cluster without interfering with the other jobs.
>
> That future behavior seems good :-)
>
> Instead of the manual flink commands, is there a way to manually delete
> those old jobs before launching my job? They are probably somewhere in
> hdfs, aren't they?
>
> B.R.
>
>
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Monday, 23 November 2015 12:12
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> Hey Gwenhaël,
>
> the restarting jobs are most likely old job submissions. They are not
> cleaned up when you shut down the cluster, but only when they finish
> (either regular finish or after cancelling).
>
> The workaround is to use the command line frontend:
>
> bin/flink cancel JOBID
>
> for each RESTARTING job. Sorry about the inconvenience!
>
> We are in an active discussion about addressing this. The future behaviour
> will be that the startup or shutdown of a cluster cleans up everything, with
> an option to skip this step.
>
> The reasoning for the initial solution (not removing anything) was to make
> sure that no jobs are deleted by accident. But it looks like this is more
> confusing than helpful.
>
> – Ufuk
>
> > On 23 Nov 2015, at 11:45, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
> >
> > Hi again !
> >
> > On the same topic I'm still trying to start my streaming job with HA.
> > The HA part seems to be more or less OK (I killed the JobManager and it
> came back), however I have an issue with the TaskManagers.
> > I configured my job to have only one TaskManager and 1 slot that does
> [source=>map=>sink].
> > The issue I'm encountering is that other instances of my job appear and
> are in the RESTARTING status since there is only one task slot.
> >
> > Do you know of this, or have an idea of where to look in order to
> understand what's happening ?
> >
> > B.R.
> >
> > Gwenhaël PASQUIERS
> >
> > -Original Message-
> > From: Maximilian Michels [mailto:m...@apache.org]
> > Sent: Thursday, 19 November 2015 13:36
> > To: user@flink.apache.org
> > Subject: Re: YARN High Availability
> >
> > The docs have been updated.
> >
> > On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi <u...@apache.org> wrote:
> >> I’ve added a note about this to the docs and asked Max to trigger a new
> build of them.
> >>
> >> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for
> configuring the root path.
> >>
> >> In any case, it is orthogonal to Till’s proposals. That one we need to
> address as well (see FLINK-2929). The motivation for the current behaviour
> was to be rather defensive when removing state in order to not lose data
> accidentally. But it can be confusing, indeed.
> >>
> >> – Ufuk
> >>
> >>> On 19 Nov 2015, at 12:08, Till Rohrmann <trohrm...@apache.org> wrote:
> >>>
> >>> You mean an additional start-up parameter for the `start-cluster.sh`
> script for the HA case? That could work.
> >>>
> >>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
> >>> Maybe we could add a user parameter to specify a cluster name that is
> used to make the paths unique.
> >>>
> >>>
> >>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann <trohrm...@apache.org>
> wrote:
> >>> I agree that this would make the configuration easier. However, it
> entails also that the user has to retrieve the randomized path from the
> logs if he wants to restart jobs after the cluster has crashed or
> intentionally restarted. Furthermore, the system won't be able to clean up
> old checkpoint and job ha

Re: Adding TaskManager on Cluster

2015-11-24 Thread Till Rohrmann
Hi Welly,

you can always start a new TaskManager by simply calling taskmanager.sh
start [streaming|batch], depending on whether you are running a streaming
cluster or a batch cluster. You can find the script in /bin.

Cheers,
Till

On Tue, Nov 24, 2015 at 10:27 AM, Welly Tambunan  wrote:

> What i'm looking for here is the ability to add a node to the cluster
> (scale out) when there's no task slot left for use.
>
>
> On Tue, Nov 24, 2015 at 4:24 PM, Welly Tambunan  wrote:
>
>> Hi All,
>>
>> Currently we are running flink using standalone mode.
>>
>> Is there any way to add one node ( task manager ) to the cluster without
>> bringing the cluster down ?
>>
>>
>> Cheers
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: LDBC Graph Data into Flink

2015-11-24 Thread Till Rohrmann
Nice blog post Martin!

On Tue, Nov 24, 2015 at 3:14 PM, Vasiliki Kalavri  wrote:

> Great, thanks for sharing Martin!
>
> On 24 November 2015 at 15:00, Martin Junghanns 
> wrote:
>
>> Hi,
>>
>> I wrote a short blog post about the ldbc-flink tool including a short
>> overview of Flink and a Gelly example.
>>
>> http://ldbcouncil.org/blog/ldbc-and-apache-flink
>>
>> Best,
>> Martin
>>
>> On 06.10.2015 11:00, Martin Junghanns wrote:
>> > Hi Vasia,
>> >
>> > No problem. Sure, Gelly is just a map() call away :)
>> >
>> > Best,
>> > Martin
>> >
>> > On 06.10.2015 10:53, Vasiliki Kalavri wrote:
>> >> Hi Martin,
>> >>
>> >> thanks a lot for sharing! This is a very useful tool.
>> >> I only had a quick look, but if we merge label and payload inside a
>> Tuple2,
>> >> then it should also be Gelly-compatible :)
>> >>
>> >> Cheers,
>> >> Vasia.
>> >>
>> >> On 6 October 2015 at 10:03, Martin Junghanns 
>> >> wrote:
>> >>
>> >>> Hi all,
>> >>>
>> >>> For our benchmarks with Flink, we are using a data generator provided
>> by
>> >>> the LDBC project (Linked Data Benchmark Council) [1][2]. The
>> generator uses
>> >>> MapReduce to create directed, labeled, attributed graphs that mimic
>> >>> properties of real online social networks (e.g, degree distribution,
>> >>> diameter). The output is stored in several files either local or in
>> HDFS.
>> >>> Each file represents a vertex, edge or multi-valued property class.
>> >>>
>> >>> I wrote a little tool, that parses and transforms the LDBC output
>> into two
>> >>> datasets representing vertices and edges. Each vertex has a unique
>> id, a
>> >>> label and payload according to the LDBC schema. Each edge has a
>> unique id,
>> >>> a label, source and target vertex IDs and also payload according to
>> the
>> >>> schema.
>> >>>
>> >>> I thought this may be useful for others so I put it on GitHub [3]. It
>> >>> currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in
>> >>> there.
>> >>>
>> >>> Best,
>> >>> Martin
>> >>>
>> >>> [1] http://ldbcouncil.org/
>> >>> [2] https://github.com/ldbc/ldbc_snb_datagen
>> >>> [3] https://github.com/s1ck/ldbc-flink-import
>> >>>
>> >>
>>
>
>


Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Till Rohrmann
The constructor of a Java class is not necessarily called when an instance
is deserialized. Since the decoder field is transient, it will be null after
deserialization. Thus, you should move the check

    if (this.decoder == null) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }

into the deserialize method of MyAvroDeserializer.
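
To illustrate the effect outside of Flink, here is a minimal, self-contained
sketch in plain Java. The names LazyDecoderSketch, Decoder and roundTrip are
made up for this example; Decoder is only a stand-in for a non-serializable
class such as KafkaAvroDecoder. The sketch also assumes that the
configuration is kept in a serializable field (java.util.Properties here),
so that it is still available when the decoder is created lazily.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class LazyDecoderSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for a non-serializable decoder. */
    static class Decoder {
        private final String prefix;

        Decoder(Properties props) {
            this.prefix = props.getProperty("prefix", "");
        }

        String fromBytes(byte[] message) {
            return prefix + new String(message, StandardCharsets.UTF_8);
        }
    }

    // The serializable configuration travels with the object.
    private final Properties props;

    // The transient field is skipped by Java serialization and is null afterwards.
    private transient Decoder decoder;

    LazyDecoderSketch(Properties props) {
        this.props = props;
    }

    String deserialize(byte[] message) {
        // Lazy initialization: also works on a copy whose constructor never ran.
        if (decoder == null) {
            decoder = new Decoder(props);
        }
        return decoder.fromBytes(message);
    }

    boolean decoderInitialized() {
        return decoder != null;
    }

    /** Serializes and deserializes the object, similar to shipping it to a worker. */
    static LazyDecoderSketch roundTrip(LazyDecoderSketch original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyDecoderSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("prefix", "msg:");
        LazyDecoderSketch copy = roundTrip(new LazyDecoderSketch(props));
        System.out.println("decoder set after deserialization: " + copy.decoderInitialized());
        System.out.println(copy.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The same pattern should carry over to the deserialization schema: keep only
serializable settings as regular fields and create the decoder on first use.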

Cheers,
Till

On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota 
wrote:

> Hi Max/Ewen,
>
> Thank you for the inputs. I was able to solve the serialization issues.
> Now I am seeing a NullPointerException.
>
> public class MyAvroDeserializer implements DeserializationSchema<String> {
>
>     private transient KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer(VerifiableProperties vProps) {
>         if (this.decoder == null) {
>             this.decoder = new KafkaAvroDecoder(vProps);
>         }
>     }
>
>     @Override
>     public String deserialize(byte[] message) throws IOException {
>         return (String) decoder.fromBytes(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return TypeExtractor.getForClass(String.class);
>     }
>
> }
>
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(1/4) switched to FAILED
> java.lang.Exception
>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
>   at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
>
> 11/19/2015 07:47:39   Job execution switched to status FAILING.
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELED
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELED
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELED
> 11/19/2015 07:47:39   Job execution switched to status FAILED.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception
>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>   at 

Re: Problem to show logs in task managers

2016-01-11 Thread Till Rohrmann
Hi Ana,

good to hear that you found the logging statements. You can check in Yarn’s
web interface whether there are still occupied containers. Alternatively,
you can go to the different machines and run jps, which lists the
running Java processes. If you see an ApplicationMaster or
YarnTaskManagerRunner process, then there is still a container running with
Flink on this machine. I hope this helps you.

Cheers,
Till

On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <a...@cs.aau.dk> wrote:

> Hi Till,
>
> Thanks for that! I can see the "Logger in LineSplitter.flatMap” output if
> I retrieve the task manager logs manually
> (under /var/log/hadoop-yarn/containers/application_X/…). However that
> solution is not ideal when for instance I am using 32 machines for my
> mapReduce operations.
>
> I would like to know why Yarn’s log aggregation is not working. Can you
> tell me how to check if there are some Yarn containers running after the
> Flink job has finished? I have tried:
> hadoop job -list
> but I cannot see any jobs there, although I am not sure that it means that
> there are not containers running...
>
> Thanks,
> Ana
>
> On 08 Jan 2016, at 16:24, Till Rohrmann <trohrm...@apache.org> wrote:
>
> You’re right that the log statements of the LineSplitter are in the logs
> of the cluster nodes, because that’s where the LineSplitter code is
> executed. In contrast, you create a TestClass on the client when you
> submit the program. Therefore, you see the logging statement “Logger in
> TestClass” on the command line or in the cli log file.
>
> So I would assume that the problem is Yarn’s log aggregation. Either your
> configuration is not correct or there are still some Yarn containers
> running after the Flink job has finished. Yarn will only show you the logs
> after all containers are terminated. Maybe you could check that.
> Alternatively, you can try to retrieve the taskmanager logs manually by
> going to the machine where your yarn container was executed. Then under
> hadoop/logs/userlogs you should find somewhere the logs.
>
> Cheers,
> Till
>
> On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>
>> Thanks for the tip Robert! It was a good idea to rule out other possible
>> causes, but I am afraid that is not the problem. If we stick to the
>> WordCountExample (for simplicity), the Exception is thrown if placed into
>> the flatMap function.
>>
>> I am going to try to re-write my problem and all the settings below:
>>
>> When I try to aggregate all logs:
>>  $yarn logs -applicationId application_1452250761414_0005
>>
>> the following message is retrieved:
>> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
>> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does
>> not exist.
>> Log aggregation has not completed or is not enabled.
>>
>> (Tried the same command a few minutes later and got the same message, so
>> might it be that log aggregation is not properly enabled??)
>>
>> I am going to carefully enumerate all the steps I have followed (and
>> settings) to see if someone can identify why the Logger messages from CORE
>> nodes (in an Amazon cluster) are not shown.
>>
>> 1) Enable yarn.log-aggregation-enable property to true
>> in /etc/alternatives/hadoop-conf/yarn-site.xml.
>>
>> 2) Include log messages in my WordCountExample as follows:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.util.Collector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.List;
>>
>>
>> public class WordCountExample {
>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         final ExecutionEnvironment env =
>>             ExecutionEnvironment.getExecutionEnvironment();
>>
>>         logger.info("Entering application.");
>>
>>         DataSet<String> text = env.fromElements(
>>             "Who's there?",
>>             "I think I hear them. Stand, ho! Who's there?");
>>
>>         List<Integer> elements = new ArrayList<Integer>();
>>         elements.add(0);
>>

Re: Machine Learning on Apache Fink

2016-01-11 Thread Till Rohrmann
Hi Ashutosh,

Flink’s ML library FlinkML is maybe not as extensive as Spark’s MLlib.
However, Flink has native support for iterations which makes them blazingly
fast. Iterations in Flink are a distinct operator so that they don’t have
to communicate with the client after each iteration. Furthermore, Flink has
support for delta iterations which allow you to iterate over a subset of
the elements in a DataSet. This can considerably speed up suitable
algorithms, such as many graph algorithms.
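
To give a rough idea of what "iterating over a subset" means, here is a
small sketch in plain Java. It does not use the Flink API; the class
DeltaIterationSketch and its methods are made up for this illustration. It
labels the connected components of an undirected graph by propagating the
smallest vertex id, and in each round it only revisits the vertices whose
label changed in the previous round (the workset), while the full set of
labels (the solution set) is only updated where needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DeltaIterationSketch {

    /** Adds an undirected edge and registers both vertices as keys. */
    static void addEdge(Map<Integer, List<Integer>> graph, int a, int b) {
        graph.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
        graph.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
    }

    /** Returns, for every vertex, the smallest vertex id in its component. */
    static Map<Integer, Integer> componentLabels(Map<Integer, List<Integer>> graph) {
        // Solution set: the current label of every vertex.
        Map<Integer, Integer> solution = new HashMap<>();
        for (Integer vertex : graph.keySet()) {
            solution.put(vertex, vertex);
        }

        // Workset: vertices whose label changed and must be propagated.
        Set<Integer> workset = new HashSet<>(graph.keySet());

        while (!workset.isEmpty()) {
            Set<Integer> changed = new HashSet<>();
            for (Integer vertex : workset) {
                int label = solution.get(vertex);
                for (Integer neighbour : graph.get(vertex)) {
                    if (label < solution.get(neighbour)) {
                        solution.put(neighbour, label);
                        changed.add(neighbour);
                    }
                }
            }
            // Only the changed vertices take part in the next round.
            workset = changed;
        }
        return solution;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = new HashMap<>();
        addEdge(graph, 1, 2);
        addEdge(graph, 2, 3);
        addEdge(graph, 4, 5);
        System.out.println(componentLabels(graph));
    }
}
```

Once most labels have converged, the workset shrinks and each round touches
only a few vertices, which is where the speed-up for this kind of algorithm
comes from.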

If you have further questions while studying the available material, feel
free to write to me.

Cheers,
Till

On Sat, Jan 9, 2016 at 2:20 PM, Vasiliki Kalavri 
wrote:

> Hi Ashutosh,
>
> Flink has a Machine Learning library, Flink-ML. You can find more
> information and examples in the documentation [1].
> The code is currently in the flink-staging repository. There is also
> material on Slideshare that you might find interesting [2, 3, 4].
>
> I hope this helps!
> -Vasia.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/
> [2]:
> http://www.slideshare.net/tillrohrmann/machine-learning-with-apache-flink
> [3]:
> http://www.slideshare.net/TheodorosVasiloudis/flinkml-large-scale-machine-learning-with-apache-flink
> [4]:
> http://www.slideshare.net/tillrohrmann/computing-recommendations-at-extreme-scale-with-apache-flink-buzzwords-2015-48879155
>
> On 9 January 2016 at 12:46, Ashutosh Kumar 
> wrote:
>
>> I see lot of study materials and even book available for ml on spark.
>> Spark seems to be more mature for analytics related work as of now. Please
>> correct me if I am wrong. As I have already built my collection and data
>> pre processing layers on flink , I want to use Flink for analytics as well.
>> Thanks in advance.
>>
>>
>> Ashutosh
>>
>> On Sat, Jan 9, 2016 at 3:32 PM, Ashutosh Kumar <
>> ashutosh.disc...@gmail.com> wrote:
>>
>>> I am looking for some study material and examples on machine learning .
>>> Are classification , recommendation and clustering libraries available ?
>>> What is the timeline for Flink as backend for Mahout? I am building a meta
>>> data driven framework over Flink . While building data collection and
>>> transformation modules was cool , I am struggling since I started analytics
>>> module. Thanks in advance.
>>> Ashutosh
>>>
>>
>>
>


Re: eigenvalue solver

2016-01-12 Thread Till Rohrmann
Hi Lydia,

there is no Eigenvalue solver included in FlinkML yet. But if you want to,
then you can give it a try :-)

[1] http://www.cs.newpaltz.edu/~lik/publications/Ruixuan-Li-CCPE-2015.pdf

Cheers,
Till

On Tue, Jan 12, 2016 at 9:47 AM, Lydia Ickler 
wrote:

> Hi,
>
> I wanted to know if there are any implementations yet within the Machine
> Learning Library or generally that can efficiently solve eigenvalue
> problems in Flink?
> Or if not do you have suggestions on how to approach a parallel execution
> maybe with BLAS or Breeze?
>
> Thanks in advance!
> Lydia


Re: Exception using flink-connector-elasticsearch

2016-01-12 Thread Till Rohrmann
Hi Javier,

it seems as if you are either missing the lucene-codecs jar in your
classpath or you have a wrong version (not 4.10.4). Could you check
whether your job jar includes lucene-codecs? If so, could you run mvn
dependency:tree in the root directory of your project? There you should see
which version of lucene-codecs you have included and where it comes from.

Cheers,
Till

On Tue, Jan 12, 2016 at 11:55 AM, Lopez, Javier 
wrote:

> Hi,
>
> We are using the sink for ElasticSearch and when we try to run our job we
> get the following exception:
>
> java.lang.ExceptionInInitializerError Caused by:
> java.lang.IllegalArgumentException: An SPI class of type
> org.apache.lucene.codecs.Codec with name 'Lucene410' does not exist.  You
> need to add the corresponding JAR file supporting this SPI to your
> classpath.  The current classpath supports the following names: []
>
> We are using embedded nodes and we don't know if we are missing some
> configuration for the elasticsearch client. This is the code we are using:
>
> Map config = Maps.newHashMap();
>
>   config.put("bulk.flush.max.actions", "1");
>
>   config.put("cluster.name", "flink-test");
>
>
>
>   result.addSink(new ElasticsearchSink<>(config, new
> IndexRequestBuilder>() {
>   @Override
>   public org.elasticsearch.action.index.IndexRequest
> createIndexRequest(Tuple4 element,
> RuntimeContext ctx) {
>   Map json = new HashMap<>();
>   json.put("data", element);
>   return org.elasticsearch.client.Requests.indexRequest()
>   .index("stream_test_1")
>   .type("aggregation_test")
>   .source(json);
>   }
>   }));
>
> The flink server as well as the elasticsearch server are in the same local
> machine.
>
> Thanks for your help
>


Re: eigenvalue solver

2016-01-12 Thread Till Rohrmann
I'm sorry, I haven't. But there should be plenty of implementations out
there.

Cheers,
Till

On Tue, Jan 12, 2016 at 11:07 AM, Lydia Ickler <ickle...@googlemail.com>
wrote:

> Hi Till,
> Thanks for the paper link!
> Do you maybe have a code snippet in mind from BLAS, Breeze or Spark to
> start from?
> Best regards,
> Lydia
>
>
> Sent from my iPhone
> On 12.01.2016 at 10:46, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Lydia,
>
> there is no Eigenvalue solver included in FlinkML yet. But if you want to,
> then you can give it a try :-)
>
> [1] http://www.cs.newpaltz.edu/~lik/publications/Ruixuan-Li-CCPE-2015.pdf
>
> Cheers,
> Till
>
> On Tue, Jan 12, 2016 at 9:47 AM, Lydia Ickler <ickle...@googlemail.com>
> wrote:
>
>> Hi,
>>
>> I wanted to know if there are any implementations yet within the Machine
>> Learning Library or generally that can efficiently solve eigenvalue
>> problems in Flink?
>> Or if not do you have suggestions on how to approach a parallel execution
>> maybe with BLAS or Breeze?
>>
>> Thanks in advance!
>> Lydia
>
>
>


Re: ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Till Rohrmann
la:61)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
> at
> org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
> ... 3 more
>
> On Wed, Jun 8, 2016 at 3:19 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Till,
>>
>> Thanks for the reply! I see - yes it does sound very much like FLINK-1390.
>>
>> Please see my AvroDeserializationSchema implementation here:
>> http://pastebin.com/mK7SfBQ8
>>
>> I think perhaps the problem is caused by this line:
>> val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)
>>
>> Looking at SpecificData, it contains a classCache which is a map of
>> strings to classes, similar to what's described in FLINK-1390.
>>
>> I'm not sure how to change my AvroDeserializationSchema to prevent this
>> from happening though! Do you have any ideas?
>>
>> Josh
>>
>>
>>
>> On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Josh,
>>>
>>> the error message you've posted usually indicates that there is a class
>>> loader issue. When you first run your program the class
>>> com.me.avro.MyAvroType will be first loaded (by the user code class
>>> loader). I suspect that this class is now somewhere cached (e.g. the avro
>>> serializer) and when you run your program a second time, then there is a
>>> new user code class loader which has loaded the same class and now you want
>>> to convert an instance of the first class into the second class. However,
>>> these two classes are not identical since they were loaded by different
>>> class loaders.
>>>
>>> In order to find the culprit, it would be helpful to see the full stack
>>> trace of the ClassCastException and the code of the
>>> AvroDeserializationSchema. I suspect that something similar to
>>> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Currently I have to relaunch my Flink cluster every time I want to
>>>> upgrade/redeploy my Flink job, because otherwise I get a 
>>>> ClassCastException:
>>>>
>>>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>>>> com.me.avro.MyAvroType
>>>>
>>>> It's related to MyAvroType which is a class generated from an Avro
>>>> schema. The ClassCastException occurs every time I redeploy the job without
>>>> killing the Flink cluster (even if there have been no changes to the
>>>> job/jar).
>>>>
>>>> I wrote my own AvroDeserializationSchema in Scala which does something
>>>> a little strange to get the avro type information (see below), and I'm
>>>> wondering if that's causing the problem when the Flink job creates an
>>>> AvroDeserializationSchema[MyAvroType].
>>>>
>>>> Does anyone have any ideas?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>>
>>>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>>>> extends DeserializationSchema[T] {
>>>>
>>>>   ...
>>>>
>>>>   private val avroType =
>>>> classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>>>
>>>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>>>
>>>>   ...
>>>>
>>>>   override def getProducedType: TypeInformation[T] = typeInformation
>>>>
>>>>   ...
>>>>
>>>> }
>>>>
>>>
>>>
>>
>


Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

2016-06-09 Thread Till Rohrmann
Hi Ahmed,

I tried setting up your use case and for me it all seems to work. However,
I didn't use the Spring framework and executed the program in a local Flink
cluster.

Maybe you can compile a self-contained example (including example data) to
reproduce your problem and send it to us.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader  wrote:

> Hello Flavio,
> Thank you so much for replying. However, I didn't download Flink locally, I
> only added dependencies in a Maven project, so I don't think I'll be able
> to modify the KryoSerializer class. But yes, I also think that is the
> problem.
> Thanks,
> Ahmed
>
> On 8 June 2016 at 16:07, Flavio Pompermaier  wrote:
>
>> Hi Ahmed,
>> I also have the same error that is probably caused by the KryoSerializer.
>> Right now I'm testing a patch to this problem so maybe you could also
>> test it. Unfortunately I'm using Flink 1.0.2 so I don't know whether you
>> can use my KryoSerializer but I think so. Actually I just recreate Input
>> and Output every time in serialize/deserialize and then I close them.
>>
>> This is my attempt to fix the problem (actually the KryoSerializer class
>> in the flink-core module):
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  * http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.flink.api.java.typeutils.runtime.kryo;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoException;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> import com.esotericsoftware.kryo.serializers.JavaSerializer;
>> import com.google.common.base.Preconditions;
>>
>> import org.apache.avro.generic.GenericData;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>> import
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
>> import org.apache.flink.core.memory.DataInputView;
>> import org.apache.flink.core.memory.DataOutputView;
>> import org.objenesis.strategy.StdInstantiatorStrategy;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.ByteArrayOutputStream;
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.lang.reflect.InvocationTargetException;
>> import java.lang.reflect.Method;
>> import java.lang.reflect.Modifier;
>> import java.util.LinkedHashMap;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.Objects;
>>
>> /**
>>  * A type serializer that serializes its type using the Kryo serialization
>>  * framework (https://github.com/EsotericSoftware/kryo).
>>  *
>>  * This serializer is intended as a fallback serializer for the cases
>> that are
>>  * not covered by the basic types, tuples, and POJOs.
>>  *
>>  * @param <T> The type to be serialized.
>>  */
>> public class KryoSerializer<T> extends TypeSerializer<T> {
>>
>> private static final long serialVersionUID = 3L;
>>
>> private static final Logger LOG =
>> LoggerFactory.getLogger(KryoSerializer.class);
>>
>> //
>> 
>>
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
>> registeredTypesWithSerializerClasses;
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
>> defaultSerializerClasses;
>> private final LinkedHashSet<Class<?>> registeredTypes;
>>
>> private final Class<T> type;
>> //
>> 
>> // The fields below are lazily initialized after duplication or
>> 

Re: ClassCastException when redeploying Flink job on running cluster

2016-06-09 Thread Till Rohrmann
Great to hear :-)

On Wed, Jun 8, 2016 at 7:45 PM, Josh <jof...@gmail.com> wrote:

> Thanks Till, your suggestion worked!
>
> I actually just created a new SpecificData for each
> AvroDeserializationSchema instance, so I think it's still just as efficient.
>
> Josh
>
> On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> The only thing I could think of is to not use the SpecificData singleton
>> but instead to create a new SpecificData object for each SpecificDatumReader
>> (you can pass it as a third argument to the constructor). This, of course,
>> is not really efficient. But you could try it out to see whether it solves
>> your problem.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 4:24 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Sorry - I forgot to include my stack trace too. Here it is:
>>>
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>> at
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
>>> at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
>>> at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> at
>>> org.apache.flink.streaming.runtime.tasks

Re: Uploaded jar disappears when web monitor restarts

2016-06-08 Thread Till Rohrmann
Hi Emanuele,

you're right, the upload directory for the jars is deleted whenever the
WebRuntimeMonitor is stopped and a new directory is created when the
WebRuntimeMonitor is started.

It would be doable to introduce a configuration parameter for the uploaded
jar directory so that you don't have to upload the jars again every time.
But then the user would be responsible for cleaning it up. If you
want, you can open a JIRA issue for this feature.

I would recommend using the CLI for starting Flink jobs. The reason is that
the WebRuntimeMonitor does not support jobs which contain eager execution
trigger calls such as collect, count and print, for example.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:06 AM, Emanuele Cesena 
wrote:

> Hi,
>
> When the web monitor restarts the uploaded jars disappear — in fact, every
> time it restarts the upload directory is different.
>
> This seems intentional:
>
> https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L162
>
> Could anyone confirm?
>
> Wouldn’t it be useful to have a config param to set it
> permanently (and thus avoid the jars being deleted)? Or, what is the
> intended way to “add jars to my cluster”?
>
> Thank you,
> E.
>
>


Re: Dedicated ExecutionContext inside Flink

2016-06-08 Thread Till Rohrmann
Hi Soumya,

except for the LocalFlinkMiniCluster, which is used by the LocalEnvironment
to execute Flink programs in your IDE, we don't use the global
ExecutionContext. Thus, it should be fine to use the global
ExecutionContext for your futures without affecting Flink.

However, I would recommend creating a dedicated ExecutionContext for your
futures. That way, you have better control over the resource consumption of the
different parts of your code. You could, for example, create an
ExecutionContext for a subtask of an operator in the open method of the
RichFunction.
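
As a rough illustration of that lifecycle, here is a minimal plain-Java sketch.
It does not use the Flink API: ExternalCallClient, its open/close methods and
the fake lookup are made-up names that only mirror what a RichFunction's open()
and close() would do. On the Scala side, such a pool could presumably be
wrapped with ExecutionContext.fromExecutorService.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper that owns one thread pool per operator subtask.
 * open() and close() mirror the lifecycle methods of a RichFunction.
 */
final class ExternalCallClient {
    private ExecutorService pool;

    /** Call from the rich function's open(): creates the dedicated pool. */
    void open(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    /** Runs the (stand-in) network call on the dedicated pool, not on a shared global one. */
    CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> "result-for-" + key, pool);
    }

    /** Call from the rich function's close(): releases the threads again. */
    void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        ExternalCallClient client = new ExternalCallClient();
        client.open(4);
        System.out.println(client.lookup("42").join());
        client.close();
    }
}
```

The pool size then bounds how many external calls one subtask can have in
flight, independently of whatever else runs in the same JVM.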

Cheers,
Till

On Wed, Jun 8, 2016 at 7:14 AM, Soumya Simanta 
wrote:

> What is the recommended practice for using dedicated ExecutionContexts
> inside Flink code?
>
> We are making some external network calls using Futures. Currently all of
> them are running on the global execution context (import
> scala.concurrent.ExecutionContext.Implicits.global).
>
> Thanks
> -Soumya
>
>
>
>
>


Re: ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Till Rohrmann
Hi Josh,

the error message you've posted usually indicates a class
loader issue. When you first run your program, the class
com.me.avro.MyAvroType is loaded by the user code class
loader. I suspect that this class is then cached somewhere (e.g. in the Avro
serializer). When you run your program a second time, there is a
new user code class loader which has loaded the same class, and you then try
to convert an instance of the first class into the second class. However,
these two classes are not identical since they were loaded by different
class loaders.
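
The effect can be reproduced outside of Flink. The following is a minimal
sketch, assuming the compiled classes are readable as resources on the class
path; MyRecord, UserCodeLoader and ClassLoaderIdentityDemo are made-up names,
with MyRecord standing in for com.me.avro.MyAvroType.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;

/** Stand-in for a generated Avro class such as com.me.avro.MyAvroType. */
class MyRecord {
}

/** Mimics a fresh user code class loader: it defines MyRecord itself instead of delegating. */
class UserCodeLoader extends ClassLoader {
    UserCodeLoader() {
        super(null); // only the bootstrap loader is asked first, and it cannot see MyRecord
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        String resource = name.replace('.', '/') + ".class";
        try (InputStream in = MyRecord.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name);
            }
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            for (int n = in.read(chunk); n != -1; n = in.read(chunk)) {
                bytes.write(chunk, 0, n);
            }
            return defineClass(name, bytes.toByteArray(), 0, bytes.size());
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

final class ClassLoaderIdentityDemo {
    /** Creates a MyRecord instance whose class was defined by a second class loader. */
    static Object newForeignRecord() {
        try {
            Class<?> foreign = new UserCodeLoader().loadClass(MyRecord.class.getName());
            Constructor<?> constructor = foreign.getDeclaredConstructor();
            constructor.setAccessible(true);
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the object cannot be cast to the MyRecord class seen by this code. */
    static boolean castFails(Object candidate) {
        try {
            MyRecord.class.cast(candidate);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object foreign = newForeignRecord();
        System.out.println("same name:  " + foreign.getClass().getName().equals(MyRecord.class.getName()));
        System.out.println("same class: " + (foreign.getClass() == MyRecord.class));
        System.out.println("cast fails: " + castFails(foreign));
    }
}
```

Both classes carry the same fully qualified name, which is why the exception
message reads "X cannot be cast to X".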

In order to find the culprit, it would be helpful to see the full stack
trace of the ClassCastException and the code of the
AvroDeserializationSchema. I suspect that something similar to
https://issues.apache.org/jira/browse/FLINK-1390 is happening.

Cheers,
Till

On Wed, Jun 8, 2016 at 10:38 AM, Josh  wrote:

> Hi all,
>
> Currently I have to relaunch my Flink cluster every time I want to
> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>
> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
> com.me.avro.MyAvroType
>
> It's related to MyAvroType, which is a class generated from an Avro
> schema. The ClassCastException occurs every time I redeploy the job without
> killing the Flink cluster (even if there have been no changes to the
> job/jar).
>
> I wrote my own AvroDeserializationSchema in Scala which does something a
> little strange to get the avro type information (see below), and I'm
> wondering if that's causing the problem when the Flink job creates an
> AvroDeserializationSchema[MyAvroType].
>
> Does anyone have any ideas?
>
> Thanks,
> Josh
>
>
>
> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends
> DeserializationSchema[T] {
>
>   ...
>
>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>
>   private val typeInformation = TypeExtractor.getForClass(avroType)
>
>   ...
>
>   override def getProducedType: TypeInformation[T] = typeInformation
>
>   ...
>
> }
>


Re: Flink Dashboard

2016-06-08 Thread Till Rohrmann
Hi Leon,

yes, you're right. The plan visualization shows the actual tasks. Each task
can contain one or more (if chained) operators. A task is split into
subtasks which are executed on the TaskManagers. A TaskManager slot can
accommodate one subtask of each task (if the task has not been assigned to
a different slot sharing group). Thus, by default, the number of required
slots is defined by the operator with the highest parallelism.
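
The slot arithmetic can be sketched in a few lines of plain Java. This is only
my reading of the rule above, not Flink's scheduler code: SlotEstimate and Task
are made-up names, and summing over slot sharing groups is an assumption that
follows from "one subtask of each task per slot" within a group.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SlotEstimate {
    /** One task of the plan: its parallelism and the slot sharing group it was assigned to. */
    static final class Task {
        final String name;
        final int parallelism;
        final String slotSharingGroup;

        Task(String name, int parallelism, String slotSharingGroup) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    /** Sums, over all slot sharing groups, the highest parallelism found in each group. */
    static int requiredSlots(List<Task> tasks) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Task task : tasks) {
            maxPerGroup.merge(task.slotSharingGroup, task.parallelism, Math::max);
        }
        int slots = 0;
        for (int max : maxPerGroup.values()) {
            slots += max;
        }
        return slots;
    }

    public static void main(String[] args) {
        List<Task> plan = Arrays.asList(
                new Task("source", 1, "default"),
                new Task("map -> filter", 3, "default"),
                new Task("sink", 2, "default"));
        System.out.println(requiredSlots(plan));
    }
}
```

With every task in the default group, the result is simply the highest
parallelism in the plan, which matches the 3 slots Leon observed.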

If you click on the different tasks, you can see in the list view at the
bottom of the page where the individual subtasks have been deployed.

The keyBy API call is actually not realized as a distinct Flink operator at
runtime. Instead, the keyBy transformation influences how a downstream
operator is connected with the upstream operator (pointwise or all to all).
Furthermore, the keyBy transformation sets a special StreamPartitioner
(HashPartitioner) which is used by the StreamRecordWriters to select the
output channels (receiving sub tasks) for the current stream record. That
is the reason why you don't see the keyBy transformation in the plan
visualization. Consequently, the illustration on our website is not totally
consistent with the actual implementation.
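
To make the channel selection concrete, here is a simplified sketch in plain
Java. It is not the HashPartitioner implementation: KeyedChannelSelector is a
made-up name, and it uses the plain hashCode modulo the channel count, whereas
the real partitioner may apply an additional hash function.

```java
final class KeyedChannelSelector {
    /** Picks the receiving subtask for a key; equal keys always map to the same channel. */
    static int selectChannel(Object key, int numberOfChannels) {
        return Math.floorMod(key.hashCode(), numberOfChannels);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "c", "b"};
        for (String key : keys) {
            System.out.println(key + " -> channel " + selectChannel(key, 3));
        }
    }
}
```

Because the choice depends only on the key, keyBy needs no operator of its own;
it only changes how records are routed between two existing operators.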

Cheers,
Till

On Wed, Jun 8, 2016 at 11:01 AM,  wrote:

> I am using the dashboard to inspect my multi stage pipeline. I cannot seem
> to find a manual or other description for the dashboard aside from the
> quickstart section (
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/quickstart/setup_quickstart.html
> ).
>
> I would like to know roughly what my physical layout (in terms of
> task slots, tasks and task chaining) looks like. I am assuming that each
> block in the attached topology plan is an operation that will execute as
> multiple parallel tasks based on the assigned parallelism. Multilevel
> operator chaining has been applied in most cases. The plan then occupies 3
> task slots, as dictated by the highest DOP.  Is this correct?
>
> What I am also unsure about is that I have keyBy transformations in my
> code, yet they are not shown as transformations in the plan. The
> corresponding section on workers and slots (
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#workers-slots-resources)
> however explicitly shows keyBy in the layout.
>
> Regards
> Leon
>


Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

2016-06-08 Thread Till Rohrmann
Hi Ahmed,

the problem usually occurs if you use differently initialized Kryo
instances where one instance has a different set of classes registered. But
your data could also be corrupted, because you see an
IndexOutOfBoundsException where an element of an array
with size 0 is accessed at index 32.
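
The first failure mode can be pictured with a toy model. The sketch below is
not Kryo code and makes no claim about Kryo's internals: ToyClassRegistry is a
made-up class that only shows why an ID written by one registry cannot be
resolved by another registry that was initialized differently.

```java
import java.util.ArrayList;
import java.util.List;

final class ToyClassRegistry {
    private final List<Class<?>> registered = new ArrayList<>();

    /** Registers a class and returns the numeric ID that is written instead of the class name. */
    int register(Class<?> type) {
        registered.add(type);
        return registered.size() - 1;
    }

    /** Looks up the ID a writer would put on the wire for this class. */
    int idOf(Class<?> type) {
        int id = registered.indexOf(type);
        if (id < 0) {
            throw new IllegalArgumentException("Class is not registered: " + type.getName());
        }
        return id;
    }

    /** Resolves an ID read from the stream; fails if this instance never registered it. */
    Class<?> resolve(int id) {
        if (id < 0 || id >= registered.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return registered.get(id);
    }

    public static void main(String[] args) {
        ToyClassRegistry writer = new ToyClassRegistry();
        writer.register(String.class);
        writer.register(Integer.class);

        ToyClassRegistry reader = new ToyClassRegistry();
        reader.register(String.class); // Integer was never registered on the reading side

        System.out.println(reader.resolve(writer.idOf(String.class)));
        try {
            reader.resolve(writer.idOf(Integer.class));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```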

In order to debug the problem it would be helpful to see the full stack
traces of the errors and the complete error message. Additionally, it would
be helpful to see your program so that we could try to reproduce the
problem.

Cheers,
Till

On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader  wrote:

> Hello,
> I have a TwitterSource and I'm applying some transformations, such as filter
> and map, to the resulting stream from Twitter. I'm collecting the output in an
> iterator: iterator = DataStreamUtils.collect(datastream). Then in a
> parallel thread I periodically check iterator.hasNext() and print
> the next item. I'm using Flink 1.0.3.
> The program works at the beginning and actually prints some items.
> However, when I leave it running for some more time (for example, after
> 40 seconds or 1 minute), I get 2 exceptions:
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
> These 2 exceptions result from the line where I'm checking whether the
> iterator hasNext().
>
> I would like to know why these exceptions happen in general, and if
> anyone knows a specific solution for my program, that would be great too.
> Thanks,
> Ahmed
>


Re: How to perform this join operation?

2016-05-26 Thread Till Rohrmann
Hi Elias,

I like the idea of having a trailing / sliding window assigner to perform
your join. However, the result would not be entirely correct with respect to
your initial join specification.

Consider an events data set which contains the elements e1 = (4000, 1, 1) and
e2 = (4500, 2, 2), and a changes data set which contains the element c1 =
(4000, 1, 1). With the window assigners
SlidingEventTimeWindows(4000, 1000) and TrailingEventTimeWindows(4000,
1000), c1 would be assigned to TrailingWindow(1000, 5000). e1 and e2 would
both be assigned to, among others, SlidingWindow(1000, 5000). Thus, the two
windows would be joined. The result would be (e1, c1) and (e2, c1). However, e2
happened after c1.
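
The sliding half of this example can be checked with a small plain-Java sketch.
SlidingWindowAssignment is a made-up name and the computation is my own
restatement of how a sliding assigner with size 4000 and slide 1000 would pick
windows; the trailing assigner is Elias's custom code and is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowAssignment {
    /** Start timestamps of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println("e1 (t=4000): " + windowStarts(4000L, 4000L, 1000L));
        System.out.println("e2 (t=4500): " + windowStarts(4500L, 4000L, 1000L));
    }
}
```

Both e1 and e2 land in the window starting at 1000, so a join against the
window that holds c1 pairs c1 with e2 even though e2 is 500 time units later.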

But if that is ok for your use case, then your solution is great :-)

Cheers,
Till

On Wed, May 25, 2016 at 2:04 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Elias!
>
> I think you brought up a couple of good issues. Let me try and summarize
> what we have so far:
>
> 1) Joining in a more flexible fashion
> => The problem you are solving with the trailing / sliding window
> combination: Is the right way to phrase the join problem "join records
> where key is equal and timestamps are within X seconds (millis/minutes/...)
> of each other"?
> => That should definitely have an API abstraction. The first version
> could be implemented exactly with a combination of sliding and trailing
> windows.
>
> => For joins between windowed and non windowed streams in the long
> run: Aljoscha posted the Design Doc on side inputs. Would that cover the
> use case as a long-term solution?
>
> 2) Lists that are larger than the memory
> => The ListState returns an Iterable, but it is eagerly materialized
> from RocksDB. Is there a way to "stream" the bytes from RocksDB? Flink
> could then deserialize them in a streamed fashion as well.
>
> 3) Can you elaborate a bit on the OrderedListState? Do you think of
> multiple values (ordered) per key, or a sequence of key/value pairs,
> ordered by key?
> => Currently Flink limits the scope of key accesses to the values of the
> current key (as defined in the keyBy() function). That way, the system can
> transparently redistribute keys when changing the parallelism.
>
> Greetings,
> Stephan
>
>
> On Sat, May 21, 2016 at 12:24 AM, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
>> Till,
>>
>> An issue with your suggestion is that the job state may grow unbounded.
>> You are managing
>> expiration of data from the cache in the operator, but the state is
>> partitioned by the stream key.
>> That means if we no longer observe a key, the state associated with that
>> key will never be
>> removed.
>>
>> In my data set keys come and go, and many will never be observed again.
>> That will lead to
>> continuous state growth over time.
>>
>>
>> On Mon, May 2, 2016 at 6:06 PM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> Thanks for the suggestion.  I ended up implementing it a different way.
>>>
>>> [...]
>>>
>>> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Sorry for the late reply. You're right that with the windowed join you
>>>> would have to deal with pairs where the timestamp of (x,y) is not
>>>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>>>> windows you would receive duplicates as you've described. Using tumbling
>>>> windows would mean that you lose join matches if (x,y) lives in an earlier
>>>> window. Thus, in order to solve your problem you would have to write a
>>>> custom stream operator.
>>>>
>>>> The stream operator would do the following: Collecting the inputs from
>>>> (x,y) and z which are already keyed. Thus, we know that x=z holds true.
>>>> Using a priority queue we order the elements because we don't know how they
>>>> arrive at the operator. Whenever we receive a watermark indicating that no
>>>> earlier events can arrive anymore, we can go through the two priority
>>>> queues to join the elements. The queues are part of the operator's state so
>>>> that we don't lose information in case of a recovery.
>>>>
>>>> I've sketched such an operator here [1]. I hope this helps you to get
>>>> started.
>>>>
>>>
>>>
>>>
>>
>
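
The buffering step of the custom operator described in the quoted message from
April 20 can be sketched as follows. This is plain Java and not the operator
Till linked: WatermarkBuffer is a made-up name, it covers a single input only,
and the actual join logic and state checkpointing are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class WatermarkBuffer<T> {
    /** An element together with its event timestamp. */
    private static final class Timestamped<T> {
        final long timestamp;
        final T value;

        Timestamped(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Orders buffered elements by timestamp, regardless of arrival order.
    private final PriorityQueue<Timestamped<T>> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    /** Buffers an element that may have arrived out of order. */
    void add(long timestamp, T value) {
        queue.add(new Timestamped<>(timestamp, value));
    }

    /** Removes and returns, in timestamp order, all elements the watermark declares complete. */
    List<T> onWatermark(long watermark) {
        List<T> ready = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            ready.add(queue.poll().value);
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkBuffer<String> buffer = new WatermarkBuffer<>();
        buffer.add(30, "c");
        buffer.add(10, "a");
        buffer.add(20, "b");
        System.out.println(buffer.onWatermark(20));
        System.out.println(buffer.onWatermark(100));
    }
}
```

An operator along these lines would hold one such buffer per input and join the
elements released from both buffers whenever a watermark arrives.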


Re: State key serializer has not been configured in the config.

2016-06-23 Thread Till Rohrmann
Hi Jacob,

the `ListState` abstraction is a state which we call partitioned/key-value
state. As such, it is only possible to use it with a keyed stream. This
means that you have to call `keyBy` after the `connect` API call.

Cheers,
Till

On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen  wrote:

> Hi,
>
> I am trying to use a ListState in a RichCoFlatMapFunction, but when
> calling getRuntimeContext().getListState(descriptor) in the open function
> I am getting a "State key serializer has not .." exception. I am not sure
> what I can do to avoid this exception. Are any of you able to provide some
> help?
>
> Best regards
> Jacob
>
>
>  private ListState> deltaPositions;
>
> @Override
> public void open(org.apache.flink.configuration.Configuration
> parameters) throws Exception {
>   // Create state variable
>   ListStateDescriptor> descriptor =
>   new ListStateDescriptor<>(
>   "deltaPositions", // the state name
>   TypeInformation.of(new TypeHint Integer>>() {
>   }));
>
>   deltaPositions = getRuntimeContext().getListState(descriptor);
> };
>
>
>
> 2016-06-22 20:41:38,813 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Stream of Items with collection of meadian times (1/1)
> switched to FAILED with exception.
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
> at
> crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been
> configured in the config. This operation cannot use partitioned state.
> at
> org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
> ... 8 more
> 2016-06-22 20:41:38,815 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Freeing ta
>
> --
> Jacob Bay Larsen
>
> Phone: +45 6133 1108
> E-mail: m...@jacobbay.dk
>


Re: Initialization of static variables

2016-06-23 Thread Till Rohrmann
Yes, this is normal Flink behaviour. The reason is that static variables are
not transferred to the cluster. What happens instead when the class is first
loaded on the cluster is that the static variables are created and
any static class initializers are executed. That is also the reason why your
second example works whereas the first fails.
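
One way around this, which is not mentioned in the thread, is to hand the map
to the function object as an instance field, since user functions are shipped
to the cluster in serialized form. The plain-Java sketch below only illustrates
that distinction: LookupFunction is a made-up class, and clearing the static
map merely imitates a JVM that never ran the main method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class LookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Static state: lives only in the JVM that filled it and is never serialized. */
    static final Map<String, String> STATIC_TABLE = new HashMap<>();

    /** Instance state: part of the serialized function object. */
    private final HashMap<String, String> table;

    LookupFunction(Map<String, String> table) {
        this.table = new HashMap<>(table);
    }

    String lookup(String key) {
        return table.get(key);
    }

    /** Serializes and deserializes the function, similar to what happens when a job is shipped. */
    static LookupFunction roundTrip(LookupFunction function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LookupFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        STATIC_TABLE.put("test", "test");
        LookupFunction shipped = roundTrip(new LookupFunction(STATIC_TABLE));
        STATIC_TABLE.clear(); // imitate a worker JVM where main() never ran
        System.out.println("instance field: " + shipped.lookup("test"));
        System.out.println("static field:   " + STATIC_TABLE.get("test"));
    }
}
```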

Cheers,
Till

On Thu, Jun 23, 2016 at 3:12 PM, Flavio Pompermaier 
wrote:

> Hi all,
> I have a Flink job that initializes a static Map in the main program, before
> starting any Flink transformation. If I run the job locally, that variable
> is not empty, but running the job on the cluster resets that variable. Is it a
> bug or am I doing something wrong?
> It only works if I initialize that variable in a static block before
> the main method, that is:
>
> // KO EXAMPLE
> class ErrorMain {
>
>     private static final Map<String, String> ht = new HashMap<>();
>
>     public static void main(String[] args) {
>         ht.put("test", "test");
>         env.readFile().map(
>             ...
>             // here ht.get("test") returns null
>     }
> }
>
> // OK EXAMPLE
> class OkMain {
>
>     private static final Map<String, String> ht = new HashMap<>();
>     static {
>         ht.put("test", "test");
>     }
>
>     public static void main(String[] args) {
>         env.readFile().map(
>             ...
>             // here ht.get("test") works
>     }
> }
>
>
> Best,
> Flavio
>
>


Re: Iterate several kafka topics using the kafka connector

2016-06-23 Thread Till Rohrmann
It is possible to instantiate the FlinkKafkaConsumer with multiple topics
[1]. Simply pass a list of topic names instead of the name of a single
topic.

streams.add(env.addSource(new
FlinkKafkaConsumer09<>(Arrays.asList("foo", "bar", "foobar"),
new JSONSchema(), properties)));

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumer

Cheers,
Till

On Thu, Jun 23, 2016 at 2:33 PM, Sendoh  wrote:

> Hi Flink developers,
>
> Can I ask how we could iterate over several Kafka topics using the Kafka
> connector?
>
> Our idea is like the following example:
>
> List> streams = new ArrayList<>();
>
> // Iterate kafka topics
> Iterator topicIter = topicList.iterator();
>
> while (topicIter.hasNext()){
>
> String topic = topicIter.next();
>
> streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
> new JSONSchema(), properties)).rebalance());
>
> }
>
> Our goal is to union several kafka data streams into one, given the topics
> as a list:
>
> Iterator> streamsIt = streams.iterator();
>
> DataStream currentStream = streamsIt.next();
> while(streamsIt.hasNext()){
> DataStream nextStream = streamsIt.next();
> currentStream = currentStream.union(nextStream);
> }
>
> Cheers,
>
> Sendoh
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: Stoppable Job And Web UI Questions

2016-06-23 Thread Till Rohrmann
Hi Yan Chou Chen,

   1. At the moment Flink sources have to implement a certain interface,
      StoppableFunction, to be stoppable. If they do, then you can stop them
      via the CLI or the web interface. This cannot be triggered from within a
      job.

      However, you have a far better way to properly terminate a Flink job
      with your custom sources. Simply terminate the SourceFunction (by leaving
      the read loop) once you’ve detected that your termination criterion is
      met. Once all sources have done that, the job will be properly
      terminated and go into the state FINISHED. That has the advantage that
      you reach a consensus on when to terminate. Otherwise there might be a
      dictator which orders the other tasks to stop even though they might still
      have some work left to do.

   2. The number of records sent is the sum of all records sent by this task.
      These records include the watermarks as well as the actual stream records
      containing your data (read from Kafka). As such, this number will always be
      an upper bound for the number of records actually read (e.g. from Kafka) by
      your source.

   3. Given that others might also deliver messages to the same Kafka topic
      and that you have multiple partitions, I think it is not easy to know when
      your 1000 messages have been processed.

If you’re the only one who writes to this Kafka topic, you can use an
accumulator to count the number of messages sent. The accumulator is updated
live in the web UI’s task overview (if you click on the job and then
the Accumulators tab).

input.map(new RichMapFunction<Integer, Integer>() {
    // Accumulator that counts the records passing through this operator
    private IntCounter intCounter;

    @Override
    public void open(Configuration config) {
        intCounter = getRuntimeContext().getIntCounter("messages");
    }

    @Override
    public Integer map(Integer value) throws Exception {
        intCounter.add(1);
        return value;
    }
});
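
For the first point above, leaving the read loop could look roughly like the
following plain-Java sketch. It does not implement Flink's SourceFunction:
BoundedSource, its record limit and the Consumer used as collector are made-up
stand-ins, and only the run()/cancel() shape is borrowed from that interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a custom source; run() and cancel() mirror SourceFunction. */
final class BoundedSource {
    private final Iterator<String> input;
    private final int maxRecords; // termination criterion: stop after this many records
    private volatile boolean running = true;

    BoundedSource(Iterator<String> input, int maxRecords) {
        this.input = input;
        this.maxRecords = maxRecords;
    }

    /** Emits records until the criterion is met, the input is exhausted or cancel() is called. */
    int run(Consumer<String> collector) {
        int emitted = 0;
        while (running && emitted < maxRecords && input.hasNext()) {
            collector.accept(input.next());
            emitted++;
        }
        return emitted; // returning from run() is what allows the job to finish
    }

    /** Asks the read loop to stop at the next iteration. */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        BoundedSource source =
                new BoundedSource(Arrays.asList("a", "b", "c", "d", "e").iterator(), 3);
        source.run(collected::add);
        System.out.println(collected);
    }
}
```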

Cheers,
Till

On Wed, Jun 22, 2016 at 4:39 PM, Yan Chou Chen  wrote:

> Several new questions:
> - Stoppable job
> I read threads mentioning that a streaming job can be stopped [1][2].
> However, it looks like this can only be done through the command line. Is it
> possible to programmatically stop the streaming job from within the
> job itself? For instance, a Kafka consumer streaming job reaches a
> predefined condition and then calls stop() from within e.g. a MapFunction?
>
> - Web UI (jobmanager-host:8081) information
> I have a Kafka consumer which reads records from Kafka. In the web UI's
> Subtasks tab, where it shows "Records sent", does this refer to the records
> read by the consumer? For instance, if I deliver, say, 1k string records
> (SimpleStringSchema) to Kafka, can I expect 1k "Records sent" to be
> displayed in the web UI once all those records have been read by the consumer?
>
> This leads to another question. I have a streaming job which exploits
> map function e.g. stream.map(new MyMapFunction). Within the
> MyMapFunction impl I count per input and write the count to external
> places. Later on I sum the count value for MyMapFunction based on
> Parallelism supplied. So for example I run map(MyMapFunction) with 4
> parallelism, MyMapFunction processes 400, 500, 400, 500 count
> respectively. Later on the sum of all count is 1800. However this sum
> value is different from web ui which has higher "Record sent" e.g. 8k.
> Does that mean "Records sent" in web ui does not mean the records
> processed by MyMapFunction? How do I interpret the value in this
> column or how can I know if all messages delivered to Kafka are fully
> processed i.e. 1k records delivered to Kafka and 1k records read out
> of Kafka?
>
> Thanks.
>
> [1].
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3c57155c30.8010...@apache.org%3E
>
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
>


Re: Localhost terminated

2016-06-23 Thread Till Rohrmann
What do the log files say?

Cheers,
Till

On Thu, Jun 23, 2016 at 4:46 PM, Debaditya Roy  wrote:

> Hello users,
>
> I have a problem with Flink execution from the CLI. Each time I deploy from
> the CLI, the job starts but then it terminates the localhost.
>
> 06/23/2016 16:42:18 Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to SCHEDULED
> 06/23/2016 16:42:19 Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to DEPLOYING
> 06/23/2016 16:42:19 Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to RUNNING
>
> and I have to abort the job (ctrl + c).
> Immediately after deployment, when I check, I see that the localhost has
> stopped. Looking forward to your input. Thanks in advance.
>
> Warm Regards,
> Debaditya
>


Re: dataset dataframe join

2016-06-16 Thread Till Rohrmann
Hi Vishnu,

currently the only way to do this is to persist the DataSet (e.g. by writing
it to a file) and then to read the persisted form (e.g. the file) in the open
method of a rich function in the DataStream program. That way you can keep
the data in your operator and then join it with incoming stream records.
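
A rough plain-Java sketch of that pattern follows. It is not Flink code:
FileBackedJoin is a made-up class whose open() and map() only mirror the
methods of a rich map function, and the one "key,value" pair per line file
format is an assumption made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical enrichment function; open() and map() mirror a rich map function. */
final class FileBackedJoin {
    private final Path persistedDataSet;
    private final Map<String, String> lookup = new HashMap<>();

    FileBackedJoin(Path persistedDataSet) {
        this.persistedDataSet = persistedDataSet;
    }

    /** Loads the persisted data set once (assumed format: one "key,value" pair per line). */
    void open() {
        try {
            for (String line : Files.readAllLines(persistedDataSet, StandardCharsets.UTF_8)) {
                int comma = line.indexOf(',');
                if (comma > 0) {
                    lookup.put(line.substring(0, comma), line.substring(comma + 1));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Joins one incoming stream record with the in-memory copy; null if there is no match. */
    String map(String key) {
        String value = lookup.get(key);
        return value == null ? null : key + "," + value;
    }

    /** Test helper: writes the given lines to a temporary file. */
    static Path writeTempFile(String... lines) {
        try {
            Path file = Files.createTempFile("dataset", ".csv");
            Files.write(file, Arrays.asList(lines), StandardCharsets.UTF_8);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FileBackedJoin join = new FileBackedJoin(writeTempFile("1,alice", "2,bob"));
        join.open();
        System.out.println(join.map("2"));
        System.out.println(join.map("3"));
    }
}
```

The whole data set has to fit into the memory of each parallel instance, since
every instance loads its own copy.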

Cheers,
Till

On Wed, Jun 15, 2016 at 11:43 PM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Hi All,
>
> Is there any workaround/hack to join a DataSet with a DataStream, since
> https://issues.apache.org/jira/browse/FLINK-2320 is still in progress?
>
> Regards,
> Vishnu
>


Re: Yarn batch not working with standalone yarn job manager once a persistent, HA job manager is launched ?

2016-06-16 Thread Till Rohrmann
Hi Arnaud,

At the moment, the environment variable is the only way to specify a
different config directory for the CliFrontend. But it totally makes sense
to introduce a --configDir parameter for the flink shell script. I'll open
an issue for this.

Cheers,
Till

On Thu, Jun 16, 2016 at 5:36 PM, LINZ, Arnaud wrote:

> Okay, is there a way to specify the flink-conf.yaml to use on the
> ./bin/flink command-line? I see no such option. I guess I have to set
> FLINK_CONF_DIR before the call ?
>
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org]
> Sent: Wednesday, June 15, 2016 18:06
> To: user@flink.apache.org
> Subject: Re: Yarn batch not working with standalone yarn job manager once a
> persistent, HA job manager is launched ?
>
> Hi Arnaud,
>
> One issue per thread please. That makes things a lot easier for us :)
>
> Something positive first: We are reworking the resuming of existing Flink
> Yarn applications. It'll be much easier to resume a cluster using simply
> the Yarn ID or by re-discovering the Yarn session using the properties file.
>
> The dynamic properties are a shortcut for modifying the Flink configuration
> of the cluster _only_ upon startup. Afterwards, they are already set in the
> containers. We might change this for the 1.1.0 release. It should work if
> you put "yarn.properties-file.location: /custom/location" in your
> flink-conf.yaml before you execute "./bin/flink".
>
> Cheers,
> Max
>
> On Wed, Jun 15, 2016 at 3:14 PM, LINZ, Arnaud wrote:
> > Oops,
> > my mistake: snapshot/restore does work in a local env; I had a weird
> > configuration issue!
> >
> > But I still have the property file path issue :)
> >
> > -Original Message-
> > From: LINZ, Arnaud
> > Sent: Wednesday, June 15, 2016 14:35
> > To: 'user@flink.apache.org'
> > Subject: RE: Yarn batch not working with standalone yarn job manager
> > once a persistent, HA job manager is launched ?
> >
> > Hi,
> >
> > I haven't had the time to investigate the bad configuration file path
> > issue yet (if you have any idea why yarn.properties-file.location is
> > ignored, you are welcome to share it), but I'm facing another HA problem.
> >
> > I'm trying to make my custom streaming sources HA compliant by
> > implementing snapshotState() & restoreState(). I would like to test that
> > mechanism in my JUnit tests, because it can be complex, but I was unable
> > to simulate a "recover" in a local Flink environment: snapshotState() is
> > never triggered, and throwing an exception inside the execution chain
> > does not lead to recovery but ends the execution, despite the
> > streamExecEnv.enableCheckpointing(timeout) call.
> >
> > Is there a way to test this mechanism locally (other than poorly
> > simulating it by explicitly calling snapshot & restore in an overridden
> > source)?
> >
> > Thanks,
> > Arnaud
> >
> > -Original Message-
> > From: LINZ, Arnaud
> > Sent: Monday, June 6, 2016 17:53
> > To: user@flink.apache.org
> > Subject: RE: Yarn batch not working with standalone yarn job manager
> > once a persistent, HA job manager is launched ?
> >
> > I've deleted the '/tmp/.yarn-properties-user' file created for the
> > persistent container, and the batches do go into their own container.
> > However, that's not a workable workaround, as I'm no longer able to
> > submit streaming apps in the persistent container that way :) So it's
> > really a problem of Flink finding the right property file.
> >
> > I've added -yD yarn.properties-file.location=/tmp/flink/batch inside the
> > batch command line (also configured in the JVM_ARGS var), with no change
> > of behaviour. Note that I do have a standalone yarn container created,
> > but the job is submitted in the other one.
> >
> >  Thanks,
> > Arnaud
> >
> > -Original Message-
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Monday, June 6, 2016 16:01
> > To: user@flink.apache.org
> > Subject: Re: Yarn batch not working with standalone yarn job manager
> > once a persistent, HA job manager is launched ?
> >
> > Thanks for the clarification. I think it might be related to the YARN
> > properties file, which is still being used for the batch jobs. Can you
> > try to delete it between submissions as a temporary workaround to check
> > whether it's related?
> >
> > – Ufuk
> >
> > On Mon, Jun 6, 2016 at 3:18 PM, LINZ, Arnaud wrote:
> >> Hi,
> >>
> >> The zookeeper path is only for my persistent container, and I do use a
> >> different one for all my persistent containers.
> >>
> >> The -Drecovery.mode=standalone was passed inside the JVM_ARGS
> >> ("${JVM_ARGS} -Drecovery.mode=standalone
> >> -Dyarn.properties-file.location=/tmp/flink/batch")
> >>
> >> I've tried using -yD recovery.mode=standalone on the flink command line
> >> too, but it does not solve the problem; it still uses the pre-existing
> >> container.
> >>
> >> Complete line =
> >> /usr/lib/flink/bin/flink run -m yarn-cluster -yn 48 -ytm 8192 -yqu

Re: How MapFunction gets executed?

2016-06-16 Thread Till Rohrmann
Hi Yan Chou Chen,

Flink does not instantiate a mapper for each record. Instead, it creates
as many mappers as you've defined with the parallelism. Each of these
mappers is deployed to a slot on a TaskManager. When it is deployed, and
before it receives any records, the open method is called once. Then
incoming records are processed as they arrive at the operator. Once the
operator has finished processing (in the streaming case, this means that
the user has stopped or cancelled the job), the close method is called.
The close method should also be called if your job fails. Therefore, I
cannot explain why some of your resources don't get closed. Could you check
whether the logs contain anything suspicious?
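
The life cycle described above can be simulated roughly as follows. This
is a plain-Java sketch without any Flink dependency, not Flink's actual
runtime code: MapperLifecycleSketch, Mapper and run() are hypothetical
names, the parallel instances are run one after another instead of
concurrently, and the round-robin split of the records is only an
assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class MapperLifecycleSketch {

    /** Hypothetical stand-in for a RichMapFunction that holds a resource. */
    static final class Mapper {
        private final int subtask;
        private final List<String> events;

        Mapper(int subtask, List<String> events) {
            this.subtask = subtask;
            this.events = events;
        }

        void open() {                       // acquire the resource, once per instance
            events.add("open-" + subtask);
        }

        String map(String record) {         // invoked once for every incoming record
            events.add("map-" + subtask + ":" + record);
            return record.toUpperCase();
        }

        void close() {                      // release the resource, once per instance
            events.add("close-" + subtask);
        }
    }

    /** Creates one mapper per parallel subtask and feeds it its share of the records. */
    static List<String> run(List<String> records, int parallelism) {
        List<String> events = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            Mapper mapper = new Mapper(subtask, events);
            mapper.open();
            try {
                // Assumed round-robin distribution of records to subtasks.
                for (int i = subtask; i < records.size(); i += parallelism) {
                    mapper.map(records.get(i));
                }
            } finally {
                mapper.close();             // also reached if map() throws
            }
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : run(List.of("a", "b", "c"), 2)) {
            System.out.println(event);
        }
    }
}
```

The point of the sketch is only that open and close happen once per
parallel instance while map happens once per record, so a resource such
as an HTTP client is created `parallelism` times, not once per record.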

Cheers,
Till

On Thu, Jun 16, 2016 at 5:07 PM, Yan Chou Chen wrote:

> A quick question. When running a stream job that executes
> DataStream.map(MapFunction) after data is read from Kafka, is a
> MapFunction created per item or based on parallelism?
>
> For instance, for the following code snippet
>
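> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.addSource(new FlinkKafkaConsumer09[String](...))
> stream.map(new RichMapFunction[String, Unit] {
>
>   // my AsyncHttpClient instance
>
>   override def open(params: Configuration): Unit = {
>     /* create my AsyncHttpClient instance, etc. */
>   }
>
>   override def close(): Unit = { /* close my AsyncHttpClient instance */ }
>
>   override def map(record: String): Unit = {
>     // my code
>   }
> })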
>
> Is a RichMapFunction created for each record (a String in the above
> example)? Or, say the program sets parallelism to 4, are 4 RichMapFunction
> instances created first, the data read from the Kafka consumer divided
> into 4 partitions (or something similar), and map(record: String) then
> called within something like a while loop? Or what is the actual flow?
> Or is there source code I can start from (I traced through
> StreamExecutionEnvironment/ addSource/ DataStream/ transform/
> addOperator etc., but then got lost in the source code)?
>
> Basically, my problem is that I have an AsyncHttpClient instance that is
> opened within the open() function and closed in the close() function,
> according to the RichMapFunction doc. However, in some cases the close of
> my AsyncHttpClient instance is not executed, which produces a warning like
>
> AsyncHttpClient.close() hasn't been invoked, which may produce file
> descriptor leaks
>
> Therefore I would like to know the life cycle so that I can close
> resource appropriately.
>
> Thanks
>

