Re: GC on taskmanagers

2015-03-31 Thread Maximilian Michels
Hi Emmanuel,

In Java, the garbage collector will always run periodically. So remotely
executing it won't make any difference.

If you want to reuse the existing Java process without restarting it, you
have to stop the program code that is causing the OutOfMemoryError. Usually,
this is quite tricky because your program might not even accept input any
more while it is constantly occupied with garbage collection.

Where was the OutOfMemoryError thrown? Do you have the stack trace of the
error? From the task manager stack trace, it actually looks like your
program is not executing any more. I would try executing a demo program
(e.g. WordCount) to check your setup.

Best regards,
Max

On Tue, Mar 31, 2015 at 5:44 AM, Emmanuel ele...@msn.com wrote:

 My Java is still rusty and I often run into OutOfMemoryError: GC overhead
 limit exceeded...

 Yes, I need to look for memory leaks...

 But first I need to clear up this memory so I can run again without having
 to shut down and restart everything.

 I've tried using the jcmd <pid> GC.run command on each of the JVM
 instances on a taskmanager but I get a boatload of output like this:

 On the host running the command:
 com.sun.tools.attach.AttachNotSupportedException: Unable to open socket
 file: target process not responding or HotSpot VM not loaded
 at
 sun.tools.attach.LinuxVirtualMachine.init(LinuxVirtualMachine.java:106)
 at
 sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
 at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:213)
 at sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:140)
 at sun.tools.jcmd.JCmd.main(JCmd.java:129)



 and on the taskmanager log:

 Flink-IPC Server handler 1 on 6121 daemon prio=10 tid=0x7f5f107ee000
 nid=0x8f waiting on condition [0x7f5eb4803000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0xf37e95c0 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
 at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:941)

 Flink-IPC Server handler 0 on 6121 daemon prio=10 tid=0x7f5f107eb800
 nid=0x8e waiting on condition [0x7f5eb4904000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0xf37e95c0 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
 at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:941)

 Flink-IPC Server listener on 6121 daemon prio=10 tid=0x7f5f107e9800
 nid=0x8d runnable [0x7f5eb4a05000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 - locked 0xf385d3c0 (a sun.nio.ch.Util$2)
 - locked 0xf385d3d0 (a java.util.Collections$UnmodifiableSet)
 - locked 0xf385d378 (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:102)
 at org.apache.flink.runtime.ipc.Server$Listener.run(Server.java:341)

 Flink-IPC Server Responder daemon prio=10 tid=0x7f5f107e8800
 nid=0x8c runnable [0x7f5eb4b06000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 - locked 0xf387b528 (a sun.nio.ch.Util$2)
 - locked 0xf387b538 (a java.util.Collections$UnmodifiableSet)
 - locked 0xf387b4e0 (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at org.apache.flink.runtime.ipc.Server$Responder.run(Server.java:506)

 Service Thread daemon prio=10 tid=0x7f5f100c2000 nid=0x8a runnable
 [0x]
java.lang.Thread.State: RUNNABLE

 C2 CompilerThread1 daemon prio=10 tid=0x7f5f100c nid=0x89
 waiting on condition [0x]
java.lang.Thread.State: RUNNABLE

 C2 CompilerThread0 daemon prio=10 tid=0x7f5f100bd000 nid=0x88
 waiting on condition [0x]
java.lang.Thread.State: RUNNABLE

 Signal Dispatcher daemon 

Re: Parallelism question

2015-04-14 Thread Maximilian Michels
Hi Giacomo,

If I understand you correctly, you want your Flink job to execute with a
parallelism of 5. Just call setDegreeOfParallelism(5) on your
ExecutionEnvironment. That way, all operations, when possible, will be
performed using 5 parallel instances. This is also true for the DataSink
which will produce 5 files containing the output data from the parallel
instances.
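
For illustration, a minimal sketch of that (the input/output paths and job name
here are made up):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(5);   // all operators run with up to 5 parallel instances

DataSet<String> text = env.readTextFile("hdfs:///path/to/input");   // hypothetical path
text.writeAsText("hdfs:///path/to/output");   // the sink produces 5 files, one per parallel instance
env.execute("parallelism example");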

Best,
Max


On Tue, Apr 14, 2015 at 10:38 AM, Giacomo Licari giacomo.lic...@gmail.com
wrote:

 Hi guys,
 I have a question about how parallelism works.

 If I have a large dataset and I would like to divide it into 5 blocks, can I pass
 each block of data to a fixed parallel process (for example, if I set up 5
 processes)?

 And if the result data from each process arrives at the output in an unordered
 way, can I order it? For example:

 data from process 1
 data from process 2
 and so on

 Thank you guys!



Re: Parallelism question

2015-04-14 Thread Maximilian Michels
Hi Giacomo,

If you use a FileOutputFormat as a DataSink (e.g. as in
writeAsText("/path")), then the output directory will contain 5 files
named 1, 2, 3, 4, and 5, each containing the output of the corresponding
task. The order of the data in the files follows the order of the
distributed DataSet. You can locally sort a partition by a key using the
sortPartition(...) method. This is only available in 0.9.0-milestone-1 and
0.9-SNAPSHOT.
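
A small sketch of what that could look like (sample data and output path are
placeholders, assuming the sortPartition API mentioned above):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(5);

DataSet<Tuple2<Integer, String>> data = env.fromElements(
      new Tuple2<Integer, String>(3, "c"),
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "b"));

// each parallel output file is sorted locally by the Integer field
data.sortPartition(0, Order.ASCENDING)
    .writeAsCsv("/path/to/output");   // files 1..5 appear inside this directory

env.execute();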

Best,
Max



On Tue, Apr 14, 2015 at 12:12 PM, Giacomo Licari giacomo.lic...@gmail.com
wrote:

 Hi Max,
 thank you for your reply.

 Does the DataSink contain the data in order, I mean, does it contain output1,
 output2 ... output5 in order? Or are they mixed?

 Thanks a lot,
 Giacomo

 On Tue, Apr 14, 2015 at 11:58 AM, Maximilian Michels m...@apache.org
 wrote:

 Hi Giacomo,

 If I understand you correctly, you want your Flink job to execute with a
 parallelism of 5. Just call setDegreeOfParallelism(5) on your
 ExecutionEnvironment. That way, all operations, when possible, will be
 performed using 5 parallel instances. This is also true for the DataSink
 which will produce 5 files containing the output data from the parallel
 instances.

 Best,
 Max


 On Tue, Apr 14, 2015 at 10:38 AM, Giacomo Licari 
 giacomo.lic...@gmail.com wrote:

 Hi guys,
 I have a question about how parallelism works.

 If I have a large dataset and I would like to divide it into 5 blocks, can I
 pass each block of data to a fixed parallel process (for example, if I set up 5
 processes)?

 And if the result data from each process arrives at the output in an unordered
 way, can I order it? For example:

 data from process 1
 data from process 2
 and so on

 Thank you guys!






Re: Get DataSet sum

2015-04-28 Thread Maximilian Michels
Hi Giacomo,

If you have your data stored in a Tuple inside a DataSet, then a call to
dataSet.sum(int field) should do it.

See Aggregation under
http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#transformations
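
For example, a minimal sketch (the tuples here are made up):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<String, Double>> items = env.fromElements(
      new Tuple2<String, Double>("a", 1.5),
      new Tuple2<String, Double>("b", 2.5));

// sums field 1 (the Double) over the whole DataSet; the result is a single tuple
// whose non-aggregated field holds an arbitrary value
items.sum(1).print();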

Best,
Max

On Tue, Apr 28, 2015 at 2:52 PM, Giacomo Licari giacomo.lic...@gmail.com
wrote:

 Hi Guys,
 how can obtain the sum of all items (integer or double) in a DataSet?

 Do I have to use Flink Iterators? And how?

 Thank you,
 Giacomo



Re: Broken Links in Documentation

2015-05-02 Thread Maximilian Michels
This is because of recent changes to the documentation layout and
structure. The programming guide is now located at
http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html

Best,
Max

On Fri, May 1, 2015 at 8:42 AM, sirinath sirinath19...@gmail.com wrote:

 I have come across broken links in the documentation. E.g.

 http://ci.apache.org/projects/flink/flink-docs-master/programming_guide.html



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Broken-Links-in-Documentation-tp1159.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive
 at Nabble.com.



Re: JobTimeoutException: Lost connection to JobManager

2015-04-15 Thread Maximilian Michels
The exception indicates that you're still using the old version. It takes
some time for the new Maven artifact to get deployed to the snapshot
repository. Apparently, an artifact has already been deployed this morning.
Did you delete the jar files in your .m2 folder?

On Wed, Apr 15, 2015 at 1:38 PM, Mohamed Nadjib MAMI m...@iai.uni-bonn.de
wrote:

  Hello,

 I'm still facing the problem with 0.9-SNAPSHOT version. Tried to remove
 the libraries and download them again but same issue.

 Greetings,
 Mohamed


 Exception in thread main
 org.apache.flink.runtime.client.JobTimeoutException: Lost connection to
 JobManager
 at
 org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:164)
 at
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:198)
 at
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:188)
 at
 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:179)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
 at Main.main(Main.java:142)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [10 milliseconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at scala.concurrent.Await.result(package.scala)
 at
 org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:143)
 ... 5 more


 On 15.04.2015 01:02, Stephan Ewen wrote:

 I pushed a fix to the master. The problem should now be gone.

  Please let us know if you experience other issues!

  Greetings,
 Stephan


 On Tue, Apr 14, 2015 at 9:57 PM, Mohamed Nadjib MAMI m...@iai.uni-bonn.de
  wrote:

  Hello,

 Once I got the message, a few seconds later I received your email. Well, this
 is just to cast a need for a fix.

 Happy to feel the dynamism of the work. Great work.


 On 14.04.2015 21:50, Stephan Ewen wrote:

 You are on the latest snapshot version? I think there is an inconsistency
 in there. Will try to fix that tonight.

 Can you actually use the milestone1 version? That one should be good.

 Greetings,
 Stephan
  Am 14.04.2015 20:31 schrieb Fotis P fotis...@gmail.com:

Hello everyone,

  I am getting this weird exception while running some simple counting
 jobs in Flink.

 Exception in thread main
 org.apache.flink.runtime.client.JobTimeoutException: Lost connection to
 JobManager
 at
 org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:164)
 at
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:198)
 at
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:188)
 at
 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:179)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
 at
 trackers.preprocessing.ExtractInfoFromLogs.main(ExtractInfoFromLogs.java:133)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [10 milliseconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at scala.concurrent.Await.result(package.scala)
 at
 org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:143)
 ... 10 more


  The only call above which comes from my code is
 ExtractInfoFromLogs.java:133 which is the environment.execute() method.

  This exception comes when dealing with largish files (10GB). No
 exception is thrown when I am working with a smaller subset of my data.
  Also I would swear that it was working fine until a few days ago, and
 the code has not been changed :S Only change was a re-import of maven
 dependencies.

  I am unsure what other information I could provide that would help you
 help me :)

  I am running everything locally through the intelij IDE. Maven
 dependency is set to 0.9-SNAPSHOT.
  I have an 8-core Ubuntu 14.04 machine.

  Thanks in advance :D


Re: Left outer join

2015-04-16 Thread Maximilian Michels

 This is something that we need to solve a bit differently.
 Maybe by adding optional null-valued field support to Tuple.


+1

That was just a proof of concept. I agree, for a proper implementation, one
would need to differentiate between a regular element and a NULL element.

On Thu, Apr 16, 2015 at 3:23 PM, Fabian Hueske fhue...@gmail.com wrote:

 That solution works if you can define a NULL_ELEMENT but not if you want
 to use the full value range of Integer.

 This is something that we need to solve a bit differently.
 Maybe by adding optional null-valued field support to Tuple.


 2015-04-15 5:59 GMT-05:00 Maximilian Michels m...@apache.org:

 Hi Flavio,

 Here's a simple example of a Left Outer Join:
 https://gist.github.com/mxm/c2e9c459a9d82c18d789

 As Stephan pointed out, this can be very easily modified to construct a
 Right Outer Join (just exchange leftElements and rightElements in the two
 loops).

 Here's an excerpt with the most important part, the coGroup function:

 public static class LeftOuterJoin implements
       CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {

    @Override
    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
                        Iterable<Tuple2<Integer, String>> rightElements,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {

       final int NULL_ELEMENT = -1;

       for (Tuple2<Integer, String> leftElem : leftElements) {
          boolean hadElements = false;
          for (Tuple2<Integer, String> rightElem : rightElements) {
             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
             hadElements = true;
          }
          if (!hadElements) {
             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
          }
       }
    }
 }



 On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen se...@apache.org wrote:

 I think this may be a great example to add as a utility function.

 Or actually add as an function to the DataSet, internally realized as a
 special case of coGroup.

 We do not have a ready example of that, but it should be straightforward
 to realize. Similar as for the join, coGroup on the join keys. Inside the
 coGroup function, emit the combination of all values from the two
 iterators. If one of them is empty (the one that is not outer) then emit
 all values from the outer side.

 Greetings,
 Stephan


 On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 Do you have an already working example of it? :)


 On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi u...@apache.org wrote:


 On 15 Apr 2015, at 10:30, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 
  Hi to all,
  I have to join two datasets but I'd like to keep all data in the
 left also if there's no right dataset.
  How can you achieve that in Flink? Maybe I should use coGroup?

 Yes, currently you have to implement this manually with a coGroup








Re: distinct() Java API

2015-04-17 Thread Maximilian Michels
Hi Flavio,

Do you have an example? The DistinctOperator should return a typed output
just like all the other operators do.

Best,
Max

On Fri, Apr 17, 2015 at 10:07 AM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 Hi guys,

 I'm trying to make (in Java) a project().distinct() but then I cannot
 create the generated dataset with a typed tuple because the distinct
 operator returns just an untyped Tuple.
 Is this an error in the APIs or am I doing something wrong?

 Best,
 Flavio



Re: Left outer join

2015-04-15 Thread Maximilian Michels
Hi Flavio,

Here's a simple example of a Left Outer Join:
https://gist.github.com/mxm/c2e9c459a9d82c18d789

As Stephan pointed out, this can be very easily modified to construct a
Right Outer Join (just exchange leftElements and rightElements in the two
loops).

Here's an excerpt with the most important part, the coGroup function:

public static class LeftOuterJoin implements
      CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {

   @Override
   public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
                       Iterable<Tuple2<Integer, String>> rightElements,
                       Collector<Tuple2<Integer, Integer>> out) throws Exception {

      final int NULL_ELEMENT = -1;

      for (Tuple2<Integer, String> leftElem : leftElements) {
         boolean hadElements = false;
         for (Tuple2<Integer, String> rightElem : rightElements) {
            out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
            hadElements = true;
         }
         if (!hadElements) {
            out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
         }
      }
   }
}
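
For completeness, here is a minimal sketch of how that function could be wired up
(the sample data and key choice here are placeholders, not taken from an actual job):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, String>> left = env.fromElements(
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "b"));
DataSet<Tuple2<Integer, String>> right = env.fromElements(
      new Tuple2<Integer, String>(1, "x"));

// coGroup on the join key (field 0 of both inputs) and apply the LeftOuterJoin above
left.coGroup(right)
    .where(0)
    .equalTo(0)
    .with(new LeftOuterJoin())
    .print();   // prints (1,1) and (2,-1); key 2 has no match, so it gets NULL_ELEMENT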



On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen se...@apache.org wrote:

 I think this may be a great example to add as a utility function.

 Or actually add as an function to the DataSet, internally realized as a
 special case of coGroup.

 We do not have a ready example of that, but it should be straightforward
 to realize. Similar as for the join, coGroup on the join keys. Inside the
 coGroup function, emit the combination of all values from the two
 iterators. If one of them is empty (the one that is not outer) then emit
 all values from the outer side.

 Greetings,
 Stephan


 On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier pomperma...@okkam.it
  wrote:

 Do you have an already working example of it? :)


 On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi u...@apache.org wrote:


 On 15 Apr 2015, at 10:30, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 
  Hi to all,
  I have to join two datasets but I'd like to keep all data in the left
 also if there's no right dataset.
  How can you achieve that in Flink? Maybe I should use coGroup?

 Yes, currently you have to implement this manually with a coGroup






Re: Flink Forward 2015

2015-04-08 Thread Maximilian Michels
Saved the date! This sounds very exciting. Looking forward to hearing a lot
of nice talks and meeting a lot of great people!

On Tue, Apr 7, 2015 at 2:24 PM, Kostas Tzoumas ktzou...@apache.org wrote:

 Hi everyone,

 The folks at data Artisans and the Berlin Big Data Center are organizing
 the first physical conference all about Apache Flink in Berlin the coming
 October:

 http://flink-forward.org

 The conference will be held in a beautiful spot, an old brewery turned event
 space (the same space where Berlin Buzzwords took place last year). We are
 soliciting technical talks on Flink, talks on how you are using Flink to
 solve real world problems, as well as talks on Big Data technology in
 general that relate to Apache Flink's general direction. And of course,
 there will be enough social and networking events to get the community
 together :-)

 The website and the call for abstracts are live, but the ticket
 registration is not yet open.

 At this point, I would like to ask the community to mark your calendars if
 you'd like to attend, submit an abstract, and forward the event to your
 friends and family. If you can help us market the event, help in any other
 way, or have any other inquiries, please get in touch with me!

 I will also announce this via our social media channels this week.

 I am looking forward to gathering the community in a great conference!

 Best,
 Kostas



Re: Flink meetup group in Stockholm

2015-04-08 Thread Maximilian Michels
Love the purple. Have fun! :)

On Wed, Apr 8, 2015 at 5:05 PM, Henry Saputra henry.sapu...@gmail.com
wrote:

 Nice, congrats!

 On Wed, Apr 8, 2015 at 7:39 AM, Gyula Fóra gyf...@apache.org wrote:
  Hey Everyone!
 
   We are proud to announce the first Apache Flink meetup group in
 Stockholm.
 
  Join us at http://www.meetup.com/Apache-Flink-Stockholm/
 
   We are looking forward to organising our first event in May!
 
  Cheers,
  Gyula



Re: Best wishes for Kostas Tzoumas and Robert Metzger

2015-06-08 Thread Maximilian Michels
Thank you for your kind wishes :) Good luck from me as well!

I was just wondering, is it possible to stream the talks or watch them
later on?

On Mon, Jun 8, 2015 at 2:54 AM, Hawin Jiang hawin.ji...@gmail.com wrote:

 Hi All



  As you know, Kostas Tzoumas and Robert Metzger will give two Flink
  talks at the 2015 Hadoop Summit.

 That is an excellent opportunity to introduce Apache Flink to the world.

 Best wishes for Kostas Tzoumas and Robert Metzger.





 Here is the details info:



 Topic: Apache Flink deep-dive

 Time: 1:45pm - 2:25pm 2015/06/10

 Speakers: Kostas Tzoumas and Robert Metzger



 Topic: Flexible and Real-time Stream Processing with Apache Flink

 Time: 3:10pm - 3:50pm 2015/06/11

 Speakers: Kostas Tzoumas and Robert Metzger











 Best regards

 Hawin



Re: Job Statistics

2015-06-18 Thread Maximilian Michels
Hi Jean,

I think it would be a nice to have feature to display some metrics on the
command line after a job has completed. We already have the run time and
the accumulator results available at the CLI and printing those would be
easy. What metrics in particular are you looking for?

Best,
Max

On Thu, Jun 18, 2015 at 3:41 PM, Jean Bez jeanluca...@gmail.com wrote:

 Hi Fabian,

 I am trying to compare some examples on Hadoop, Spark and Flink. If
 possible I would like to see the job statistics like the report given by
 Hadoop. Since I am running these examples on a large cluster it would be
 much better if I could obtain such data directly from the console.

 Thanks!
 Jean
 Em 18/06/2015 04:55, Fabian Hueske fhue...@gmail.com escreveu:

 Hi Jean,

 what kind of job execution stats are you interested in?

 Cheers, Fabian

 2015-06-18 9:01 GMT+02:00 Matthias J. Sax mj...@informatik.hu-berlin.de
 :

 Hi,

 the CLI cannot show any job statistics. However, you can use the
 JobManager web interface that is accessible at port 8081 from a browser.

 -Matthias


 On 06/17/2015 10:13 PM, Jean Bez wrote:
  Hello,
 
  Is it possible to view job statistics after it finished to execute
  directly in the command line? If so, could you please explain how? I
  could not find any mentions about this in the docs. I also tried to set
  the logs to debug mode, but no other information was presented.
 
  Thank you!
 
  Regards,
  Jean





Re: Building Yarn-Version of Flink

2015-06-23 Thread Maximilian Michels
Hi Max!

Nowadays, the default target when building from source is Hadoop 2. So a
simple mvn clean package -DskipTests should do it. You only need the flag
when you build for Hadoop 1: -Dhadoop.profile=1.

Cheers,
The other Max

On Tue, Jun 23, 2015 at 2:03 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Hi Flinksters,

  I just tried to build the current yarn version of Flink. The second error
  is probably because my Maven is an older version. But the first one seems
  to be a real error.

 albermax@hadoop1:~/bumpboost/working/programs/flink/incubator-flink$ mvn
 clean package -DskipTests -Dhadoop.profile=2
 [INFO] Scanning for projects...
 [ERROR] The build could not read 2 projects - [Help 1]
 [ERROR]
 [ERROR]   The project org.apache.flink:flink-shaded-hadoop:0.10-SNAPSHOT
 (/home/albermax/bumpboost/working/programs/flink/incubator-flink/flink-shaded-hadoop/pom.xml)
 has 1 error
 [ERROR] Child module
 /home/albermax/bumpboost/working/programs/flink/incubator-flink/flink-shaded-hadoop/error
 of
 /home/albermax/bumpboost/working/programs/flink/incubator-flink/flink-shaded-hadoop/pom.xml
 does not exist
 [ERROR]
 [ERROR]   The project org.apache.flink:flink-hbase:0.10-SNAPSHOT
 (/home/albermax/bumpboost/working/programs/flink/incubator-flink/flink-staging/flink-hbase/pom.xml)
 has 1 error
 [ERROR] 'dependencies.dependency.version' for
 org.apache.hbase:hbase-server:jar must be a valid version but is
 '${hbase.version}'. @ line 97, column 13
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with the
 -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException


 Thanks!
 Cheers,
 Max



Re: Using collect and accessing accumulator results

2015-06-18 Thread Maximilian Michels
Hi Tamara!

Yes, there is. Since count/collect/print trigger an execution of the
ExecutionEnvironment, you can get the result afterwards using
env.getLastExecutionResult().
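
A minimal sketch of that pattern (accumulator name and data are placeholders,
imports omitted; depending on the version the getter may also be called
getLastJobExecutionResult()):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> data = env.fromElements("a", "b", "c");

List<String> collected = data.map(new RichMapFunction<String, String>() {
   private final LongCounter counter = new LongCounter();

   @Override
   public void open(Configuration parameters) {
      getRuntimeContext().addAccumulator("processed-records", counter);
   }

   @Override
   public String map(String value) {
      counter.add(1L);
      return value;
   }
}).collect();   // triggers the execution

JobExecutionResult result = env.getLastExecutionResult();
Long processed = result.getAccumulatorResult("processed-records");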

Best,
Max

On Thu, Jun 18, 2015 at 3:57 PM, Tamara Mendt tammyme...@gmail.com wrote:

 Hey!

 I am currently running a job in which I wish to use collect to trigger my
 job execution, but I also need to have access to the final accumulator
 results. Up until now I have been accessing the accumulator results through
 the JobExecutionResult that the function execute() returns.

 Not surprisingly, if I use collect() and execute() I get the following
 exception:

 java.lang.RuntimeException: No new data sinks have been defined since the
 last execution. The last execution refers to the latest call to
 'execute()', 'count()', 'collect()', or 'print()'.
 at
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:921)
 at
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:904)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:50)


 Is there some way in which I can access the accumulator results while
 using the collect method to trigger execution?

 Cheers,



Re: Documentation Error

2015-06-25 Thread Maximilian Michels
Thanks for noticing, Chiwan. I have the feeling this problem arose when the
website was updated. The problem about linking documentation pages from the
main website is that it is currently hard to go back to the main web site
from the documentation (the nav and URL changes). However, now we are
suffering from fragmentation.

I would suggest to move the FAQ and How To Contribute to the Flink web site
and delete them from the Flink repository.

Cheers,
Max

On Thu, Jun 25, 2015 at 2:20 PM, Chiwan Park chiwanp...@apache.org wrote:

  The How to Contribute and coding guidelines are also duplicated on the web
  site and in the documentation.
  I think this duplication is not needed. We need to merge the duplicates.

 Regards,
 Chiwan Park

  On Jun 25, 2015, at 9:01 PM, Maximilian Michels m...@apache.org wrote:
 
  Thanks. Fixed. Actually, that one is not linked anywhere, right? Just
 realized the FAQ page is duplicated on the web site and the Flink
 documentation. So there is
 http://ci.apache.org/projects/flink/flink-docs-master/faq.html and
 http://flink.apache.org/faq.html
 
  I'm guessing somebody wanted a FAQ independent of the documentation
 version. However, I don't see how we will maintain multiple FAQs. The two
 have already diverged quite a bit and merging them is not trivial.
 
  On Thu, Jun 25, 2015 at 11:40 AM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:
  Another one: on
  http://ci.apache.org/projects/flink/flink-docs-master/faq.html
  in the What is parallelism? How do I set it? Section the links are
 broken.
 
  Cheers,
  Max
 
  On Wed, Jun 24, 2015 at 9:52 AM, Maximilian Michels m...@apache.org
 wrote:
  Hi Max,
 
  Thanks for noticing! Fixed on the master and for the 0.9.1 release.
 
  Cheers,
  Max
 
  On Tue, Jun 23, 2015 at 5:09 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:
  Hi Flinksters,
 
  just some minor:
 
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html
  in the second code sample should be
  ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar
  instead of:
  ./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar
 
  Cheers,
  Max
 
 
 







Re: Documentation Error

2015-06-25 Thread Maximilian Michels
Thanks Max. I think the documentation has grown a lot and needs an
overhaul. We should remove the unnecessary non-Flink-related stuff (e.g.
configuring ssh keys in the setup guide). I like your idea of having an
essential guide that just covers the basics for people already familiar
with other big data projects. It would be great if somebody could spare
time to work on that.

On Thu, Jun 25, 2015 at 2:31 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Something different. I just read through the Spark documentation and
 yours. While the Spark one is quite unstructured but easy to understand,
 yours is structured and really detailed. It's great that you have that in
 depth documentation, but I would recommend you to make a boiled-down page
 with just the basic stuff. Which would ease the life of beginners.

 Cheers,
 Max

 On Thu, Jun 25, 2015 at 2:20 PM, Chiwan Park chiwanp...@apache.org
 wrote:

 The How to Contribute and coding guidelines are also duplicated on the web
 site and in the documentation.
 I think this duplication is not needed. We need to merge the duplicates.

 Regards,
 Chiwan Park

  On Jun 25, 2015, at 9:01 PM, Maximilian Michels m...@apache.org wrote:
 
  Thanks. Fixed. Actually, that one is not linked anywhere, right? Just
 realized the FAQ page is duplicated on the web site and the Flink
 documentation. So there is
 http://ci.apache.org/projects/flink/flink-docs-master/faq.html and
 http://flink.apache.org/faq.html
 
  I'm guessing somebody wanted a FAQ independent of the documentation
 version. However, I don't see how we will maintain multiple FAQs. The two
 have already diverged quite a bit and merging them is not trivial.
 
  On Thu, Jun 25, 2015 at 11:40 AM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:
  Another one: on
  http://ci.apache.org/projects/flink/flink-docs-master/faq.html
  in the What is parallelism? How do I set it? Section the links are
 broken.
 
  Cheers,
  Max
 
  On Wed, Jun 24, 2015 at 9:52 AM, Maximilian Michels m...@apache.org
 wrote:
  Hi Max,
 
  Thanks for noticing! Fixed on the master and for the 0.9.1 release.
 
  Cheers,
  Max
 
  On Tue, Jun 23, 2015 at 5:09 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:
  Hi Flinksters,
 
  just some minor:
 
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html
  in the second code sample should be
  ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar
  instead of:
  ./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar
 
  Cheers,
  Max
 
 
 








Re: Documentation Error

2015-06-24 Thread Maximilian Michels
Hi Max,

Thanks for noticing! Fixed on the master and for the 0.9.1 release.

Cheers,
Max

On Tue, Jun 23, 2015 at 5:09 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Hi Flinksters,

 just some minor:
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html
 in the second code sample should be

 ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar

 instead of:

 ./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar


 Cheers,
 Max



[ANNOUNCE] Apache Flink 0.9.0 released

2015-06-24 Thread Maximilian Michels
The Apache Flink community is pleased to announce the availability of the
0.9.0 release.

Apache Flink is an open source platform for scalable batch and stream data
processing. Flink’s core consists of a streaming dataflow engine that
provides data distribution, communication, and fault tolerance for
distributed computations over data streams.

The release is the result of many months of hard work within the Flink
community. It contains many new features and improvements which were
previewed in the 0.9.0-milestone1 release and have been polished since
then. This is the largest Flink release so far.

Please see the blog post for the added features and improvements:

http://flink.apache.org/news/2015/06/24/announcing-apache-flink-0.9.0-release.html

Regards,
The Apache Flink community


Re: scaling question

2015-06-19 Thread Maximilian Michels
Hi Bill,

You're right. Simply increasing the task manager slots doesn't do anything.
It is correct to set the parallelism to taskManagers*slots. Simply increase
the number of network buffers in the flink-conf.yaml, e.g. to 4096. In the
future, we will configure this setting dynamically.
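
For reference, that would be a single entry in flink-conf.yaml, something like
(please double-check the key name against the configuration docs of your version):

taskmanager.network.numberOfBuffers: 4096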

Let us know if your runtime decreases :)

Cheers,
Max

On Fri, Jun 19, 2015 at 4:24 PM, Bill Sparks jspa...@cray.com wrote:


Sorry for the post again. I guess I'm not understanding this…

  The question is how to scale up/increase the execution of a problem.
 What  I'm trying to do, is get the best out of the available processors for
 a given node count and compare this against spark, using KMeans.

  For spark,  one method is to increase the executors and RDD partitions
  - for Flink I can increase the number of task slots
 (taskmanager.numberOfTaskSlots). My empirical evidence suggests that just
 increasing the slots does not increase processing of the data. Is there
 something I'm missing? Much like spark with re-partitioning your datasets,
 is there an equivalent option for flink? What about the parallelism
 argument? The referenced document seems to be broken…

  This seems to be a dead link:
 https://github.com/apache/flink/blob/master/docs/setup/%7B%7Bsite.baseurl%7D%7D/apis/programming_guide.html#parallel-execution

  If I do increase the parallelism to be (taskManagers*slots) I hit the
 Insufficient number of network buffers…

  I have 16 nodes (64 HT cores), and have run TaskSlots from 1, 4, 8, 16
  and still the execution time is always around 5-6 minutes, using the
 default parallelism.

  Regards,
 Bill
  --
  Jonathan (Bill) Sparks
 Software Architecture
 Cray Inc.



Re: Building master branch is failed

2015-05-29 Thread Maximilian Michels
I can confirm that mvn clean package fails. However, Travis builds fine
after Till's fix: https://travis-ci.org/apache/flink/builds/64537794

On Fri, May 29, 2015 at 11:51 AM, Till Rohrmann trohrm...@apache.org
wrote:

 Yes, this is another error. Seems to be related to the new scala shell.

 On Fri, May 29, 2015 at 11:00 AM, Chiwan Park chiwanp...@icloud.com
 wrote:

 I fetched master branch and ran again. But I got the same error.
 It seems that the problem is related to javadoc. Till’s fix is related to
 renaming in flink-ml package.

 Regards,
 Chiwan Park


 On May 29, 2015, at 5:39 PM, Stephan Ewen se...@apache.org wrote:

 A bug sneaked in...

 I think Till just pushed a fix for that, so if you pull now, it should
 work again...



 On Fri, May 29, 2015 at 10:28 AM, Chiwan Park chiwanp...@icloud.com
 wrote:

 Hi :)

 I tried building current master branch with `mvn clean package
 -DskipTests` command.
  But I got the following error:

 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 08:11 min
 [INFO] Finished at: 2015-05-29T17:21:45+09:00
 [INFO] Final Memory: 151M/1524M
 [INFO]
 
 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:jar (attach-javadocs)
 on project flink-scala-shell: MavenReportException: Error while creating
 archive:
 [ERROR] Exit code: 1 - javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: error - No public or protected classes found to
 document.
 [ERROR]
 [ERROR] Command line was:
 /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/javadoc
 @options @packages
 [ERROR]
 [ERROR] Refer to the generated Javadoc files in
 '/Users/chiwanpark/IdeaProjects/flink/flink-staging/flink-scala-shell/target/apidocs'
 dir.
 [ERROR] - [Help 1]
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with the
 -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
 [ERROR]
 [ERROR] After correcting the problems, you can resume the build with the
 command
 [ERROR]   mvn <goals> -rf :flink-scala-shell

 I ran the command in OS X 10.10.3, Oracle JDK 1.8.0_45, Maven 3.3.1.
 How can I solve this problem?


 Regards,
 Chiwan Park









Re: Building master branch is failed

2015-05-29 Thread Maximilian Michels
Fixed it on the master.

The problem was that some classes belonging to the package org.apache.flink.api.java
were in the folder src/main/java/*org.apache.flink*/api/java/ instead of
src/main/java/org/apache/flink/api/java/.

On Fri, May 29, 2015 at 2:04 PM, Maximilian Michels m...@apache.org wrote:

 Yes, clearly this is related to the merge of the scala shell.

 On Fri, May 29, 2015 at 12:17 PM, Ufuk Celebi u...@apache.org wrote:

 I think it is unrelated to Till's fix, no? It's a different issue.

 Does Travis *not* detect this because of the caches?

 On Fri, May 29, 2015 at 12:03 PM, Maximilian Michels m...@apache.org
 wrote:

 I can confirm that mvn clean package fails. However, Travis builds
 fine after Till's fix:
 https://travis-ci.org/apache/flink/builds/64537794

 On Fri, May 29, 2015 at 11:51 AM, Till Rohrmann trohrm...@apache.org
 wrote:

 Yes, this is another error. Seems to be related to the new scala shell.

 On Fri, May 29, 2015 at 11:00 AM, Chiwan Park chiwanp...@icloud.com
 wrote:

 I fetched master branch and ran again. But I got the same error.
 It seems that the problem is related to javadoc. Till’s fix is related
 to renaming in flink-ml package.

 Regards,
 Chiwan Park


 On May 29, 2015, at 5:39 PM, Stephan Ewen se...@apache.org wrote:

 A bug sneaked in...

 I think Till just pushed a fix for that, so if you pull now, it should
 work again...



 On Fri, May 29, 2015 at 10:28 AM, Chiwan Park chiwanp...@icloud.com
 wrote:

 Hi :)

 I tried building current master branch with `mvn clean package
 -DskipTests` command.
  But I got the following error:

 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 08:11 min
 [INFO] Finished at: 2015-05-29T17:21:45+09:00
 [INFO] Final Memory: 151M/1524M
 [INFO]
 
 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:jar (attach-javadocs)
 on project flink-scala-shell: MavenReportException: Error while creating
 archive:
 [ERROR] Exit code: 1 - javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: warning - No source files for package
 org.apache.flink.api.java
 [ERROR] javadoc: error - No public or protected classes found to
 document.
 [ERROR]
 [ERROR] Command line was:
 /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/javadoc
 @options @packages
 [ERROR]
 [ERROR] Refer to the generated Javadoc files in
 '/Users/chiwanpark/IdeaProjects/flink/flink-staging/flink-scala-shell/target/apidocs'
 dir.
 [ERROR] - [Help 1]
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with
 the -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
 [ERROR]
 [ERROR] After correcting the problems, you can resume the build with
 the command
 [ERROR]   mvn <goals> -rf :flink-scala-shell

 I ran the command in OS X 10.10.3, Oracle JDK 1.8.0_45, Maven 3.3.1.
 How can I solve this problem?


 Regards,
 Chiwan Park












Re: Documentation Error

2015-07-01 Thread Maximilian Michels
I removed the FAQ from the main repository and merged it with the website's
version.

There is still the duplicate How to Contribute guide. It suffers from the
same sync problem.

On Tue, Jun 30, 2015 at 7:04 PM, Stephan Ewen se...@apache.org wrote:

 +1
 for moving the FAQ to the website.

 On Tue, Jun 30, 2015 at 5:09 PM, Robert Metzger rmetz...@apache.org
 wrote:

 +1
 lets remove the FAQ from the source repo and put it on the website only.

 On Thu, Jun 25, 2015 at 3:14 PM, Ufuk Celebi u...@apache.org wrote:


 On 25 Jun 2015, at 14:31, Maximilian Michels m...@apache.org wrote:

  Thanks for noticing, Chiwan. I have the feeling this problem arose
 when the website was updated. The problem about linking documentation pages
 from the main website is that it is currently hard to go back to the main
 web site from the documentation (the nav and URL changes). However, now we
 are suffering from fragmentation.
 
  I would suggest to move the FAQ and How To Contribute to the Flink web
 site and delete them from the Flink repository.

 +1






Re: Execution graph

2015-06-30 Thread Maximilian Michels
Hi Michele,

If you don't set the parallelism, the default parallelism is used. For the
visualization in the web client, a parallelism of one is used. When you run
your example from your IDE, the default parallelism is set to the number of
(virtual) cores of your CPU.

Moreover, Flink will currently not automatically set the parallelism in a
cluster environment. It will use the default parallelism or the user-set
parallelism. In your example, if you set the parallelism explicitly then it
will also show up in the visualization.

Best,
Max

On Tue, Jun 30, 2015 at 7:11 AM, Michele Bertoni 
michele1.bert...@mail.polimi.it wrote:

 Hi, I was trying to run my program in the flink web environment (the local
 one).
 When I run it I get the graph of the planned execution, but in each node
 there is a "parallelism = 1", whereas I think it runs with par = 8 (8 cores,
 I always get 8 outputs).

 what does that mean?
 is that wrong or is it really running with 1 degree of par?

 just a note: I never call setParallelism(), I leave it
 automatic

 thanks
 Best
 Michele


Re: writeAsCsv not writing anything on HDFS when WriteMode set to OVERWRITE

2015-07-02 Thread Maximilian Michels
Hi Mihail,

Thanks for the code. I'm trying to reproduce the problem now.

On Wed, Jul 1, 2015 at 8:30 PM, Mihail Vieru vi...@informatik.hu-berlin.de
wrote:

  Hi Max,

 thank you for your reply. I wanted to revise and dismiss all other factors
 before writing back. I've attached you my code and sample input data.

 I run the *APSPNaiveJob* using the following arguments:

 *0 100 hdfs://path/to/vertices-test-100 hdfs://path/to/edges-test-100
 hdfs://path/to/tempgraph 10 0.5 hdfs://path/to/output-apsp 9*

 I was wrong, I originally thought that the first writeAsCsv call (line 50)
 doesn't work. An exception is thrown without the WriteMode.OVERWRITE when
 the file exists.

 But the problem lies with the second call (line 74), trying to write to
 the same path on HDFS.

 This issue is blocking me, because I need to persist the vertices dataset
 between iterations.

 Cheers,
 Mihail

 P.S.: I'm using the latest 0.10-SNAPSHOT and HDFS 1.2.1.



 On 30.06.2015 16:51, Maximilian Michels wrote:

   Hi Mihail,

  Thank you for your question. Do you have a short example that reproduces
 the problem? It is hard to find the cause without an error message or some
 example code.

  I wonder how your loop works without WriteMode.OVERWRITE because it
 should throw an exception in this case. Or do you change the file names on
 every write?

  Cheers,
  Max

 On Tue, Jun 30, 2015 at 3:47 PM, Mihail Vieru 
 vi...@informatik.hu-berlin.de wrote:

  I think my problem is related to a loop in my job.

 Before the loop, the writeAsCsv method works fine, even in overwrite mode.

 In the loop, in the first iteration, it writes an empty folder containing
 empty files to HDFS. Even though the DataSet it is supposed to write
 contains elements.

 Needless to say, this doesn't occur in a local execution environment,
 when writing to the local file system.


 I would appreciate any input on this.

 Best,
 Mihail



 On 30.06.2015 12:10, Mihail Vieru wrote:

 Hi Till,

 thank you for your reply.

 I have the following code snippet:

 *intermediateGraph.getVertices().writeAsCsv(tempGraphOutputPath, "\n",
 ";", WriteMode.OVERWRITE);*

 When I remove the WriteMode parameter, it works. So I can reason that the
 DataSet contains data elements.

 Cheers,
 Mihail


 On 30.06.2015 12:06, Till Rohrmann wrote:

  Hi Mihail,

 have you checked that the DataSet you want to write to HDFS actually
 contains data elements? You can try calling collect which retrieves the
 data to your client to see what’s in there.

 Cheers,
 Till
 ​

 On Tue, Jun 30, 2015 at 12:01 PM, Mihail Vieru 
 vi...@informatik.hu-berlin.de wrote:

 Hi,

 the writeAsCsv method is not writing anything to HDFS (version 1.2.1)
 when the WriteMode is set to OVERWRITE.
 A file is created but it's empty. And no trace of errors in the Flink or
 Hadoop logs on all nodes in the cluster.

 What could cause this issue? I really really need this feature..

 Best,
 Mihail









Re: writeAsCsv not writing anything on HDFS when WriteMode set to OVERWRITE

2015-07-02 Thread Maximilian Michels
The problem is that your input and output path are the same. Because Flink
executes in a pipelined fashion, all the operators will come up at once.
When you set WriteMode.OVERWRITE for the sink, it will delete the path
before writing anything. That means that when your DataSource reads the
input, there will be nothing to read from. Thus you get an empty DataSet
which you write to HDFS again. Any further loops will then just write
nothing.

You can circumvent this problem by prefixing every output file with a
counter that you increment in your loop. Alternatively, if you only want to
keep the latest output, you can use two files and let them alternate to be
input and output.
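
A minimal sketch of the counter-prefix approach (paths, types, and iteration
count are placeholders):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

String basePath = "hdfs:///path/to/tempgraph";   // hypothetical base path
int maxIterations = 10;

for (int i = 0; i < maxIterations; i++) {
   String input  = basePath + "-" + i;           // what the previous round wrote
   String output = basePath + "-" + (i + 1);     // a fresh path, so the sink never deletes its own input

   DataSet<Tuple2<Long, Double>> vertices =
         env.readCsvFile(input).types(Long.class, Double.class);

   // ... apply the per-iteration transformation to 'vertices' here ...

   vertices.writeAsCsv(output, WriteMode.OVERWRITE);
   env.execute("iteration " + i);
}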

Let me know if you have any further questions.

Kind regards,
Max

On Thu, Jul 2, 2015 at 10:20 AM, Maximilian Michels m...@apache.org wrote:

 Hi Mihail,

 Thanks for the code. I'm trying to reproduce the problem now.

 On Wed, Jul 1, 2015 at 8:30 PM, Mihail Vieru 
 vi...@informatik.hu-berlin.de wrote:

  Hi Max,

 thank you for your reply. I wanted to revise and dismiss all other
 factors before writing back. I've attached you my code and sample input
 data.

 I run the *APSPNaiveJob* using the following arguments:

 *0 100 hdfs://path/to/vertices-test-100 hdfs://path/to/edges-test-100
 hdfs://path/to/tempgraph 10 0.5 hdfs://path/to/output-apsp 9*

 I was wrong, I originally thought that the first writeAsCsv call (line
 50) doesn't work. An exception is thrown without the WriteMode.OVERWRITE
 when the file exists.

 But the problem lies with the second call (line 74), trying to write to
 the same path on HDFS.

 This issue is blocking me, because I need to persist the vertices dataset
 between iterations.

 Cheers,
 Mihail

 P.S.: I'm using the latest 0.10-SNAPSHOT and HDFS 1.2.1.



 On 30.06.2015 16:51, Maximilian Michels wrote:

   Hi Mihail,

  Thank you for your question. Do you have a short example that reproduces
 the problem? It is hard to find the cause without an error message or some
 example code.

  I wonder how your loop works without WriteMode.OVERWRITE because it
 should throw an exception in this case. Or do you change the file names on
 every write?

  Cheers,
  Max

 On Tue, Jun 30, 2015 at 3:47 PM, Mihail Vieru 
 vi...@informatik.hu-berlin.de wrote:

  I think my problem is related to a loop in my job.

 Before the loop, the writeAsCsv method works fine, even in overwrite
 mode.

 In the loop, in the first iteration, it writes an empty folder
 containing empty files to HDFS. Even though the DataSet it is supposed to
 write contains elements.

 Needless to say, this doesn't occur in a local execution environment,
 when writing to the local file system.


 I would appreciate any input on this.

 Best,
 Mihail



 On 30.06.2015 12:10, Mihail Vieru wrote:

 Hi Till,

 thank you for your reply.

 I have the following code snippet:

 *intermediateGraph.getVertices().writeAsCsv(tempGraphOutputPath, "\n",
 ";", WriteMode.OVERWRITE);*

 When I remove the WriteMode parameter, it works. So I can reason that
 the DataSet contains data elements.

 Cheers,
 Mihail


 On 30.06.2015 12:06, Till Rohrmann wrote:

  Hi Mihail,

 have you checked that the DataSet you want to write to HDFS actually
 contains data elements? You can try calling collect which retrieves the
 data to your client to see what’s in there.

 Cheers,
 Till
 ​

 On Tue, Jun 30, 2015 at 12:01 PM, Mihail Vieru 
 vi...@informatik.hu-berlin.de wrote:

 Hi,

 the writeAsCsv method is not writing anything to HDFS (version 1.2.1)
 when the WriteMode is set to OVERWRITE.
 A file is created but it's empty. And no trace of errors in the Flink
 or Hadoop logs on all nodes in the cluster.

 What could cause this issue? I really really need this feature..

 Best,
 Mihail










Re: Recursive directory reading error

2015-05-26 Thread Maximilian Michels
Yes, there is a loop to recursively search for files in the directory but that
should be ok. The code fails when adding a new InputSplit to an ArrayList.
This is a standard operation.

Oh, I think I found a bug in `addNestedFiles`. It does not pick up the
length of the recursively found files in line 546. That can result in a
returned size of 0 which causes infinite InputSplits to be created and
added to the aforementioned ArrayList. Can you change

addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

to

length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

?



On Tue, May 26, 2015 at 2:21 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 I have 10 files..I debugged the code and it seems that there's a loop in
 the FileInputFormat when files are nested far away from the root directory
 of the scan

 On Tue, May 26, 2015 at 2:14 PM, Robert Metzger rmetz...@apache.org
 wrote:

 Hi Flavio,

 how many files are in the directory?
 You can count with find /tmp/myDir | wc -l

 Flink running out of memory while creating input splits indicates to me
 that there are a lot of files in there.

 On Tue, May 26, 2015 at 2:10 PM, Flavio Pompermaier pomperma...@okkam.it
  wrote:

 Hi to all,

 I'm trying to recursively read a directory but it seems that the
  totalLength value in the FileInputformat.createInputSplits() is not
 computed correctly..

 I have a files organized as:

 /tmp/myDir/A/B/cunk-1.txt
 /tmp/myDir/A/B/cunk-2.txt
  ..

 If I try to do the following:

 Configuration parameters = new Configuration();
 parameters.setBoolean("recursive.file.enumeration", true);

 env.readTextFile("file:tmp/myDir").withParameters(parameters).print();

 I get:

 Caused by: org.apache.flink.runtime.JobException: Creating the input
 splits caused an error: Java heap space
 at
 org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:162)
 at
 org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
 at org.apache.flink.runtime.jobmanager.JobManager.org
 $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
 ... 19 more
 Caused by: java.lang.OutOfMemoryError: Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
 at
 org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
 at
 org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:146)

 Am I doing something wrong or is it a bug?

 Best,
 Flavio







Re: Recursive directory reading error

2015-05-26 Thread Maximilian Michels
Pushed a fix to the master and will open a PR to programmatically fix this.

On Tue, May 26, 2015 at 4:22 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 Yeap, that definitively solves the problem! Could you make a PR to fix
 that..?

 Thank you in advance,
 Flavio

 On Tue, May 26, 2015 at 3:20 PM, Maximilian Michels m...@apache.org
 wrote:

 Yes, there is a loop to recursively search for files in directory but
 that should be ok. The code fails when adding a new InputSplit to an
 ArrayList. This is a standard operation.

 Oh, I think I found a bug in `addNestedFiles`. It does not pick up the
 length of the recursively found files in line 546. That can result in a
 returned size of 0 which causes infinite InputSplits to be created and
 added to the aforementioned ArrayList. Can you change

 addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

 to

 length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

 ?



 On Tue, May 26, 2015 at 2:21 PM, Flavio Pompermaier pomperma...@okkam.it
  wrote:

 I have 10 files..I debugged the code and it seems that there's a loop in
 the FileInputFormat when files are nested far away from the root directory
 of the scan

 On Tue, May 26, 2015 at 2:14 PM, Robert Metzger rmetz...@apache.org
 wrote:

 Hi Flavio,

 how many files are in the directory?
 You can count with find /tmp/myDir | wc -l

 Flink running out of memory while creating input splits indicates to me
 that there are a lot of files in there.

 On Tue, May 26, 2015 at 2:10 PM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 Hi to all,

 I'm trying to recursively read a directory but it seems that the
  totalLength value in the FileInputformat.createInputSplits() is not
 computed correctly..

 I have a files organized as:

 /tmp/myDir/A/B/cunk-1.txt
 /tmp/myDir/A/B/cunk-2.txt
  ..

 If I try to do the following:

 Configuration parameters = new Configuration();
 parameters.setBoolean("recursive.file.enumeration", true);

 env.readTextFile("file:tmp/myDir").withParameters(parameters).print();

 I get:

 Caused by: org.apache.flink.runtime.JobException: Creating the input
 splits caused an error: Java heap space
 at
 org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:162)
 at
 org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
 at org.apache.flink.runtime.jobmanager.JobManager.org
 $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
 ... 19 more
 Caused by: java.lang.OutOfMemoryError: Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
 at
 org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
 at
 org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:146)

 Am I doing something wrong or is it a bug?

 Best,
 Flavio









Re: Read CSV Parse Quoted Strings Function

2015-08-24 Thread Maximilian Michels
Hi Tamara,

Quoted strings should not contain the quoting character. The way to work
around this is to escape the quote characters. However, currently there is
no option to escape quotes which pretty much forbids any use of quote
characters within quoted fields. This should be fixed. I opened a JIRA for
this issue: https://issues.apache.org/jira/browse/FLINK-2567

As for your idea for parsing quoted fields, I personally prefer escaping
the quoting characters. In quoted fields, Flink allows all characters
except quotes, which means we would have to read the entire file to know
whether we can close a quote. Additionally, we would need to keep track of
how many quotes are opened and closed.

While your proposal is a very convenient feature, I think we should rather
implement explicit quoting for performance and clarity reasons.
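
As a rough illustration of the difference (the backslash escape below is only
an example; FLINK-2567 does not prescribe a particular escape character):

    Escaped quoting:          "New slogan: \"Time Warner\", here we go"
    Unescaped nested quoting: "New slogan: "Time Warner", here we go"

With escaping, the parser can close the field at the first unescaped quote;
without it, the parser has to look ahead to decide which quote really closes
the field.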

Cheers,
Max



On Mon, Aug 24, 2015 at 10:40 AM, Tamara Mendt tammyme...@gmail.com wrote:

 Hi all,

 When using the parseQuotedStrings function for the CsvReader class, I have
 noticed that if the character of the quotes is also inside of the string,
 the parsing fails.

 For example, if there is a field of this form:

 "RT @sportsguy33: New Time Warner slogan: "Time Warner, where we make you
 long for the days before cable.""

 I think it is not so uncommon to have a case like this and it should not
 fail, but rather the string should be parsed as:

 RT @sportsguy33: New Time Warner slogan: "Time Warner, where we make you
 long for the days before cable."

 I have found the part of the Flink code that raised this exception and can
 fix it, but wanted to consult first if others agree that this is an issue.

 Cheers,

 Tamara




Re: Execution graph

2015-06-30 Thread Maximilian Michels
Yes, the web client always shows parallelism 1. That is a bug but it does
not affect the execution of your program.

If you specify the default parallelism in your Flink config, you don't have
to set it in your program or via the command line argument (-p). However,
if you leave it at its default and do not set it anywhere, then it will be
1. Like you already pointed out, that won't execute your programs
distributed.

The parallelism is set in this order:

1) parallelism default set in config
2) parallelism default set through the command-line client
3) parallelism set directly in your program on the ExecutionEnvironment
using setParallelism
4) parallelism set on the operator using setParallelism(...)

Each stage overrides the preceding. So 3 will override the settings of 1-2
and 4 will override the parallelism for a particular operator previously
set by 1-3.
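
A minimal sketch of how levels 3 and 4 look in a program (class names, paths
and the parallelism values are placeholders):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ParallelismExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4); // level 3: overrides the config and command-line defaults

            DataSet<String> text = env.readTextFile("file:///tmp/input"); // placeholder path

            text.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .setParallelism(8) // level 4: overrides 1-3 for this operator only
                .writeAsText("file:///tmp/output"); // placeholder path

            env.execute("Parallelism example");
        }
    }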

Best,
Max

On Tue, Jun 30, 2015 at 4:48 PM, Michele Bertoni 
michele1.bert...@mail.polimi.it wrote:

  Hi everybody and thanks for the answer

  So if I understood you said that
 apart from some operation, most of them are executed at the default
 parallelism value (that is what I expected)
 but the viewer will always show 1 if something different is not set via
 setParallelism

  is it right?

  I don’t have a particular need; the higher the parallelism, the better.
 I am able to bin my data into more groups than the number of workers in the
 cluster. Is it better to explicitly set the degree of parallelism or can
 I leave it blank (i.e. equal to the default)?


  thanks
 Michele


  Il giorno 30/giu/2015, alle ore 10:41, Fabian Hueske fhue...@gmail.com
 ha scritto:

  As an addition, some operators can only be run with a parallelism of 1.
 For example data sources based on collections and (un-grouped) all reduces.
 In some cases, the parallelism of the following operators will as well be
 set to 1 to avoid a network shuffle.

  If you do:

  env.fromCollection(myCollection).map(new
 MyMapper()).groupBy(0).reduce(new MyReduce()).writeToFile();

  the data source and mapper will be run with a parallelism of 1, the
 reducer and sink will be executed with the default parallelism.

  Best, Fabian

 2015-06-30 10:25 GMT+02:00 Maximilian Michels m...@apache.org:

   Hi Michele,

  If you don't set the parallelism, the default parallelism is used. For
 the visualization in the web client, a parallelism of one is used. When you
 run your example from your IDE, the default parallelism is set to the
 number of (virtual) cores of your CPU.

  Moreover, Flink will currently not automatically set the parallelism in
 a cluster environment. It will use the default parallelism or the user-set
 parallelism. In your example, if you set the parallelism explicitly then it
 will also show up in the visualization.

  Best,
  Max

 On Tue, Jun 30, 2015 at 7:11 AM, Michele Bertoni 
 michele1.bert...@mail.polimi.it wrote:

 Hi, I was trying to run my program in the flink web environment (the
 local one)
 when I run it I get the graph of the planned execution but in each node
 there is a "parallelism = 1", instead I think it runs with par = 8 (8 cores,
 I always get 8 outputs)

 what does that mean?
 is that wrong or is it really running with 1 degree of par?

 just a note: I never do any setParallelism() command, i leave it
 automatical

 thanks
 Best
 Michele







Re: Using Date or other types in a POJO?

2015-07-30 Thread Maximilian Michels
Hi Stefan,

The problem is that the CsvParser does not know how to parse types other
than the ones that are supported. It would be nice if it supported a custom
parser which is either manually specified or included in the PoJo class
itself.

You can either change your PoJo fields to be of a supported type (like you
already did), or read your data into a Tuple<String, Double, Double, ...>
first and convert the Tuples to a PoJo in a Map operation. In the map
operation you can specify your own parsing logic.
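
A minimal sketch of the Tuple-then-map approach (it assumes the DataPoint POJO
from the quoted message below and a simplified three-column layout; delimiter
handling is omitted and would need to match the actual file):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class CsvDateExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the date column as a plain String; configure the field delimiter
            // on the reader as needed for the actual file layout.
            DataSet<Tuple3<String, Double, Double>> raw = env
                    .readCsvFile("file:///tmp/data.csv") // placeholder path
                    .types(String.class, Double.class, Double.class);

            DataSet<DataPoint> points = raw.map(
                    new MapFunction<Tuple3<String, Double, Double>, DataPoint>() {
                        @Override
                        public DataPoint map(Tuple3<String, Double, Double> t) {
                            DataPoint p = new DataPoint();
                            p.setDateStr(t.f0); // the custom date parsing lives in the POJO setter
                            // set the remaining fields from t.f1, t.f2, ... as needed
                            return p;
                        }
                    });

            points.print();
        }
    }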

Best,
Max

On Thu, Jul 30, 2015 at 11:40 AM, Stefan Winterstein 
stefan.winterst...@dfki.de wrote:

 Hi,

 I'm new to Flink and just taking the first steps...

 I want to parse a CSV file that contains a date and time as the first
 field, then some values:

  07.02.2015   49.9871   234.677 ...

 So I’d like to use this POJO:

  import java.util.Date;
 
  public class DataPoint
  {
  private String dateStr; // String value of date
  private Date date;  // the actual date
...
 
  private static SimpleDateFormat dateFormat = new
 SimpleDateFormat("dd.MM.yyyy");
 
  public DataPoint() {}
 
  // String setter, converts to Date
  public void setDateStr(String value) {
  this.dateStr = value;
  try {
  this.date = dateFormat.parse(dateStr); // parse string and
 store date
  } catch (ParseException e) {
  e.printStackTrace();
  }
  }
 
  public String getDateStr() {
  return this.dateStr;
  }
 

  public Date getDate() {
  return this.date;
  }
  …
  }

 ...and pass it to the CSVReader:

  DataSet<DataPoint> csvInput = env.readCsvFile(filename)
  .pojoType(DataPoint.class, "dateStr",
 ...);

 However, this fails with an exception:

  Exception in thread "main" java.lang.IllegalArgumentException: The type
 'java.util.Date' is not supported for the CSV input format.
at
 org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:236)
at
 org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:115)
at
 org.apache.flink.api.java.io.CsvInputFormat.init(CsvInputFormat.java:77)
at
 org.apache.flink.api.java.io.CsvInputFormat.init(CsvInputFormat.java:61)
at
 org.apache.flink.api.java.io.CsvReader.pojoType(CsvReader.java:295)
at de.dfki.iui.MyJob.main(MyJob.java:60)

 I managed to work around this by storing the long value of
 Date.getTime() instead of Date, but:

 Does the POJO semantic really need to be that strict? Wouldn't it be
 sufficient if there was an appropriate getter/setter for the member
 names given to pojoType()?


 Best regards,

 -Stefan



Re: One improvement suggestion: “flink-xx-jobmanager-linux-3lsu.log file can't auto be recovered/detected after mistaking delete

2015-07-17 Thread Maximilian Michels
Hi Chenliang,

I've posted a comment in the associated JIRA issue:
https://issues.apache.org/jira/browse/FLINK-2367

Thanks,
Max

On Fri, Jul 17, 2015 at 8:27 AM, Chenliang (Liang, DataSight) 
chenliang...@huawei.com wrote:

  *One improvement suggestion, please check if it is valid?*



 To check whether the system is adequately reliable, testers usually
 perform some deliberate delete operations.

 Steps:

 1.go to flink\build-target\log

 2.delete the "flink-xx-jobmanager-linux-3lsu.log" file

 3.Run jobs that write log info; meanwhile the system doesn't give
 any error when the log info can't be written correctly.

 4.When some jobs fail, go to check the log file to find the
 reason, but the log file can't be found.

 Must restart Job Manager to regenerate the log file, then continue to run
 jobs.





 Regards

 Liang



Re: HBase Machine Learning

2015-07-13 Thread Maximilian Michels
Hi Lydia,

Here are some examples of how to read/write data from/to HBase:
https://github.com/apache/flink/tree/master/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example

Hope that helps you to develop your Flink job. If not feel free to ask!

Best,
Max

On Sat, Jul 11, 2015 at 8:19 PM, Lydia Ickler ickle...@googlemail.com
wrote:

 Dear Sir or Madame,

 I would like to use the Flink-HBase addon to read out data that then
 serves as an input for the machine learning algorithms, respectively the
 SVM and MLR. Right now I first write the extracted data to a temporary file
 and then read it in via the libSVM method... but I guess there should be a
 more sophisticated way.

 Do you have a code snippet or an idea how to do so?

 Many thanks in advance and best regards,
 Lydia


Re: local web-client error

2015-07-13 Thread Maximilian Michels
Hi Michele,

Sorry to hear you are experiencing problems with the web client. Which
version of Flink are you using?

Could you paste the whole error message you see? Thank you.

Best regards,
Max

On Sun, Jul 12, 2015 at 11:21 AM, Michele Bertoni 
michele1.bert...@mail.polimi.it wrote:

 I think there is a problem with the web-client

 Quite often I can use it for a single run and then it crashes,
 especially if I click back after seeing the graph; on the second run I get
 a class-not-found exception

 from terminal i have to stop and restart it and it works again


 Michele


Re: Flink Scala performance

2015-07-16 Thread Maximilian Michels
HI Vinh,

If you run your program locally, then Flink uses the local execution mode
which allocates only little managed memory. Managed memory is used by Flink
to perform operations on serialized data. These operations can get slow if
too little memory gets allocated because data needs to be spilled to disk.
That would of course be different in a cluster environment where you
configure the memory explicitly.

When the task manager starts, it tells you how much memory it allocates.
For example, in my case:

10:12:37,655 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Using 1227
MB for Flink managed memory.

How does that look in your case?

Cheers,
Max



On Thu, Jul 16, 2015 at 8:54 AM, Vinh June hoangthevinh@gmail.com
wrote:

 I ran it on local, from terminal.
 And it's the Word Count example so it's small



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2074.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive
 at Nabble.com.



Re: problem with union

2015-07-15 Thread Maximilian Michels
Hi Michele,

Thanks for reporting the problem. It seems like we changed the way we
compare generic types like your GValue type. I'm debugging that now. We can
get a fix in for the 0.9.1 release.

Cheers,
Max

On Tue, Jul 14, 2015 at 5:35 PM, Michele Bertoni 
michele1.bert...@mail.polimi.it wrote:

 Hi everybody, this discussion started in an other thread about a problem
 in union, but you said it was a different error then i am opening a new
 topic

 I am doing the union of two dataset and I am getting this error




 Exception in thread "main"
 org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of
 different types. Input1=scala.Tuple6(_1: Long, _2: String, _3: Long, _4:
 Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>),
 input2=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5:
 Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
 at
 org.apache.flink.api.java.operators.UnionOperator.init(UnionOperator.java:46)
 at org.apache.flink.api.scala.DataSet.union(DataSet.scala:1101)
 at
 it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricCover2$.apply(GenometricCover2.scala:125)
 ...




 Input1=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
 input2=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)


 as you can see the two datasets have the same type
 this error only happens with a custom data type (e.g. i am using an array
 of GValue, an array of Int or Double works)

 in the last flink version it was working (milestone and snapshot) now in
 0.9.0 it is not

 what can it be?


 thanks for help

 cheers,
 Michele


Re: Nested Iterations Outlook

2015-07-20 Thread Maximilian Michels
 So it is up to debate how the support for resuming from intermediate
results will look like. - What's the current state of that debate?

Since there is no support for nested iterations that I know of, the debate
how intermediate results are integrated has not started yet.


 Intermediate results are not produced within the iterations cycles. -
 Ok, if there are none, what does it have to do with that debate? :-)


I was referring to the existing support for intermediate results within
iterations. If we were to implement nested iterations, this could
(possibly) change. This is all very theoretical because there are no plans
to support nested iterations.

Hope this clarifies. Otherwise, please restate your question because I
might have misunderstood.

Cheers,
Max

On Mon, Jul 20, 2015 at 12:11 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Thanks for the answer! But I need some clarification:

 So it is up to debate how the support for resuming from intermediate
 results will look like. - What's the current state of that debate?
 Intermediate results are not produced within the iterations cycles. -
 Ok, if there are none, what does it have to do with that debate? :-)

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 10:50 AM, Maximilian Michels m...@apache.org
 wrote:

 Hi Max,

 You are right, there is no support for nested iterations yet. As far as I
 know, there are no concrete plans to add support for it. So it is up to
 debate how the support for resuming from intermediate results will look
 like. Intermediate results are not produced within the iterations cycles.
 Same would be true for nested iterations. So the behavior for resuming from
 intermediate results should be alike for nested iterations.

 Cheers,
 Max

 On Fri, Jul 17, 2015 at 4:26 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:

 Hi Flinksters,

 as far as I know, there is still no support for nested iterations
 planned. Am I right?

 So my question is how such use cases should be handled in the future.
 More specific: when pinning/caching will be available, you suggest to
 use that feature and program in Spark style? Or is there some other, more
 flexible, mechanism planned for loops?

 Cheers,
 Max






Re: Too few memory segments provided exception

2015-07-20 Thread Maximilian Michels
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to
provide Flink with more memory. You can either increase the
taskmanager.heap.mb config variable or set taskmanager.memory.fraction
to some value greater than 0.7 and smaller than 1.0. The first config
variable allocates more overall memory for Flink; the latter changes the
ratio between Flink managed memory (e.g. for hash joins) and user memory
(for your functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and
you don't have control over that at the moment. You could, however, start a
local cluster (./bin/start-local) after you adjusted your flink-conf.yaml
and run your programs against that configured cluster. You can do that
either through your IDE using a RemoteEnvironment or by submitting the
packaged JAR to the local cluster using the command-line tool (./bin/flink).
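
For example, a flink-conf.yaml adjustment along these lines (the values are
illustrative, not recommendations):

    taskmanager.heap.mb: 2048          # more overall memory per task manager
    taskmanager.memory.fraction: 0.8   # larger share of the heap for Flink managed memory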

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge shgha...@gmail.com wrote:

 Hello,
  I am working on a problem which implements Adamic Adar Algorithm using
 Gelly.
 I am running into this exception for all the Joins (including the one that
 are part of the reduceOnNeighbors function)

 Too few memory segments provided. Hash Join needs at least 33 memory
 segments.


 The problem persists even when I comment out some of the joins.

 Even after using edg = edg.join(graph.getEdges(),
 JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new
 JoinEdge());

 as suggested by @AndraLungu the problem persists.

 The code is


 DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

 // get neighbors of each vertex in the HashSet for its value
 computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);

 // get vertices with updated values for the final Graph which will
 // be used to get Adamic Edges
 Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
     .where(0).equalTo(0).with(new JoinNeighborDegrees());

 Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
     Graph.fromDataSet(Vertices, edges, env);

 // configure Vertex Centric Iteration
 VertexCentricConfiguration parameters = new VertexCentricConfiguration();

 parameters.setName("Find Adamic Adar Edge Weights");

 parameters.setDirection(EdgeDirection.ALL);

 // run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
 updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(),
     new NeighborsMessenger<Long>(), 1, parameters);

 // Extract Vertices of the updated graph
 DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices =
     updatedGraph.getVertices();

 // Extract the list of Edges from the vertex values
 DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());

 // Partial weights for the edges are added
 edg = edg.groupBy(0, 1).reduce(new AdamGroup());

 // Graph is updated with the Adamic Adar Edges
 edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
     .where(0, 1).equalTo(0, 1).with(new JoinEdge());

 Any idea how I could tackle this Exception?



Re: Too few memory segments provided exception

2015-07-20 Thread Maximilian Michels
Hi Shivani,

The issue is that by the time the Hash Join is executed, the
MutableHashTable cannot allocate enough memory segments. That means that
your other operators are occupying them. It is fine that this also occurs
on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the
ExecutionEnvironment. Can you try that?
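
A minimal sketch of what that could look like when running locally from the
IDE; the local environment is created from a custom Configuration, and the
fraction value is illustrative:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class LocalMemoryFractionExample {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // Give Flink's managed memory (hash joins, sorts, ...) a larger share of the heap.
            config.setFloat("taskmanager.memory.fraction", 0.9f);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);

            // ... build and execute the Gelly program against 'env' as usual ...
            env.fromElements(1, 2, 3).print();
        }
    }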

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge shgha...@gmail.com wrote:

 Hello Maximilian,

 Thanks for the suggestion. I will use it to check the program. But when I
 am creating a PR for the same implementation with a Test, I am getting the
 same error even on Travis build. So for that what would be the solution?

 Here is my PR https://github.com/apache/flink/pull/923
 And here is the Travis build status
 https://travis-ci.org/apache/flink/builds/71695078

 Also on the IDE it is working fine in Collection execution mode.

 Thanks and Regards,
 Shivani

 On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels m...@apache.org
 wrote:

 Hi Shivani,

 Flink doesn't have enough memory to perform a hash join. You need to
 provide Flink with more memory. You can either increase the
 taskmanager.heap.mb config variable or set taskmanager.memory.fraction
 to some value greater than 0.7 and smaller than 1.0. The first config
 variable allocates more overall memory for Flink; the latter changes the
 ratio between Flink managed memory (e.g. for hash joins) and user memory
 (for your functions and Gelly's code).

 If you run this inside an IDE, the memory is configured automatically and
 you don't have control over that at the moment. You could, however, start a
 local cluster (./bin/start-local) after you adjusted your flink-conf.yaml
 and run your programs against that configured cluster. You can do that
 either through your IDE using a RemoteEnvironment or by submitting the
 packaged JAR to the local cluster using the command-line tool (./bin/flink).

 Hope that helps.

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge shgha...@gmail.com
 wrote:

 Hello,
  I am working on a problem which implements Adamic Adar Algorithm using
 Gelly.
 I am running into this exception for all the Joins (including the one
 that are part of the reduceOnNeighbors function)

 Too few memory segments provided. Hash Join needs at least 33 memory
 segments.


 The problem persists even when I comment out some of the joins.

 Even after using edg = edg.join(graph.getEdges(),
 JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new
 JoinEdge());

 as suggested by @AndraLungu the problem persists.

 The code is


 DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

 // get neighbors of each vertex in the HashSet for its value
 computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);

 // get vertices with updated values for the final Graph which will
 // be used to get Adamic Edges
 Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
     .where(0).equalTo(0).with(new JoinNeighborDegrees());

 Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
     Graph.fromDataSet(Vertices, edges, env);

 // configure Vertex Centric Iteration
 VertexCentricConfiguration parameters = new VertexCentricConfiguration();

 parameters.setName("Find Adamic Adar Edge Weights");

 parameters.setDirection(EdgeDirection.ALL);

 // run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
 updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(),
     new NeighborsMessenger<Long>(), 1, parameters);

 // Extract Vertices of the updated graph
 DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices =
     updatedGraph.getVertices();

 // Extract the list of Edges from the vertex values
 DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());

 // Partial weights for the edges are added
 edg = edg.groupBy(0, 1).reduce(new AdamGroup());

 // Graph is updated with the Adamic Adar Edges
 edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
     .where(0, 1).equalTo(0, 1).with(new JoinEdge());

 Any idea how I could tackle this Exception?






Re: bigpetstore flink : parallelizing collections

2015-07-13 Thread Maximilian Michels
Absolutely. I see it as a synergistic process too. I just learned about
BigTop. As for the packaging, I think Flink doesn't have very different
demands compared to the other frameworks already integrated. As for the
rest, I'm not familiar enough with BigTop. Currently, Henry is the only
Flink committer looking into it but I'm sure we will find other Flink
contributors to help out as well. I can even see myself getting into it.

Thanks for your effort so far and I'm sure we'll have a good collaboration
between the projects.

Cheers,
Max

On Mon, Jul 13, 2015 at 4:56 PM, jay vyas jayunit100.apa...@gmail.com
wrote:

 ok.  now ** my thoughts ** on this are that it should be synergistic with
 flink needs, rather than an orthogonal task that you guys help us with, so
 please keep us updated what your needs are so that the work is synergistic
 https://issues.apache.org/jira/browse/BIGTOP-1927

 On Mon, Jul 13, 2015 at 9:07 AM, Maximilian Michels m...@apache.org
 wrote:

 Hi Jay,

 Great to hear there is effort to integrate Flink with BigTop. Please let
 us know if any questions come up in the course of the integration!

 Best,
 Max


 On Sun, Jul 12, 2015 at 3:57 PM, jay vyas jayunit100.apa...@gmail.com
 wrote:

 awesome thanks ! i ll  try it out.

 This is part of  a wave of jiras for bigtop flink integration.  If your
 distro/packaging folks collaborate with us - it will save you time in the
 long run, because you can piggy back the bigtop infra for rpm/deb
 packaging, smoke testing, and HDFS interop testing 

 https://issues.apache.org/jira/browse/BIGTOP-1927

 Just FYI, great to connect stephan and others, will keep you posted !

 On Sun, Jul 12, 2015 at 9:16 AM, Stephan Ewen se...@apache.org wrote:

 Hi Jay!

 You can use the fromCollection() or fromElements() method to create
 a DataSet or DataStream from a Java/Scala collection. That moves the data
 into the cluster and allows you to run parallel transformations on the
 elements.

 Make sure you set the parallelism of the operation that you want to be
 parallel.


 Here is a code sample:

 ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();

 DataSet<MyType> data = env.fromElements(myArray);

 data.map(new TransactionMapper()).setParallelism(80); // makes sure you
 have 80 mappers


 Stephan


 On Sun, Jul 12, 2015 at 3:04 PM, jay vyas jayunit100.apa...@gmail.com
 wrote:

 Hi flink.

 Im happy to announce that ive done a small bit of initial hacking on
 bigpetstore-flink, in order to represent what we do in spark in flink.

 TL;DR the main question is at the bottom!

 Currently, i want to generate transactions for a list of customers.
 The generation of transactions is a parallel process, and the customers 
 are
 generated beforehand.

 In hadoop , we can create an input format with custom splits if we
 want to split a data set up, otherwise, we can break it into files.

 in spark, there is a convenient parallelize which we can run on a
 list, which we can then capture the RDD from, and run a parallelized
 transform.

 In flink, i have an array of customers and i want to parallelize our
 transaction generator for each customer.  How would i do that?

 --
 jay vyas





 --
 jay vyas





 --
 jay vyas



Re: Nested Iterations Outlook

2015-07-20 Thread Maximilian Michels
You could do that but you might run into merge conflicts. Also keep in mind
that it is work in progress :)

On Mon, Jul 20, 2015 at 4:15 PM, Maximilian Alber 
alber.maximil...@gmail.com wrote:

 Thanks!

 Ok, cool. If I would like to test it, I just need to merge those two pull
 requests into my current branch?

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 4:02 PM, Maximilian Michels m...@apache.org
 wrote:

 Now that makes more sense :) I thought by nested iterations you meant
 iterations in Flink that can be nested, i.e. starting an iteration inside
 an iteration.

 The caching/pinning of intermediate results is still a work in progress
 in Flink. It is actually in a state where it could be merged but some
 pending pull requests got delayed because priorities changed a bit.

 Essentially, we need to merge these two pull requests:

 https://github.com/apache/flink/pull/858
 This introduces a session management which allows to keep the
 ExecutionGraph for the session.

 https://github.com/apache/flink/pull/640
 Implements the actual backtracking and caching of the results.

 Once these are in, we can change the Java/Scala API to support
 backtracking. I don't exactly know how Spark's API does it but, essentially
 it should work then by just creating new operations on an existing DataSet
 and submit to the cluster again.

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 3:31 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:

 Oh sorry, my fault. When I wrote it, I had iterations in mind.

 What I actually wanted to say, how resuming from intermediate results
 will work with (non-nested) non-Flink iterations? And with iterations I
 mean something like this:

 while(...):
   - change params
   - submit to cluster

 where the executed Flink-program is more or less the same at each
 iteration, but with changing input sets, which are reused between
 different loop iterations.

 I might have got something wrong, because in our group we mentioned caching à
 la Spark for Flink and someone said that pinning will do that. Is that
 somewhat right?

 Thanks and Cheers,
 Max

 On Mon, Jul 20, 2015 at 1:06 PM, Maximilian Michels m...@apache.org
 wrote:

  So it is up to debate how the support for resuming from intermediate
 results will look like. - What's the current state of that debate?

 Since there is no support for nested iterations that I know of, the
 debate how intermediate results are integrated has not started yet.


 Intermediate results are not produced within the iterations cycles.
 - Ok, if there are none, what does it have to do with that debate? :-)


 I was referring to the existing support for intermediate results within
 iterations. If we were to implement nested iterations, this could
 (possibly) change. This is all very theoretical because there are no plans
 to support nested iterations.

 Hope this clarifies. Otherwise, please restate your question because I
 might have misunderstood.

 Cheers,
 Max


 On Mon, Jul 20, 2015 at 12:11 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:

 Thanks for the answer! But I need some clarification:

 So it is up to debate how the support for resuming from intermediate
 results will look like. - What's the current state of that debate?
 Intermediate results are not produced within the iterations cycles.
 - Ok, if there are none, what does it have to do with that debate? :-)

 Cheers,
 Max

 On Mon, Jul 20, 2015 at 10:50 AM, Maximilian Michels m...@apache.org
 wrote:

 Hi Max,

 You are right, there is no support for nested iterations yet. As far
 as I know, there are no concrete plans to add support for it. So it is up
 to debate how the support for resuming from intermediate results will 
 look
 like. Intermediate results are not produced within the iterations cycles.
 Same would be true for nested iterations. So the behavior for resuming 
 from
 intermediate results should be alike for nested iterations.

 Cheers,
 Max

 On Fri, Jul 17, 2015 at 4:26 PM, Maximilian Alber 
 alber.maximil...@gmail.com wrote:

 Hi Flinksters,

 as far as I know, there is still no support for nested iterations
 planned. Am I right?

 So my question is how such use cases should be handled in the future.
 More specific: when pinning/caching will be available, you suggest
 to use that feature and program in Spark style? Or is there some 
 other,
 more flexible, mechanism planned for loops?

 Cheers,
 Max










Re: Flink Kafka cannot find org/I0Itec/zkclient/serialize/ZkSerializer

2015-07-21 Thread Maximilian Michels
Hi,

Are you running this locally or in a cluster environment? Did you put the
zkClient-0.5.jar in the /lib directory of every node (also task managers)?

It seems like sbt should include the zkClient dependency in the fat jar. So
there might be something wrong with your build process.

Best regards,
Max

On Tue, Jul 21, 2015 at 7:10 AM, Wendong wendong@gmail.com wrote:

 to be specific, the error occurs at:


 org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175)



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-cannot-find-org-I0Itec-zkclient-serialize-ZkSerializer-tp2199p2200.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive
 at Nabble.com.



Re: problem with union

2015-07-15 Thread Maximilian Michels
I was able to reproduce this problem. It turns out, this has already been
fixed in the snapshot version:
https://issues.apache.org/jira/browse/FLINK-2229

The fix will be included in the upcoming 0.9.1 release. Thank you again for
reporting!

Kind regards,
Max

On Wed, Jul 15, 2015 at 11:33 AM, Maximilian Michels m...@apache.org wrote:

 Hi Michele,

 Thanks for reporting the problem. It seems like we changed the way we
 compare generic types like your GValue type. I'm debugging that now. We can
 get a fix in for the 0.9.1 release.

 Cheers,
 Max

 On Tue, Jul 14, 2015 at 5:35 PM, Michele Bertoni 
 michele1.bert...@mail.polimi.it wrote:

 Hi everybody, this discussion started in an other thread about a problem
 in union, but you said it was a different error then i am opening a new
 topic

 I am doing the union of two dataset and I am getting this error




 Exception in thread "main"
 org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of
 different types. Input1=scala.Tuple6(_1: Long, _2: String, _3: Long, _4:
 Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>),
 input2=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5:
 Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
 at
 org.apache.flink.api.java.operators.UnionOperator.init(UnionOperator.java:46)
 at org.apache.flink.api.scala.DataSet.union(DataSet.scala:1101)
 at
 it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricCover2$.apply(GenometricCover2.scala:125)
 ...




 Input1=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
 input2=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6:
 ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)


 as you can see the two datasets have the same type
 this error only happens with a custom data type (e.g. i am using an array
 of GValue, an array of Int or Double works)

 in the last flink version it was working (milestone and snapshot) now in
 0.9.0 it is not

 what can it be?


 thanks for help

 cheers,
 Michele





Re: Reading null value from datasets

2015-10-23 Thread Maximilian Michels
Hi Guido,

This depends on your use case but you may read those values as type String
and treat them accordingly.

Cheers,
Max

On Fri, Oct 23, 2015 at 1:59 PM, Guido  wrote:

> Hello,
> I would like to ask if there were any particular ways to read or treat
> null (e.g. Name, Lastname,, Age..) value in a dataset using readCsvFile,
> without being forced to ignore them.
>
> Thanks for your time.
> Guido
>
>


Re: reading csv file from null value

2015-10-23 Thread Maximilian Michels
Hi Philip,

How about making the empty field of type String? Then you can read the CSV
into a DataSet and treat the empty string as a null value. Not very nice
but a workaround. As of now, Flink deliberately doesn't support null values.
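
A minimal sketch of that workaround in the Java API (the path and column
layout are placeholders based on the quoted schema below, reduced to three
columns to keep the sketch short):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class EmptyAsNullExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the possibly-empty sales column as String instead of Int.
            DataSet<Tuple3<Long, Long, String>> clicks = env
                    .readCsvFile("file:///tmp/web_clicks.dat") // placeholder path
                    .fieldDelimiter('|')
                    .types(Long.class, Long.class, String.class);

            // Treat the empty string as the "null" marker,
            // e.g. keep only rows without a sales value.
            DataSet<Tuple3<Long, Long, String>> noSales = clicks.filter(
                    new FilterFunction<Tuple3<Long, Long, String>>() {
                        @Override
                        public boolean filter(Tuple3<Long, Long, String> t) {
                            return t.f2.isEmpty();
                        }
                    });

            noSales.print();
        }
    }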

Regards,
Max

On Thu, Oct 22, 2015 at 4:30 PM, Philip Lee  wrote:

> Hi,
>
> I am trying to load the dataset with the part of null value by using
> readCsvFile().
>
> // e.g  _date|_click|_sales|_item|_web_page|_user
>
> case class WebClick(_click_date: Long, _click_time: Long, _sales: Int, _item: 
> Int,_page: Int, _user: Int)
>
> private def getWebClickDataSet(env: ExecutionEnvironment): DataSet[WebClick] 
> = {
>
>   env.readCsvFile[WebClick](
> webClickPath,
> fieldDelimiter = "|",
> includedFields = Array(0, 1, 2, 3, 4, 5),
> // lenient = true
>   )
> }
>
>
> Well, I know there is an option to ignore malformed value, but I have to
> read the dataset even though it has null value.
>
> as it follows, dataset (third column is null) looks like
> 37794|24669||16705|23|54810
> but I have to read null value as well because I have to use filter or
> where function ( _sales == null )
>
> Is there any detail suggestion to do it?
>
> Thanks,
> Philip
>
>
>
>
>
>
>
> --
>
> ==
>
> *Hae Joon Lee*
>
>
> Now, in Germany,
>
> M.S. Candidate, Interested in Distributed System, Iterative Processing
>
> Dept. of Computer Science, Informatik in German, TUB
>
> Technical University of Berlin
>
>
> In Korea,
>
> M.S. Candidate, Computer Architecture Laboratory
>
> Dept. of Computer Science, KAIST
>
>
> Rm# 4414 CS Dept. KAIST
>
> 373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)
>
>
> Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea
>
> ==
>


Re: Error running an hadoop job from web interface

2015-10-23 Thread Maximilian Michels
Hi Flavio,

Which version of Flink are you using?

Cheers,
Max

On Fri, Oct 23, 2015 at 2:45 PM, Flavio Pompermaier 
wrote:

> Hi to all,
> I'm trying to run a job from the web interface but I get this error:
>
> java.lang.RuntimeException: java.io.FileNotFoundException: JAR entry 
> core-site.xml not found in /tmp/webclient-jobs/EntitonsJsonizer.jar
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2334)
>   at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2187)
>   at 
> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2104)
>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:853)
>   at 
> org.apache.hadoop.mapred.JobConf.checkAndWarnDeprecation(JobConf.java:2088)
>   at org.apache.hadoop.mapred.JobConf.(JobConf.java:446)
>   at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:175)
>   at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:156)
>   at 
> it.okkam.flink.entitons.io.utils.ParquetThriftEntitons.readEntitons(ParquetThriftEntitons.java:42)
>   at 
> it.okkam.flink.entitons.io.utils.ParquetThriftEntitons.readEntitonsWithId(ParquetThriftEntitons.java:73)
>   at 
> org.okkam.entitons.EntitonsJsonizer.readAtomQuads(EntitonsJsonizer.java:235)
>   at org.okkam.entitons.EntitonsJsonizer.main(EntitonsJsonizer.java:119)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at 
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:220)
>   at org.apache.flink.client.CliFrontend.info(CliFrontend.java:412)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>   at 
> org.apache.flink.client.web.JobSubmissionServlet.doGet(JobSubmissionServlet.java:171)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:734)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:532)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:453)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:227)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:965)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:388)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:187)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:901)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
>   at 
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
>   at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:113)
>   at org.eclipse.jetty.server.Server.handle(Server.java:352)
>   at 
> org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596)
>   at 
> org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1048)
>   at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:549)
>   at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:211)
>   at 
> org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:425)
>   at 
> org.eclipse.jetty.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:489)
>   at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:436)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: JAR entry core-site.xml not found 
> in /tmp/webclient-jobs/EntitonsJsonizer.jar
>   at 
> sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:140)
>   at 
> sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:150)
>   at java.net.URL.openStream(URL.java:1037)
>   at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2163)
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2234)
>
>
>
> I checked the jar and it contains the core-site.xml file..Am I forced to
> configure the hadoop classpath in my Flink cluster config files?
>
> Best,
> Flavio
>


Re: Running continuously on yarn with kerberos

2015-10-23 Thread Maximilian Michels
Hi Niels,

Thank you for your question. Flink relies entirely on the Kerberos
support of Hadoop. So your question could also be rephrased to "Does
Hadoop support long-term authentication using Kerberos?". And the
answer is: Yes!

While Hadoop uses Kerberos tickets to authenticate users with services
initially, the authentication process continues differently
afterwards. Instead of saving the ticket to authenticate on a later
access, Hadoop creates its own security tokens (DelegationToken) that
it passes around. These are authenticated to Kerberos periodically. To
my knowledge, the tokens have a life span identical to the Kerberos
ticket maximum life span. So be sure to set the maximum life span very
high for long streaming jobs. The renewal time, on the other hand, is
not important because Hadoop abstracts this away using its own
security tokens.

I'm afraid there is no Kerberos how-to yet. If you are on Yarn, then
it is sufficient to authenticate the client with Kerberos. On a Flink
standalone cluster you need to ensure that, initially, all nodes are
authenticated with Kerberos using the kinit tool.

Feel free to ask if you have more questions and let us know about any
difficulties.

Best regards,
Max



On Thu, Oct 22, 2015 at 2:06 PM, Niels Basjes  wrote:
> Hi,
>
> I want to write a long running (i.e. never stop it) streaming flink
> application on a kerberos secured Hadoop/Yarn cluster. My application needs
> to do things with files on HDFS and HBase tables on that cluster so having
> the correct kerberos tickets is very important. The stream is to be ingested
> from Kafka.
>
> One of the things with Kerberos is that the tickets expire after a
> predetermined time. My knowledge about kerberos is very limited so I hope
> you guys can help me.
>
> My question is actually quite simple: Is there an howto somewhere on how to
> correctly run a long running flink application with kerberos that includes a
> solution for the kerberos ticket timeout  ?
>
> Thanks
>
> Niels Basjes


Re: reading csv file from null value

2015-10-26 Thread Maximilian Michels
As far as I know, the null support was removed from the Table API because
it wasn't consistently supported across all operations. See
https://issues.apache.org/jira/browse/FLINK-2236

On Fri, Oct 23, 2015 at 7:18 PM, Shiti Saxena <ssaxena@gmail.com> wrote:

> For a similar problem where we wanted to preserve and track null entries,
> we load the CSV as a DataSet[Array[Object]] and then transform it into
> DataSet[Row] using a custom RowSerializer(
> https://gist.github.com/Shiti/d0572c089cc08654019c) which handles null.
>
> The Table API(which supports null) can then be used on the resulting
> DataSet[Row].
>
>
> On Fri, Oct 23, 2015 at 7:38 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Philip,
>>
>> How about making the empty field of type String? Then you can read the
>> CSV into a DataSet and treat the empty string as a null value. Not very
>> nice but a workaround. As of now, Flink deliberately doesn't support null
>> values.
>>
>> Regards,
>> Max
>>
>>
>> On Thu, Oct 22, 2015 at 4:30 PM, Philip Lee <philjj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to load the dataset with the part of null value by using
>>> readCsvFile().
>>>
>>> // e.g  _date|_click|_sales|_item|_web_page|_user
>>>
>>> case class WebClick(_click_date: Long, _click_time: Long, _sales: Int, 
>>> _item: Int,_page: Int, _user: Int)
>>>
>>> private def getWebClickDataSet(env: ExecutionEnvironment): 
>>> DataSet[WebClick] = {
>>>
>>>   env.readCsvFile[WebClick](
>>> webClickPath,
>>> fieldDelimiter = "|",
>>> includedFields = Array(0, 1, 2, 3, 4, 5),
>>> // lenient = true
>>>   )
>>> }
>>>
>>>
>>> Well, I know there is an option to ignore malformed value, but I have to
>>> read the dataset even though it has null value.
>>>
>>> as it follows, dataset (third column is null) looks like
>>> 37794|24669||16705|23|54810
>>> but I have to read null value as well because I have to use filter or
>>> where function ( _sales == null )
>>>
>>> Is there any detail suggestion to do it?
>>>
>>> Thanks,
>>> Philip
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> ==
>>>
>>> *Hae Joon Lee*
>>>
>>>
>>> Now, in Germany,
>>>
>>> M.S. Candidate, Interested in Distributed System, Iterative Processing
>>>
>>> Dept. of Computer Science, Informatik in German, TUB
>>>
>>> Technical University of Berlin
>>>
>>>
>>> In Korea,
>>>
>>> M.S. Candidate, Computer Architecture Laboratory
>>>
>>> Dept. of Computer Science, KAIST
>>>
>>>
>>> Rm# 4414 CS Dept. KAIST
>>>
>>> 373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)
>>>
>>>
>>> Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea
>>>
>>> ==
>>>
>>
>>
>


Re: Specially introduced Flink to chinese users in CNCC(China National Computer Congress)

2015-10-26 Thread Maximilian Michels
Hi Liang,

We greatly appreciate that you introduced Flink to the Chinese users at CNCC! We
would love to hear how people like Flink.

Please keep us up to date and point the users to the mailing list or
Stackoverflow if they have any difficulties.

Best regards,
Max

On Sat, Oct 24, 2015 at 5:48 PM, Liang Chen  wrote:

> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n3255/IMG_20151023_104030.jpg
> >
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Specially-introduced-Flink-to-chinese-users-in-CNCC-China-National-Computer-Congress-tp3254p3255.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Reading null value from datasets

2015-10-26 Thread Maximilian Michels
As far as I know, the null support was removed from the Table API because
it wasn't consistently supported across all operations. See
https://issues.apache.org/jira/browse/FLINK-2236


On Fri, Oct 23, 2015 at 7:20 PM, Shiti Saxena <ssaxena@gmail.com> wrote:

> For a similar problem where we wanted to preserve and track null entries,
> we load the CSV as a DataSet[Array[Object]] and then transform it into
> DataSet[Row] using a custom RowSerializer(
> https://gist.github.com/Shiti/d0572c089cc08654019c) which handles null.
>
> The Table API(which supports null) can then be used on the resulting
> DataSet[Row].
>
> On Fri, Oct 23, 2015 at 7:40 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Guido,
>>
>> This depends on your use case but you may read those values as type
>> String and treat them accordingly.
>>
>> Cheers,
>> Max
>>
>> On Fri, Oct 23, 2015 at 1:59 PM, Guido <gmazza...@gmail.com> wrote:
>>
>>> Hello,
>>> I would like to ask if there were any particular ways to read or treat
>>> null (e.g. Name, Lastname,, Age..) value in a dataset using readCsvFile,
>>> without being forced to ignore them.
>>>
>>> Thanks for your time.
>>> Guido
>>>
>>>
>>
>


Re: Running continuously on yarn with kerberos

2015-10-27 Thread Maximilian Michels
Hi Niels,

You're welcome. Some more information on how this would be configured:

In the kdc.conf, there are two variables:

max_life = 2h 0m 0s
max_renewable_life = 7d 0h 0m 0s

max_life is the maximum life of the current ticket. However, it may be
renewed up to a time span of max_renewable_life from the first ticket issue
on. This means that from the first ticket issue, new tickets may be
requested for one week. Each renewed ticket has a life time of max_life (2
hours in this case).

Please let us know about any difficulties with long-running streaming
application and Kerberos.

Best regards,
Max

On Tue, Oct 27, 2015 at 2:46 PM, Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> Thanks for your feedback.
> So I guess I'll have to talk to the security guys about having special
> kerberos ticket expiry times for these types of jobs.
>
> Niels Basjes
>
> On Fri, Oct 23, 2015 at 11:45 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Niels,
>>
>> Thank you for your question. Flink relies entirely on the Kerberos
>> support of Hadoop. So your question could also be rephrased to "Does
>> Hadoop support long-term authentication using Kerberos?". And the
>> answer is: Yes!
>>
>> While Hadoop uses Kerberos tickets to authenticate users with services
>> initially, the authentication process continues differently
>> afterwards. Instead of saving the ticket to authenticate on a later
>> access, Hadoop creates its own security tockens (DelegationToken) that
>> it passes around. These are authenticated to Kerberos periodically. To
>> my knowledge, the tokens have a life span identical to the Kerberos
>> ticket maximum life span. So be sure to set the maximum life span very
>> high for long streaming jobs. The renewal time, on the other hand, is
>> not important because Hadoop abstracts this away using its own
>> security tockens.
>>
>> I'm afraid there is not Kerberos how-to yet. If you are on Yarn, then
>> it is sufficient to authenticate the client with Kerberos. On a Flink
>> standalone cluster you need to ensure that, initially, all nodes are
>> authenticated with Kerberos using the kinit tool.
>>
>> Feel free to ask if you have more questions and let us know about any
>> difficulties.
>>
>> Best regards,
>> Max
>>
>>
>>
>> On Thu, Oct 22, 2015 at 2:06 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > I want to write a long running (i.e. never stop it) streaming flink
>> > application on a kerberos secured Hadoop/Yarn cluster. My application
>> needs
>> > to do things with files on HDFS and HBase tables on that cluster so
>> having
>> > the correct kerberos tickets is very important. The stream is to be
>> ingested
>> > from Kafka.
>> >
>> > One of the things with Kerberos is that the tickets expire after a
>> > predetermined time. My knowledge about kerberos is very limited so I
>> hope
>> > you guys can help me.
>> >
>> > My question is actually quite simple: Is there an howto somewhere on
>> how to
>> > correctly run a long running flink application with kerberos that
>> includes a
>> > solution for the kerberos ticket timeout  ?
>> >
>> > Thanks
>> >
>> > Niels Basjes
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Wrong owner of HDFS output folder

2015-10-26 Thread Maximilian Michels
The problem is that non-root processes may not be able to read root-owned
files/folders. Therefore, we cannot really check as a non-root user
whether root-owned clusters have been started. It's better not to run Flink
with root permissions.

You're welcome.

Cheers,
Max

On Mon, Oct 26, 2015 at 3:23 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I just stopped the cluster with stop-cluster.sh but I had to manually kill
> the root process because it was not able to terminate it using the
> aforementioned script.
> Then I restarted the cluster via start-cluster.sh and now all processes
> run with the user it was supposed to. Probably once in the past I started
> the services with sudo and then I was convinced to restart the cluster
> using the start/stop scripts but the job manager was never restarted
> actually..
> However I didn't get any error about that, I was just reading
>
> "No jobmanager daemon (pid: ) is running anymore on myhost.test.it"
>
> Maybe the scripts could be improved to check such a situation?
>
> Thanks for the support,
> Flavio
>
> On Mon, Oct 26, 2015 at 3:14 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Yes, the job manager starts as a root process, while taskmanagers with my
>> user..is that normal?
>> I was convinced that start-cluster.sh was starting all processes with the
>> same user :O
>>
>> On Mon, Oct 26, 2015 at 3:09 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> Are you runing your Flink cluster with root permissions? The directory
>>> to hold the output splits are created by the JobManager. So if you run then
>>> JobManager with root permissions, it will create a folder owned by root. If
>>> the task managers are not run with root permissions, this could be a
>>> problem.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Mon, Oct 26, 2015 at 2:40 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> when I run my job within my hadoop cluster (both from command line and
>>>> from webapp) the output of my job (HDFS) works fine until I set the write
>>>> parallelism to 1 (the output file is created with the user running the
>>>> job). If I leave the default parallelism (>1) the job fails because it
>>>> creates a folder where the owner of the output folder is the root user and
>>>> the job cannot write the files of my user in that folder anymore. Am I
>>>> doing something wrong?
>>>>
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>>
>>>
>>
>


Re: Wrong owner of HDFS output folder

2015-10-26 Thread Maximilian Michels
Hi Flavio,

Are you runing your Flink cluster with root permissions? The directory to
hold the output splits are created by the JobManager. So if you run then
JobManager with root permissions, it will create a folder owned by root. If
the task managers are not run with root permissions, this could be a
problem.

Cheers,
Max

On Mon, Oct 26, 2015 at 2:40 PM, Flavio Pompermaier 
wrote:

> Hi to all,
> when I run my job within my hadoop cluster (both from command line and
> from webapp) the output of my job (HDFS) works fine until I set the write
> parallelism to 1 (the output file is created with the user running the
> job). If I leave the default parallelism (>1) the job fails because it
> creates a folder where the owner of the output folder is the root user and
> the job cannot write the files of my user in that folder anymore. Am I
> doing something wrong?
>
>
> Best,
> Flavio
>
>


Re: Error running an hadoop job from web interface

2015-10-26 Thread Maximilian Michels
That's odd. Does it also execute with parallelism 36 then?

On Mon, Oct 26, 2015 at 3:06 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> No, I just use the default parallelism
>
> On Mon, Oct 26, 2015 at 3:05 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Did you set the default parallelism of the cluster to 36? This is because
>> the plan gets optimized against the cluster configuration when you try to
>> run the uploaded program. Before, it doesn't do any optimization. This
>> might not be very intuitive. We should probably change that.
>>
>> On Mon, Oct 26, 2015 at 2:03 PM, Flavio Pompermaier <pomperma...@okkam.it
>> > wrote:
>>
>>> Now that I've recompiled flink and restarted the web-client everything
>>> works fine.
>>>
>>> However, when I flag the job I want to run I see parallelism 1 in the
>>> right panel, but when I click on "Run Job" button + show optimizer plan
>>> flagged I see parallelism 36. Is that a bug of the first preview?
>>>
>>>
>>> On Mon, Oct 26, 2015 at 10:01 AM, Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Correct. I'll fix it today.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Mon, Oct 26, 2015 at 9:08 AM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Running from the shell everything works..is it a problem of
>>>>> classloaders hierarchy in the webapp?
>>>>>
>>>>> On Fri, Oct 23, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> ./bin/flink run /path/to/jar arguments
>>>>>>
>>>>>> or
>>>>>>
>>>>>> ./bin/flink run -c MainClass /path/to/jar arguments
>>>>>>
>>>>>> On Fri, Oct 23, 2015 at 5:50 PM, Stefano Bortoli <s.bort...@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> What I normally do is to
>>>>>>>
>>>>>>> java -cp MYUBERJAR.jar my.package.mainclass
>>>>>>>
>>>>>>> does it make sense?
>>>>>>>
>>>>>>> 2015-10-23 17:22 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> could you write ne the command please?I'm not in the office right
>>>>>>>> now..
>>>>>>>> On 23 Oct 2015 17:10, "Maximilian Michels" <m...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Could you try submitting the job from the command-line and see if
>>>>>>>>> it works?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On Fri, Oct 23, 2015 at 4:42 PM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> 0.10-snapshot
>>>>>>>>>> On 23 Oct 2015 16:09, "Maximilian Michels" <m...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>
>>>>>>>>>>> Which version of Flink are you using?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 23, 2015 at 2:45 PM, Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>> I'm trying to run a job from the web interface but I get this
>>>>>>>>>>>> error:
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.RuntimeException: java.io.FileNotFoundException: JAR 
>>>>>>>>>>>> entry core-site.xml not found in 
>>>>>>>>>>>> /tmp/webclient-jobs/EntitonsJsonizer.jar

Re: Cluster installation gives java.lang.NoClassDefFoundError for everything

2015-11-11 Thread Maximilian Michels
Hi Camelia,

Flink 0.9.X supports Java 6. So this can't be the issue.

Out of curiosity, I gave it a spin on a Linux machine with OpenJDK 6. I was
able to start the command-line interface, job manager and task managers.

java version "1.6.0_36"
OpenJDK Runtime Environment (IcedTea6 1.13.8) (6b36-1.13.8-0ubuntu1~14.04)
OpenJDK 64-Bit Server VM (build 23.25-b01, mixed mode)


I think the error must be caused by your NFS setup. Let's start with
./bin/flink. For instance, can you access
"/users/camelia/thecluster/flink-0.9.1/lib/flink-dist-0.9.1.jar" from the
machine where you run ./bin/flink?
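
A quick diagnostic sketch for the same check from Java (the jar path is the
one from your log output above):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class CheckFlinkDistJar {
    public static void main(String[] args) throws Exception {
        File jar = new File(
                "/users/camelia/thecluster/flink-0.9.1/lib/flink-dist-0.9.1.jar");
        System.out.println("exists=" + jar.exists()
                + " readable=" + jar.canRead()
                + " size=" + jar.length());
        // Try to resolve the JobManager class directly from that jar.
        URLClassLoader loader =
                new URLClassLoader(new URL[] { jar.toURI().toURL() }, null);
        Class<?> clazz = Class.forName(
                "org.apache.flink.runtime.jobmanager.JobManager", false, loader);
        System.out.println("loaded " + clazz.getName());
    }
}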

Best,
Max


On Wed, Nov 11, 2015 at 10:41 AM, Camelia Elena Ciolac 
wrote:

>  Hello,
>
> As promised, I come back with debugging details.
> So:
>
> *** In start-cluster.sh , the following echo's
>
> echo "start-cluster ~"
> echo $HOSTLIST
> echo "start-cluster ~"
> echo $FLINK_BIN_DIR
> echo "start-cluster ~"
>
> # cluster mode, bring up job manager locally and a task manager on every
> slave host
> "$FLINK_BIN_DIR"/jobmanager.sh start cluster batch
>
> ==>  gave:
>
> start-cluster ~
> /users/camelia/thecluster/flink-0.9.1/conf/slaves
> start-cluster ~
> /users/camelia/thecluster/flink-0.9.1/bin
> start-cluster ~
>
>
> *** In jobmanager.sh , the following echo's
>
> echo "Starting Job Manager"
> echo "jobmanager ~"
> pwd
> echo "jobmanager ~"
> echo ${FLINK_ENV_JAVA_OPTS}
> echo "jobmanager ~"
> echo $FLINK_JM_CLASSPATH
> echo "jobmanager ~"
> echo $INTERNAL_HADOOP_CLASSPATHS
> echo "jobmanager "
> echo $FLINK_CONF_DIR
> echo ""
> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
> -classpath "`manglePathList
> "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
> org.apache.flink.runtime.jobmanager.JobManager --configDir
> "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode
> "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
>
>
>
> ==> gave:
>
> jobmanager ~
> /users/camelia/thecluster
> jobmanager ~
>
> jobmanager ~
>
> /users/camelia/thecluster/flink-0.9.1/lib/flink-dist-0.9.1.jar:/users/camelia/thecluster/flink-0.9.1/lib/flink-python-0.9.1.jar
> jobmanager ~
> ::
> jobmanager 
> /users/camelia/thecluster/flink-0.9.1/conf
> 
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/runtime/jobmanager/JobManager
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.runtime.jobmanager.JobManager
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> Could not find the main class:
> org.apache.flink.runtime.jobmanager.JobManager. Program will exit.
>
>
>
> *** In taskmanager.sh, the following echo's
>
> echo Starting task manager on host $HOSTNAME
> echo "taskmanager ~"
> pwd
> echo "taskmanager ~"
> echo ${FLINK_ENV_JAVA_OPTS}
> echo "taskmanager ~"
> echo $FLINK_JM_CLASSPATH
> echo "taskmanager ~"
> echo $INTERNAL_HADOOP_CLASSPATHS
> echo "taskmanager "
> echo $FLINK_CONF_DIR
> echo ""
> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
> -classpath "`manglePathList
> "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 <
> /dev/null &
>
> ==> gave:
>
> taskmanager ~
> /users/camelia/thecluster
> taskmanager ~
>
> taskmanager ~
>
> taskmanager ~
> ::
> taskmanager 
> /users/camelia/thecluster/flink-0.9.1/conf
> 
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/runtime/taskmanager/TaskManager
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.runtime.taskmanager.TaskManager
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at 

Re: Missing 0.10 SNAPSHOT Download

2015-11-09 Thread Maximilian Michels
Hi Brian,

We are currently in the process of releasing 0.10.0. Thus, the master
version has already been updated to 1.0, which is the next scheduled
release.

If you want to use the latest SNAPSHOT version, you may build it from
source or use the SNAPSHOT Maven artifacts. For more information,
please see http://flink.apache.org/contribute-code.html#snapshots-nightly-builds

Please bear in mind that SNAPSHOT versions are not officially
released and thus may contain bugs or less-documented features.

Cheers,
Max

On Mon, Nov 9, 2015 at 5:37 PM, Brian Chhun
 wrote:
>
>
> On Mon, Nov 9, 2015 at 9:59 AM, Brian Chhun 
> wrote:
>>
>> Is there a way to download 0.10 SNAPSHOT package like what' s available
>> for 0.9.1? The downloads page on http://flink.apache.org/ seems to only have
>> up to 0.9.1, despite having documentation for "0.10 SNAPSHOT" (the
>> documentation link also appears to actually link to the 1.0 SNAPSHOT
>> release).
>>
>> Thanks,
>> Brian
>
>


Re: Multilang Support on Flink

2015-11-13 Thread Maximilian Michels
Hi Welly,

There is a protocol for communicating with other processes. This is
reflected in the flink-language-binding-generic module. I'm not aware of how
the Spark or Storm communication protocols work, but this protocol is
rather low-level.

Cheers,
Max

On Fri, Nov 13, 2015 at 9:49 AM, Welly Tambunan  wrote:
> Hi All,
>
> I want to ask if there's multilang support ( like in Storm and pipeTo in
> Spark ) in flink ?
>
> I try to find it in the docs but can't find it.
>
> Any link or direction would be really appreciated.
>
>
> Cheers
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: Multilang Support on Flink

2015-11-16 Thread Maximilian Michels
Hi Welly,

It's in the main Flink repository. Actually, this has just been
integrated with the Python API, see
https://github.com/apache/flink/blob/master/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java

Before that, it was independent:
https://github.com/apache/flink/blob/release-0.10/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java

I think the creators decided it was too specific to the Python API to
actually serve as a generic interface. You might want to ask Chesnay
(in CC) for the details.

Cheers,
Max

On Sat, Nov 14, 2015 at 12:52 AM, Welly Tambunan <if05...@gmail.com> wrote:
> Hi Max,
>
> Do you know where the repo is ?
>
> I try to search on the flink staging but seems it's not there anymore ( via
> google)
>
> Cheers
>
> On Fri, Nov 13, 2015 at 5:07 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Welly,
>>
>> There is a protocol for communicating with other processes. This is
>> reflected in flink-language-binding-generic module. I'm not aware how
>> Spark or Storm communication protocols work but this protocol is
>> rather low level.
>>
>> Cheers,
>> Max
>>
>> On Fri, Nov 13, 2015 at 9:49 AM, Welly Tambunan <if05...@gmail.com> wrote:
>> > Hi All,
>> >
>> > I want to ask if there's multilang support ( like in Storm and pipeTo in
>> > Spark ) in flink ?
>> >
>> > I try to find it in the docs but can't find it.
>> >
>> > Any link or direction would be really appreciated.
>> >
>> >
>> > Cheers
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: Flink on EC2

2015-11-09 Thread Maximilian Michels
Hi Thomas,

It appears Flink couldn't pick up the Hadoop configuration. Did you
set the environment variables HADOOP_CONF_DIR or HADOOP_HOME?
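
A small diagnostic sketch to see whether the s3n scheme can be resolved at
all; it performs the same lookup the runtime does when creating the input
splits (the URI is taken from the stack trace below):

import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public class CheckS3nScheme {
    public static void main(String[] args) throws Exception {
        // Throws "No file system found with scheme s3n ..." if the Hadoop
        // configuration (and its S3 settings) is not visible to Flink.
        FileSystem fs = FileSystem.get(
                new URI("s3n://big-data-benchmark/pavlo/text/tiny/rankings"));
        System.out.println("resolved file system: " + fs.getClass().getName());
    }
}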

Best,
Max

On Sun, Nov 8, 2015 at 7:52 PM, Thomas Götzinger  wrote:
> Sorry for Confusing,
>
> the flink cluster throws following stack trace..
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job 29a2a25d49aa0706588ccdb8b7e81c6c
> (Flink Java Job at Sun Nov 08 18:50:52 UTC 2015)
> at org.apache.flink.client.program.Client.run(Client.java:413)
> at org.apache.flink.client.program.Client.run(Client.java:356)
> at org.apache.flink.client.program.Client.run(Client.java:349)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> at de.fraunhofer.iese.proopt.Template.run(Template.java:112)
> at de.fraunhofer.iese.proopt.Main.main(Main.java:8)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> at org.apache.flink.client.program.Client.run(Client.java:315)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
> submit job 29a2a25d49aa0706588ccdb8b7e81c6c (Flink Java Job at Sun Nov 08
> 18:50:52 UTC 2015)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits
> caused an error: No file system found with scheme s3n, referenced in file
> URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:162)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
> ... 19 more
> Caused by: java.io.IOException: No file system found with scheme s3n,
> referenced in file URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:247)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:447)
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:146)
> ... 21 more
>
> --
>
> Viele Grüße
>
>
>
> Thomas Götzinger
>
> Freiberuflicher Informatiker
>
>
>
> Glockenstraße 2a
>
> D-66882 Hütschenhausen OT Spesbach
>
> Mobil: 

Re: Running continuously on yarn with kerberos

2015-11-05 Thread Maximilian Michels
Thank you for looking into the problem, Niels. Let us know if you need
anything. We would be happy to merge a pull request once you have verified
the fix.

On Thu, Nov 5, 2015 at 1:38 PM, Niels Basjes <ni...@basjes.nl> wrote:

> I created https://issues.apache.org/jira/browse/FLINK-2977
>
> On Thu, Nov 5, 2015 at 12:25 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Niels,
>> thank you for analyzing the issue so properly. I agree with you. It seems
>> that HDFS and HBase are using their own tokes which we need to transfer
>> from the client to the YARN containers. We should be able to port the fix
>> from Spark (which they got from Storm) into our YARN client.
>> I think we would add this in org.apache.flink.yarn.Utils#setTokensFor().
>>
>> Do you want to implement and verify the fix yourself? If you are to busy
>> at the moment, we can also discuss how we share the work (I'm implementing
>> it, you test the fix)
>>
>>
>> Robert
>>
>> On Tue, Nov 3, 2015 at 5:26 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Update on the status so far I suspect I found a problem in a secure
>>> setup.
>>>
>>> I have created a very simple Flink topology consisting of a streaming
>>> Source (the outputs the timestamp a few times per second) and a Sink (that
>>> puts that timestamp into a single record in HBase).
>>> Running this on a non-secure Yarn cluster works fine.
>>>
>>> To run it on a secured Yarn cluster my main routine now looks like this:
>>>
>>> public static void main(String[] args) throws Exception {
>>> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
>>> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
>>> "/home/nbasjes/.krb/nbasjes.keytab");
>>>
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setParallelism(1);
>>>
>>> DataStream stream = env.addSource(new TimerTicksSource());
>>> stream.addSink(new SetHBaseRowSink());
>>> env.execute("Long running Flink application");
>>> }
>>>
>>> When I run this
>>>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096
>>> ./kerberos-1.0-SNAPSHOT.jar
>>>
>>> I see after the startup messages:
>>>
>>> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>>> - Login successful for user nbas...@xx.net using keytab
>>> file /home/nbasjes/.krb/nbasjes.keytab
>>> 11/03/2015 17:13:25 Job execution switched to status RUNNING.
>>> 11/03/2015 17:13:25 Custom Source -> Stream Sink(1/1) switched to
>>> SCHEDULED
>>> 11/03/2015 17:13:25 Custom Source -> Stream Sink(1/1) switched to
>>> DEPLOYING
>>> 11/03/2015 17:13:25 Custom Source -> Stream Sink(1/1) switched to
>>> RUNNING
>>>
>>> Which looks good.
>>>
>>> However ... no data goes into HBase.
>>> After some digging I found this error in the task managers log:
>>>
>>> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient
>>>  - Exception encountered while connecting to the server : 
>>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>>> GSSException: No valid credentials provided (Mechanism level: Failed to 
>>> find any Kerberos tgt)]
>>> 17:13:42,677 FATAL org.apache.hadoop.hbase.ipc.RpcClient
>>>  - SASL authentication failed. The most likely cause is missing or 
>>> invalid credentials. Consider 'kinit'.
>>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>>> GSSException: No valid credentials provided (Mechanism level: Failed to 
>>> find any Kerberos tgt)]
>>> at 
>>> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>>> at 
>>> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:177)
>>> at 
>>> org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:815)
>>> at 
>>> org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$800(RpcClient.java:349)
>>>
>>>
>>> First starting a yarn-session and then loading my job gives the same
>>> error.
>>>
>>> My best guess at this point is that Flink needs the same fix as
>>> described her

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

2015-10-19 Thread Maximilian Michels
Hi Philip,

Thank you for your questions. I think you have mapped the Hive
functions to the Flink ones correctly. Just a remark on the ORDER BY:
you wrote that it produces a total order of all the records. In this
case, you'd have to do a SortPartition operation with the parallelism
set to 1. This is necessary because all records need to be in one place
to perform a sort on them.
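
A minimal DataSet API sketch of this (the input is just toy data):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class OrderByExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("c", 3));
        // ORDER BY: a total order requires a single sorted partition.
        input.sortPartition(1, Order.ASCENDING)
             .setParallelism(1)
             .print();
    }
}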

Considering your reduce question: there is no fundamental
advantage or disadvantage of using GroupReduce over Reduce. It depends on
your use case which one is more convenient or efficient. With the
regular Reduce, you get two elements at a time and produce one; you can't
easily keep state between reduce calls other than in the value itself.
GroupReduce, on the other hand, may produce zero, one, or multiple
elements per group and can keep state in between emitting values. Thus,
GroupReduce is a more powerful operator and can be seen as a superset
of the Reduce operator. I would advise you to use the one you find
easiest to use.
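
To make the difference concrete, a small sketch with both variants (toy
data, summing the second tuple field per key):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReduceVsGroupReduce {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("a", 2),
                new Tuple2<String, Integer>("b", 3));

        // Reduce: always combines two elements into one of the same type.
        input.groupBy(0)
             .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                 @Override
                 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                       Tuple2<String, Integer> b) {
                     return new Tuple2<String, Integer>(a.f0, a.f1 + b.f1);
                 }
             })
             .print();

        // GroupReduce: sees the whole group and may emit zero, one, or many
        // elements, possibly of a different type.
        input.groupBy(0)
             .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, String>() {
                 @Override
                 public void reduce(Iterable<Tuple2<String, Integer>> values,
                                    Collector<String> out) {
                     String key = null;
                     int sum = 0;
                     for (Tuple2<String, Integer> t : values) {
                         key = t.f0;
                         sum += t.f1;
                     }
                     out.collect(key + " -> " + sum);
                 }
             })
             .print();
    }
}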

Best regards,
Max

On Sun, Oct 18, 2015 at 9:16 PM, Philip Lee  wrote:
> Hi, Flink people, a question about translation from HIVE Query to Flink
> fucntioin by using Table API. In sum up, I am working on some benchmark for
> flink
>
> I am Philip Lee majoring in Computer Science in Master Degree of TUB. , I
> work on translation from Hive Query of Benchmark to Flink codes.
>
> As I stuided it, I have a few of questions.
>
> First of all, if there are people who do no know Hive functions, let me
> briefly explan.
>
> ORDER BY: it just guarntees total order in the output.
> SORT BY: it only guarntess ordering of the rows within a reducer.
> GROUP BY: this is just groupBy function in SQL.
> DISTRIBUTE BY: all rows with the same distributed by columns will go to the
> same reducer.
> CLUSTER BY: this is just consisted of Distribute By the same column + Sort
> By the same column.
>
> I just want to check that the flink functions I use are equal to Hive one.
> < Hive SQL Query = Flink functions >
>
> ORDER BY = sortPartition(,)
> SORT BY= groupBy(`col).sortPartition(,)
> GROUP BY: this is just groupBy function.
> DISTRIBUTE BY = groupBy(`col)
> CLUSTER BY = groupBy(`col).sortPartition(,)
>
> I do not see much difference between groupBy and distributed by if I apply
> it to flink function.
> If this is hadoop version, we could say mapper is distribute by on hadoop.
> However, I am not much sure what could be DISTRIBUTE BY on flink. I tried to
> guess groupBy on Flink could be the function which is to distribute the rows
> by the specified key.
>
> Please feel free to correct what I suggested.
>
>
> Secondly, I just want to make sure the difference between reduce function
> and reduceGroup. I guess there must be a trade-off between two functinos. I
> know reduceGroup is invoked with an Iterator, but which case is more proper
> and benifical to use reduceGroup function rather than reduce function?
>
> Best Regards,
> Philip
>
> --
>
> ==
>
> Hae Joon Lee
>
>
> Now, in Germany,
>
> M.S. Candidate, Interested in Distributed System, Iterative Processing
>
> Dept. of Computer Science, Informatik in German, TUB
>
> Technical University of Berlin
>
>
> In Korea,
>
> M.S. Candidate, Computer Architecture Laboratory
>
> Dept. of Computer Science, KAIST
>
>
> Rm# 4414 CS Dept. KAIST
>
> 373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)
>
>
> Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea
>
> ==


Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

2015-10-19 Thread Maximilian Michels
Hi Philip,

You're welcome. Just a small correction: Hive's SORT BY should be
DataSet.groupBy(key).sortGroup(key) in Flink. This ensures that the
records of each group arrive sorted at the reducer that follows. There is
no need to set the parallelism to 1.
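
A minimal sketch of that pattern (toy data; the group-reduce body simply
emits the sorted values):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SortByExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<String, Integer>("a", 3),
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2));
        // SORT BY: records arrive sorted within each group; there is no
        // global order, so the default parallelism can stay untouched.
        input.groupBy(0)
             .sortGroup(1, Order.ASCENDING)
             .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, String>() {
                 @Override
                 public void reduce(Iterable<Tuple2<String, Integer>> values,
                                    Collector<String> out) {
                     for (Tuple2<String, Integer> t : values) {
                         out.collect(t.f0 + "," + t.f1);
                     }
                 }
             })
             .print();
    }
}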

Best,
Max

On Mon, Oct 19, 2015 at 1:28 PM, Philip Lee <philjj...@gmail.com> wrote:
> Thanks, Max!
>
> I really appreciate about the way you answered.
> As you remarked on ORDER BY, in order to user this function on flink
> I have to set parallelism to 1 in SortPartiton for a total ordering in on
> place?
>
> I just want to make sure about SORT BY as well.
> As a reminder, this function on Hive is just sorting ordering within a
> reducer.
> If I apply [groupBy] function before [sortPartiton],
> it will be same way of SORT BY? It does not need to set paralleism to 1,
> right?
>
> Best Regards,
> Philip
>
>
> On Mon, Oct 19, 2015 at 1:17 PM, Philip Lee <philjj...@gmail.com> wrote:
>>
>> Dear all,
>>
>> Actually, last night I sent the email to flink committer about proper
>> translation from Hive to Flink.
>> I got the answer from flink mailing-list people about it.
>> I am pretty sure these two mails will really help you.
>>
>> I will take a note by following this contents on our google docs.
>> This note will also help big-benchmark people later.
>>
>> Regards,
>> Philip
>>
>>
>>
>> -- Forwarded message --
>> From: Maximilian Michels <m...@apache.org>
>> Date: Mon, Oct 19, 2015 at 1:01 PM
>> Subject: Re: Hi, Flink people, a question about translation from HIVE
>> Query to Flink fucntioin by using Table API
>> To: "user@flink.apache.org" <user@flink.apache.org>
>>
>>
>> Hi Philip,
>>
>> Thank you for your questions. I think you have mapped the HIVE
>> functions to the Flink ones correctly. Just a remark on the ORDER BY.
>> You wrote that it produces a total order of all the records. In this
>> case, you'd have do a SortPartition operation with parallelism set to
>> 1. This is necessary because we need to have all records in one place
>> to perform a sort on them.
>>
>> Considering your reduce question: There is no fundamental
>> advantage/disadvantage of using GroupReduce over Reduce. It depends on
>> your use case which one is more convenient or efficient. For the
>> regular reduce, you just get two elements and produce one. You can't
>> easily keep state between the reduces other than in the value itself.
>> The GroupReduce, on the other hand, may produce none, one, or multiple
>> elements per grouping and keep state in between emitting values. Thus,
>> GroupReduce is a more powerful operator and can be seen as a superset
>> of the Reduce operator. I would advise you to use the one you find
>> easiest to use.
>>
>> Best regards,
>> Max
>>
>> On Sun, Oct 18, 2015 at 9:16 PM, Philip Lee <philjj...@gmail.com> wrote:
>> > Hi, Flink people, a question about translation from HIVE Query to Flink
>> > fucntioin by using Table API. In sum up, I am working on some benchmark
>> > for
>> > flink
>> >
>> > I am Philip Lee majoring in Computer Science in Master Degree of TUB. ,
>> > I
>> > work on translation from Hive Query of Benchmark to Flink codes.
>> >
>> > As I stuided it, I have a few of questions.
>> >
>> > First of all, if there are people who do no know Hive functions, let me
>> > briefly explan.
>> >
>> > ORDER BY: it just guarntees total order in the output.
>> > SORT BY: it only guarntess ordering of the rows within a reducer.
>> > GROUP BY: this is just groupBy function in SQL.
>> > DISTRIBUTE BY: all rows with the same distributed by columns will go to
>> > the
>> > same reducer.
>> > CLUSTER BY: this is just consisted of Distribute By the same column +
>> > Sort
>> > By the same column.
>> >
>> > I just want to check that the flink functions I use are equal to Hive
>> > one.
>> > < Hive SQL Query = Flink functions >
>> >
>> > ORDER BY = sortPartition(,)
>> > SORT BY= groupBy(`col).sortPartition(,)
>> > GROUP BY: this is just groupBy function.
>> > DISTRIBUTE BY = groupBy(`col)
>> > CLUSTER BY = groupBy(`col).sortPartition(,)
>> >
>> > I do not see much difference between groupBy and distributed by if I
>> > apply
>> > it to flink function.
>> > If this is hadoop version, we could say mapper is distribute by on
>> > hadoo

Re:

2015-10-19 Thread Maximilian Michels
Hi Jakob,

Thank you for reporting the bug. Could you please post your
configuration here? In particular, could you please tell us the value
of the following configuration variables:

taskmanager.heap.mb
taskmanager.network.numberOfBuffers
taskmanager.memory.off-heap

Are you running the Flink cluster in batch or streaming mode?

Direct memory is used by Flink's network layer. My guess is that you
have set taskmanager.heap.mb too low (it currently constrains the amount
of direct memory).
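
If you want to see how much direct memory a JVM has actually allocated, the
buffer pool MXBeans expose it (Java 7+). A minimal sketch; to inspect the
task manager you would run this inside its JVM, for example temporarily from
a user function, or read the same beans via JMX:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class DirectMemoryProbe {
    public static void main(String[] args) {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            // The "direct" pool is the one limited by -XX:MaxDirectMemorySize.
            System.out.println(pool.getName()
                    + ": used=" + pool.getMemoryUsed() + " bytes"
                    + ", capacity=" + pool.getTotalCapacity() + " bytes"
                    + ", count=" + pool.getCount());
        }
    }
}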

Thank you,
Max


On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
 wrote:
> Hello,
>
> We are running into a strange problem with Direct Memory buffers. From what
> I know, we are not using any direct memory buffers inside our code.
> This is pretty trivial streaming application just doing some dedupliction
> and union some kafka streams.
>
> /Jakob
>
>
>
> 2015-10-19 13:27:59,064 INFO  org.apache.flink.runtime.taskmanager.Task
> - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with
> exception.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> java.lang.OutOfMemoryError: Direct buffer memory
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> at
> io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.DecoderException:
> java.lang.OutOfMemoryError: Direct buffer memory
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> ... 9 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at
> io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
> at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
> at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
> at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
> at
> io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
> at
> io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
> ... 10 more
>


Re: Powered by Flink

2015-10-19 Thread Maximilian Michels
+1 Let's collect in the Wiki for now. At some point in time, we might
want to have a dedicated page on the Flink homepage.

On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther  wrote:
> Ah ok, sorry. I think linking to the wiki is also ok.
>
>
> On 19.10.2015 15:18, Fabian Hueske wrote:
>>
>> @Timo: The proposal was to keep the list in the wiki (can be easily
>> extended) but link from the main website to the wiki page.
>>
>> 2015-10-19 15:16 GMT+02:00 Timo Walther :
>>
>>> +1 for adding it to the website instead of wiki.
>>> "Who is using Flink?" is always a question difficult to answer to
>>> interested users.
>>>
>>>
>>> On 19.10.2015 15:08, Suneel Marthi wrote:
>>>
>>> +1 to this.
>>>
>>> On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske  wrote:
>>>
 Sounds good +1

 2015-10-19 14:57 GMT+02:00 Márton Balassi < 
 balassi.mar...@gmail.com>:

> Thanks for starting and big +1 for making it more prominent.
>
> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske < 

 fhue...@gmail.com> wrote:
>>
>> Thanks for starting this Kostas.
>>
>> I think the list is quite hidden in the wiki. Should we link from
>> flink.apache.org to that page?
>>
>> Cheers, Fabian
>>
>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas < 

 ktzou...@apache.org>:
>>>
>>> Hi everyone,
>>>
>>> I started a "Powered by Flink" wiki page, listing some of the
>>> organizations that are using Flink:
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>>>
>>> If you would like to be added to the list, just send me a short email
>>> with your organization's name and a description and I will add you to

 the
>>>
>>> wiki page.
>>>
>>> Best,
>>> Kostas
>>>
>>
>>>
>>>
>


Re:

2015-10-19 Thread Maximilian Michels
I forgot to ask you: Which version of Flink are you using? 0.9.1 or
0.10-SNAPSHOT?

On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org> wrote:
> Hi Jakob,
>
> Thanks. Flink allocates its network memory as direct memory outside
> the normal Java heap. By default, that is 64MB but can grow up to
> 128MB on heavy network transfer. How much memory does your machine
> have? Could it be that your upper memory bound is lower than 2048 +
> 128 MB?
>
> Best,
> Max
>
> On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
> <jakob.erics...@gmail.com> wrote:
>> Hi,
>>
>> See answers below.
>>
>> /Jakob
>>
>> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:
>>>
>>> Hi Jakob,
>>>
>>> Thank you for reporting the bug. Could you please post your
>>> configuration here? In particular, could you please tell us the value
>>> of the following configuration variables:
>>>
>>> taskmanager.heap.mb
>>
>> taskmanager.heap.mb: 2048
>>>
>>> taskmanager.network.numberOfBuffers
>>
>>
>> Default value. Not changed.
>>
>>>
>>> taskmanager.memory.off-heap
>>>
>> Default value Not changed.
>>
>>>
>>> Are you running the Flink cluster in batch or streaming mode?
>>>
>> Started in streaming mode. Running with two nodes. In the cluster.
>> Also, I have set the "env.java.opts: -XX:+UseConcMarkSweepGC" due to some
>> strange java core dumps in the G1 GC.
>>
>>>
>>> Direct memory is used by Flink's network layer. My guess is that you
>>> have set taskmanager.heap.mb too low (it constraints the number of
>>> direct memory at the moment).
>>>
>>> Thank you,
>>> Max
>>>
>>>
>>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>>> <jakob.erics...@gmail.com> wrote:
>>> > Hello,
>>> >
>>> > We are running into a strange problem with Direct Memory buffers. From
>>> > what
>>> > I know, we are not using any direct memory buffers inside our code.
>>> > This is pretty trivial streaming application just doing some
>>> > dedupliction
>>> > and union some kafka streams.
>>> >
>>> > /Jakob
>>> >
>>> >
>>> >
>>> > 2015-10-19 13:27:59,064 INFO  org.apache.flink.runtime.taskmanager.Task
>>> > - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with
>>> > exception.
>>> >
>>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> > java.lang.OutOfMemoryError: Direct buffer memory
>>> > at
>>> >
>>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>>> > at
>>> >
>>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>>> > at
>>> >
>>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>>> > at
>>> >
>>> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java

Re:

2015-10-19 Thread Maximilian Michels
Hi Jakob,

Thanks. Flink allocates its network memory as direct memory outside
the normal Java heap. By default, that is 64MB but can grow up to
128MB on heavy network transfer. How much memory does your machine
have? Could it be that your upper memory bound is lower than 2048 +
128 MB?

Best,
Max

On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
<jakob.erics...@gmail.com> wrote:
> Hi,
>
> See answers below.
>
> /Jakob
>
> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Jakob,
>>
>> Thank you for reporting the bug. Could you please post your
>> configuration here? In particular, could you please tell us the value
>> of the following configuration variables:
>>
>> taskmanager.heap.mb
>
> taskmanager.heap.mb: 2048
>>
>> taskmanager.network.numberOfBuffers
>
>
> Default value. Not changed.
>
>>
>> taskmanager.memory.off-heap
>>
> Default value Not changed.
>
>>
>> Are you running the Flink cluster in batch or streaming mode?
>>
> Started in streaming mode. Running with two nodes. In the cluster.
> Also, I have set the "env.java.opts: -XX:+UseConcMarkSweepGC" due to some
> strange java core dumps in the G1 GC.
>
>>
>> Direct memory is used by Flink's network layer. My guess is that you
>> have set taskmanager.heap.mb too low (it constraints the number of
>> direct memory at the moment).
>>
>> Thank you,
>> Max
>>
>>
>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>> <jakob.erics...@gmail.com> wrote:
>> > Hello,
>> >
>> > We are running into a strange problem with Direct Memory buffers. From
>> > what
>> > I know, we are not using any direct memory buffers inside our code.
>> > This is pretty trivial streaming application just doing some
>> > dedupliction
>> > and union some kafka streams.
>> >
>> > /Jakob
>> >
>> >
>> >
>> > 2015-10-19 13:27:59,064 INFO  org.apache.flink.runtime.taskmanager.Task
>> > - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with
>> > exception.
>> >
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> > java.lang.OutOfMemoryError: Direct buffer memory
>> > at
>> >
>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> > at
>> >
>> > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> > at
>> >
>> > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> >   

Re:

2015-10-19 Thread Maximilian Michels
When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
If it has been more than a couple of weeks, then I'd advise you to
update to the latest snapshot version. There has been an issue with
the calculation of the off-heap memory limit in the past.

Thanks,
Max

On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> It's 0.10-SNAPSHOT
>
> Gyula
>
> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2015. okt. 19., H,
> 17:13):
>>
>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
>> 0.10-SNAPSHOT?
>>
>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>> > Hi Jakob,
>> >
>> > Thanks. Flink allocates its network memory as direct memory outside
>> > the normal Java heap. By default, that is 64MB but can grow up to
>> > 128MB on heavy network transfer. How much memory does your machine
>> > have? Could it be that your upper memory bound is lower than 2048 +
>> > 128 MB?
>> >
>> > Best,
>> > Max
>> >
>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>> > <jakob.erics...@gmail.com> wrote:
>> >> Hi,
>> >>
>> >> See answers below.
>> >>
>> >> /Jakob
>> >>
>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels <m...@apache.org>
>> >> wrote:
>> >>>
>> >>> Hi Jakob,
>> >>>
>> >>> Thank you for reporting the bug. Could you please post your
>> >>> configuration here? In particular, could you please tell us the value
>> >>> of the following configuration variables:
>> >>>
>> >>> taskmanager.heap.mb
>> >>
>> >> taskmanager.heap.mb: 2048
>> >>>
>> >>> taskmanager.network.numberOfBuffers
>> >>
>> >>
>> >> Default value. Not changed.
>> >>
>> >>>
>> >>> taskmanager.memory.off-heap
>> >>>
>> >> Default value Not changed.
>> >>
>> >>>
>> >>> Are you running the Flink cluster in batch or streaming mode?
>> >>>
>> >> Started in streaming mode. Running with two nodes. In the cluster.
>> >> Also, I have set the "env.java.opts: -XX:+UseConcMarkSweepGC" due to
>> >> some
>> >> strange java core dumps in the G1 GC.
>> >>
>> >>>
>> >>> Direct memory is used by Flink's network layer. My guess is that you
>> >>> have set taskmanager.heap.mb too low (it constraints the number of
>> >>> direct memory at the moment).
>> >>>
>> >>> Thank you,
>> >>> Max
>> >>>
>> >>>
>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>> >>> <jakob.erics...@gmail.com> wrote:
>> >>> > Hello,
>> >>> >
>> >>> > We are running into a strange problem with Direct Memory buffers.
>> >>> > From
>> >>> > what
>> >>> > I know, we are not using any direct memory buffers inside our code.
>> >>> > This is pretty trivial streaming application just doing some
>> >>> > dedupliction
>> >>> > and union some kafka streams.
>> >>> >
>> >>> > /Jakob
>> >>> >
>> >>> >
>> >>> >
>> >>> > 2015-10-19 13:27:59,064 INFO
>> >>> > org.apache.flink.runtime.taskmanager.Task
>> >>> > - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED
>> >>> > with
>> >>> > exception.
>> >>> >
>> >>> >
>> >>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >>> > java.lang.OutOfMemoryError: Direct buffer memory
>> >>> > at
>> >>> >
>> >>> >
>> >>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >>> > at
>> >>> >
>> >>> >
>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >>> > at
>> >>> >
>> 

Re:

2015-10-20 Thread Maximilian Michels
Hi Jakob,

Your revision number is fairly new and your direct memory
configuration seems to be correct for your setup. If you have the
time, you could verify that the memory flags for the JVM are set
correctly by the startup script. You can see that in the first lines
of the task manager log. If the direct memory was set to 2GB with the
default number of network buffers, the JVM should have had enough
direct memory. Still, we'd like to find out what caused your problem.

Are you running on YARN or standalone?

Yes, the usual setup is one task manager per host/VM. The task manager
will allocate all memory upfront. However, a large part of this memory
will be self-managed by Flink and not touched much by the GC. By
default, this is 0.7 of the configured heap memory. You can control
this ratio with the taskmanager.memory.fraction variable. You can also
set a fixed managed memory size using taskmanager.memory.size (MB). In
large memory setups, we have seen slightly better performance using
off-heap memory allocation. This can be configured using
taskmanager.memory.off-heap: true.
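
For reference, a minimal flink-conf.yaml sketch with the keys mentioned
above (the values are placeholders, not recommendations):

taskmanager.heap.mb: 20480
# Fraction of the heap that Flink manages itself (default 0.7) ...
taskmanager.memory.fraction: 0.7
# ... or a fixed managed memory size in MB instead of the fraction:
# taskmanager.memory.size: 8192
# Allocate the managed memory off-heap:
taskmanager.memory.off-heap: true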

Please let us know if you experience any further issues.

Best,
Max

On Mon, Oct 19, 2015 at 10:14 PM, Jakob Ericsson
<jakob.erics...@gmail.com> wrote:
> The revision is "Starting JobManager (Version: 0.10-SNAPSHOT, Rev:c82ebbf,
> Date:15.10.2015 @ 11:34:01 CEST)"
>
> We have a lot of memory left on the machine. I have increased it quite a
> lot.
>
> What is your thought on memory configuration?
> If I understand Flink correctly, you should only have one taskmanager
> running each host?
>
> For a pretty standard machine with 16 cores and 32-64 GB memory. This means
> that you will have one java process running with a Xmx30G or even higher for
> exhausting all memory of the machine. This is, at least for the CMS GC, not
> the most optimal configuration.
> It might be viable for G1 but we got some really serious java core dumps
> when running G1.
>
> I looked a bit on the flags that was set on the process and it seems that
> Xmx and MaxDirectMemorySize are set to the same value by the shell script.
> When I got the "java.lang.OutOfMemoryError: Direct buffer memory", I was
> running with a taskmanager.heap.mb:2048. So the direct memory buffer was set
> to 2GB.
>
> I have restarted the process with G1 again and 20GB as taskmanager.heap.mb.
> Lets see if it will be stable during the night.
>
>
> On Mon, Oct 19, 2015 at 6:31 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> You can see the revision number and the build date in the JobManager
>> log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
>> Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"
>>
>> On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>> > When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
>> > If it has been more than a couple of weeks, then I'd advise you to
>> > update to the latest snapshot version. There has been an issue with
>> > the calculation of the off-heap memory limit in the past.
>> >
>> > Thanks,
>> > Max
>> >
>> > On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com>
>> > wrote:
>> >> It's 0.10-SNAPSHOT
>> >>
>> >> Gyula
>> >>
>> >> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2015. okt. 19.,
>> >> H,
>> >> 17:13):
>> >>>
>> >>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
>> >>> 0.10-SNAPSHOT?
>> >>>
>> >>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org>
>> >>> wrote:
>> >>> > Hi Jakob,
>> >>> >
>> >>> > Thanks. Flink allocates its network memory as direct memory outside
>> >>> > the normal Java heap. By default, that is 64MB but can grow up to
>> >>> > 128MB on heavy network transfer. How much memory does your machine
>> >>> > have? Could it be that your upper memory bound is lower than 2048 +
>> >>> > 128 MB?
>> >>> >
>> >>> > Best,
>> >>> > Max
>> >>> >
>> >>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>> >>> > <jakob.erics...@gmail.com> wrote:
>> >>> >> Hi,
>> >>> >>
>> >>> >> See answers below.
>> >>> >>
>> >>> >> /Jakob
>> >>> >>
>> >>> >> On Mon, Oct 19, 2015 at 4:03 PM, 

Re: Flink batch runs OK but Yarn container fails in batch mode with -m yarn-cluster

2015-10-20 Thread Maximilian Michels
Hi Arnaud,

No problem. Good to hear it is resolved :)

Best,
Max


On Tue, Oct 20, 2015 at 4:37 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
> Hi,
> Sorry for the long delay, I've missed this mail.
> I was using the 0.10 snapshot. I've upgraded it today and it seems to work 
> now, I do have a SUCCEEDED too.
>
> Best regards,
> Arnaud
>
> -----Message d'origine-
> De : Maximilian Michels [mailto:m...@apache.org]
> Envoyé : jeudi 8 octobre 2015 14:34
> À : user@flink.apache.org; LINZ, Arnaud <al...@bouyguestelecom.fr>
> Objet : Re: Flink batch runs OK but Yarn container fails in batch mode with 
> -m yarn-cluster
>
> Hi Arnaud,
>
> I've looked into the problem but I couldn't reproduce it using Flink 0.9.0, 
> Flink 0.9.1 and the current master snapshot (f332fa5). I always ended up with 
> the final state SUCCEEDED.
>
> Which version of Flink were you using?
>
> Best regards,
> Max
>
> On Thu, Sep 3, 2015 at 10:48 AM, Robert Metzger <rmetz...@apache.org> wrote:
>> Hi Arnaud,
>>
>> I think that's a bug ;)
>> I'll file a JIRA to fix it for the next release.
>>
>> On Thu, Sep 3, 2015 at 10:26 AM, LINZ, Arnaud
>> <al...@bouyguestelecom.fr>
>> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I am wondering why, despite the fact that my java main() methods runs
>>> OK and exit with 0 code value, the Yarn container status set by the
>>> englobing flink execution is FAILED with diagnostic "Flink YARN
>>> Client requested shutdown."?
>>>
>>>
>>>
>>> Command line :
>>>
>>> flink run -m yarn-cluster -yn 20 -ytm 8192 -yqu batch1 -ys 8 --class
>>>   
>>>
>>>
>>>
>>> End of yarn log :
>>>
>>>
>>>
>>> Status of job 6ac47ddc8331ffd0b1fa9a3b5a551f86
>>> (KUBERA-GEO-BRUT2SEGMENT) changed to FINISHED.
>>>
>>> 10:03:00,618 INFO
>>> org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1- Stopping
>>> YARN JobManager with status FAILED and diagnostic Flink YARN Client
>>> requested shutdown.
>>>
>>> 10:03:00,625 INFO
>>> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
>>> - Waiting for application to be successfully unregistered.
>>>
>>> 10:03:00,874 INFO
>>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolPro
>>> xy  - Closing proxy : h1r2dn12.bpa.bouyguestelecom.fr:45454
>>>
>>> (… more closing proxy …)
>>>
>>> 10:03:00,877 INFO
>>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolPro
>>> xy  - Closing proxy : h1r2dn01.bpa.bouyguestelecom.fr:45454
>>>
>>> 10:03:00,883 INFO
>>> org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1- Stopping
>>> JobManager akka://flink/user/jobmanager#1737010364.
>>>
>>> 10:03:00,895 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator
>>> - Shutting down remote daemon.
>>>
>>> 10:03:00,896 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator
>>> - Remote daemon shut down; proceeding with flushing remote transports.
>>>
>>> 10:03:00,918 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator
>>> - Remoting shut down.
>>>
>>>
>>>
>>> End of log4j log:
>>>
>>>
>>>
>>> 2015:09:03 10:03:00 (main) - INFO -
>>> com.bouygtel.kuberasdk.main.Application.mainMethod - Fin ok
>>> traitement
>>>
>>> 2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode
>>> Inconnue
>>> - Shutting down FlinkYarnCluster from the client shutdown hook
>>>
>>> 2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode
>>> Inconnue
>>> - Sending shutdown request to the Application Master
>>>
>>> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
>>> Classe Inconnue.Methode Inconnue - Sending StopYarnSession request to
>>> ApplicationMaster.
>>>
>>> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
>>> Classe Inconnue.Methode Inconnue - Remote JobManager has been stopped
>>> successfully. Stopping local application client
>>>
>>> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
>>> Classe Inconnue.Methode Inconnue - Stopped Application client.
>>>
>>> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-1

Re:

2015-10-19 Thread Maximilian Michels
You can see the revision number and the build date in the JobManager
log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"

On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org> wrote:
> When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
> If it has been more than a couple of weeks, then I'd advise you to
> update to the latest snapshot version. There has been an issue with
> the calculation of the off-heap memory limit in the past.
>
> Thanks,
> Max
>
> On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> It's 0.10-SNAPSHOT
>>
>> Gyula
>>
>> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2015. okt. 19., H,
>> 17:13):
>>>
>>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
>>> 0.10-SNAPSHOT?
>>>
>>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org>
>>> wrote:
>>> > Hi Jakob,
>>> >
>>> > Thanks. Flink allocates its network memory as direct memory outside
>>> > the normal Java heap. By default, that is 64MB but can grow up to
>>> > 128MB on heavy network transfer. How much memory does your machine
>>> > have? Could it be that your upper memory bound is lower than 2048 +
>>> > 128 MB?
>>> >
>>> > Best,
>>> > Max
>>> >
>>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>>> > <jakob.erics...@gmail.com> wrote:
>>> >> Hi,
>>> >>
>>> >> See answers below.
>>> >>
>>> >> /Jakob
>>> >>
>>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels <m...@apache.org>
>>> >> wrote:
>>> >>>
>>> >>> Hi Jakob,
>>> >>>
>>> >>> Thank you for reporting the bug. Could you please post your
>>> >>> configuration here? In particular, could you please tell us the value
>>> >>> of the following configuration variables:
>>> >>>
>>> >>> taskmanager.heap.mb
>>> >>
>>> >> taskmanager.heap.mb: 2048
>>> >>>
>>> >>> taskmanager.network.numberOfBuffers
>>> >>
>>> >>
>>> >> Default value. Not changed.
>>> >>
>>> >>>
>>> >>> taskmanager.memory.off-heap
>>> >>>
>>> >> Default value Not changed.
>>> >>
>>> >>>
>>> >>> Are you running the Flink cluster in batch or streaming mode?
>>> >>>
>>> >> Started in streaming mode. Running with two nodes. In the cluster.
>>> >> Also, I have set the "env.java.opts: -XX:+UseConcMarkSweepGC" due to
>>> >> some
>>> >> strange java core dumps in the G1 GC.
>>> >>
>>> >>>
>>> >>> Direct memory is used by Flink's network layer. My guess is that you
>>> >>> have set taskmanager.heap.mb too low (it constraints the number of
>>> >>> direct memory at the moment).
>>> >>>
>>> >>> Thank you,
>>> >>> Max
>>> >>>
>>> >>>
>>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>>> >>> <jakob.erics...@gmail.com> wrote:
>>> >>> > Hello,
>>> >>> >
>>> >>> > We are running into a strange problem with Direct Memory buffers.
>>> >>> > From
>>> >>> > what
>>> >>> > I know, we are not using any direct memory buffers inside our code.
>>> >>> > This is pretty trivial streaming application just doing some
>>> >>> > dedupliction
>>> >>> > and union some kafka streams.
>>> >>> >
>>> >>> > /Jakob
>>> >>> >
>>> >>> >
>>> >>> >
>>> >>> > 2015-10-19 13:27:59,064 INFO
>>> >>> > org.apache.flink.runtime.taskmanager.Task
>>> >>> > - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED
>>> >>> > with
>>> >>> > exception.
>>> >>> >
>>> >>> >
>

Re: Cannot get Python API to work at all; java.io.FileNotFoundException for Flink python resources

2015-10-21 Thread Maximilian Michels
Hi Ronkainen,

Sorry for the late reply. Unfortunately, this is a bug in the Python
API. I've reproduced the issue and fixed it for the upcoming releases.
The fix will be included in the 0.9.2 and 0.10 releases. If you
don't mind, you could already use the 0.10-SNAPSHOT version (the 0.10
release will be out next week). Also, the bug is not present in Flink
0.9.0.

Thanks,
Max

On Mon, Oct 19, 2015 at 1:27 PM, Ronkainen Jussi  wrote:
> Hi,
>
>
>
> I’m trying to evaluate the Python API but get a
> java.io.FileNotFoundException when trying to run any Python script on
> pyflink. The contents of the Python script are irrelevant, doesn’t even need
> to be valid Python.
>
> I made a local install of 0.9.1, copy-pasted the example Python script from
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/python.html
> to a file “~/python_src/wordcount.py” and tried to run it in Flink root
> directory (/home/flink/flink-0.9.1/):
>
>
>
> ./bin/pyflink2.sh ~/python_src/wordcount.py
>
>
>
> I get a java.io.FileNotFoundException (trace below). The cause could be
> related to PythonPlanBinder line 93, which strips 7 characters from
> FLINK_DIR and after that appends /resources/python to the result, ending in
> an invalid path. If I understand correctly, for my  user “flink” the path
> should be /home/flink/flink-0.9.1/resources/python
>
>
>
> Am I missing some setup option, using pyflink wrong, or could this be a bug?
> I also tried compiling from 0.10-SNAPSHOT but the problem persists.
>
> I’m running Ubuntu 14.04  with openjdk 8 and Python 2.7.6.
>
>
>
> Exception trace:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>
> at org.apache.flink.client.program.Client.run(Client.java:315)
>
> at
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>
> Caused by: java.io.FileNotFoundException: File
> /home/flink/flin/resources/python does not exist or the user running Flink
> ('flink') has insufficient permissions to access it.
>
> at
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:107)
>
> at
> org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>
> at
> org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.prepareFiles(PythonPlanBinder.java:138)
>
> at
> org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.runPlan(PythonPlanBinder.java:108)
>
> at
> org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:84)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>
> ... 6 more
>
>
>
> -Jussi
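For illustration, the fixed-offset path derivation described above reproduces exactly the broken path from the stack trace. This is only a sketch of the reported symptom, not the actual PythonPlanBinder code:

public class PathDerivationSketch {
    public static void main(String[] args) {
        // Hypothetical FLINK_DIR value taken from the report above.
        String flinkDir = "/home/flink/flink-0.9.1";
        // Stripping a fixed 7 characters (as described for line 93) assumes a
        // particular directory layout and silently corrupts any other one:
        String derived = flinkDir.substring(0, flinkDir.length() - 7) + "/resources/python";
        System.out.println(derived); // -> /home/flink/flin/resources/python
        // A length-independent derivation would resolve relative to FLINK_DIR instead:
        System.out.println(new java.io.File(flinkDir, "resources/python").getPath());
    }
}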


Re: Stuck builds on travis

2015-10-09 Thread Maximilian Michels
I think Travis will fix this hiccup soon. Maybe you could provide them
with the stuck builds in a mail to supp...@travis-ci.com.

On Fri, Oct 9, 2015 at 3:39 PM, Sachin Goel  wrote:
> Found another one: https://travis-ci.org/apache/flink/jobs/84473635
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Fri, Oct 9, 2015 at 7:06 PM, Sachin Goel 
> wrote:
>>
>> These two builds are stuck on Travis. It seems to be a Travis issue, and it is
>> limiting the number of concurrent builds to 3.
>> https://travis-ci.org/apache/flink/jobs/84317317
>> https://travis-ci.org/apache/flink/jobs/84405887
>>
>> Perhaps someone from infra should cancel them.
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>
>


Re: Does 'DataStream.writeAsCsv' suppose to work like this?

2015-11-18 Thread Maximilian Michels
Yes, that does make sense! Thank you for explaining. Have you made the
change yet? I couldn't find it on the master.

On Wed, Nov 18, 2015 at 5:16 PM, Stephan Ewen <se...@apache.org> wrote:
> That makes sense...
>
> On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>>
>> Hey Max,
>>
>> The solution I am proposing is not to flush on every record, but to make
>> sure the flush is forwarded from the SinkFunction to the OutputFormat
>> whenever it is triggered. Practically this means that the buffering is done
>> (almost) solely in the sink and not in the OutputFormat any more.
>>
>> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <m...@apache.org>
>> wrote:
>>>
>>> Not sure whether we really want to flush at every invoke call. If you
>>> want to flush every time, you may want to set the update condition to 0
>>> milliseconds. That way, flush will be called every time. In the API this is
>>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>>> performance might degrade.
>>>
>>> By the way, you may also use the RollingFileSink which splits the output
>>> into several files for each hour/week/day. You can then be sure those files
>>> are already completely written to HDFS.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi
>>> <balassi.mar...@gmail.com> wrote:
>>>>
>>>> The problem persists in the current master, simply a format.flush() is
>>>> needed here [1]. I'll do a quick hotfix, thanks for the report again!
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>>
>>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi
>>>> <balassi.mar...@gmail.com> wrote:
>>>>>
>>>>> Hey Rex,
>>>>>
>>>>> Writing half-baked records is definitely unwanted, thanks for spotting
>>>>> this. Most likely it can be solved by adding a flush at the end of every
>>>>> invoke call, let me check.
>>>>>
>>>>> Best,
>>>>>
>>>>> Marton
>>>>>
>>>>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote:
>>>>>>
>>>>>> Hi, flinkers!
>>>>>>
>>>>>> I'm new to this whole thing,
>>>>>> and it seems to me that '
>>>>>> org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>>>>>> WriteMode, long)' does not work properly.
>>>>>> To be specific, data were not flushed at the update frequency when writing
>>>>>> to HDFS.
>>>>>>
>>>>>> What makes it more disturbing is that, if I check the content with
>>>>>> 'hdfs dfs -cat xxx', I sometimes get partial records.
>>>>>>
>>>>>>
>>>>>> I did a little digging in flink-0.9.1.
>>>>>> And it turns out all
>>>>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>>>>> does
>>>>>> is pushing data to
>>>>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream'
>>>>>> which is a delegate of  'org.apache.hadoop.fs.FSDataOutputStream'.
>>>>>>
>>>>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>>>>> flushed.
>>>>>> Which result in data being held in local buffer, and 'hdfs dfs -cat
>>>>>> xxx' might return partial records.
>>>>>>
>>>>>>
>>>>>> Is 'DataStream.writeAsCsv' supposed to work like this? Or did I mess up
>>>>>> somewhere?
>>>>>>
>>>>>>
>>>>>> Best regards and thanks for your time!
>>>>>>
>>>>>> Rex
>>>>>
>>>>>
>>>>
>>>
>>
>
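To make the flushing behaviour discussed above concrete, here is a minimal, self-contained sketch using plain java.io (it is not Flink's FileSinkFunction; class and parameter names are made up for illustration). The point is simply that whatever the sink buffers into must be flushed when the update condition fires, otherwise readers can observe half-written records:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class FlushingCsvSinkSketch {
    private final BufferedWriter out;
    private final long flushIntervalMillis; // analogous to the "update frequency"
    private long lastFlush = System.currentTimeMillis();

    public FlushingCsvSinkSketch(String path, long flushIntervalMillis) throws IOException {
        this.out = new BufferedWriter(new FileWriter(path));
        this.flushIntervalMillis = flushIntervalMillis;
    }

    public void invoke(String csvLine) throws IOException {
        out.write(csvLine);
        out.newLine();
        long now = System.currentTimeMillis();
        if (now - lastFlush >= flushIntervalMillis) {
            out.flush(); // forward the flush to the underlying stream
            lastFlush = now;
        }
    }

    public void close() throws IOException {
        out.flush();
        out.close();
    }

    public static void main(String[] args) throws IOException {
        FlushingCsvSinkSketch sink = new FlushingCsvSinkSketch(
                java.io.File.createTempFile("sink-sketch", ".csv").getPath(), 0L);
        sink.invoke("a,b,c"); // with an interval of 0 every record is flushed immediately
        sink.close();
    }
}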


Re: Apache Flink Forward Videos

2015-11-13 Thread Maximilian Michels
Hi Welly,

Thanks for sharing! The videos are coming; they will all be available soon.

Cheers,
Max

On Fri, Nov 13, 2015 at 11:08 AM, Welly Tambunan  wrote:
> Hi All,
>
> I've just noticed that the video is already available for this one.
>
> http://flink-forward.org/?post_type=session
>
>
> Another weekend gift for all.
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: [0.10-SNAPSHOT ] When naming yarn application (yarn-session -nm), flink run without -m fails.

2015-08-26 Thread Maximilian Michels
Nice. More configuration options :)

On Wed, Aug 26, 2015 at 5:58 PM, Robert Metzger rmetz...@apache.org wrote:
 Therefore, my change will include a configuration option to set a custom
 location for the file.

 On Wed, Aug 26, 2015 at 5:55 PM, Maximilian Michels m...@apache.org wrote:

 The only problem with writing to the temp directory is that the file will be gone after a
 restart. While this is not important for PIDs because the system has
 been restarted anyways, this can actually be a problem if you want to
 resume a YARN cluster after you have restarted your system.

 On Wed, Aug 26, 2015 at 3:34 PM, Robert Metzger rmetz...@apache.org
 wrote:
  Yep. I think the start-*.sh scripts are also writing the PID to tmp.
 
  On Wed, Aug 26, 2015 at 3:30 PM, Maximilian Michels m...@apache.org
  wrote:
 
  Can't we write the file to the system's temp directory or the user
  home? IMHO this is more standard practice for this type of session
  information.
 
  On Wed, Aug 26, 2015 at 3:25 PM, Robert Metzger rmetz...@apache.org
  wrote:
   Great ;)
  
   Not yet, but you are the second user to request this.
   I think I'll put the file somewhere else now.
  
   On Wed, Aug 26, 2015 at 3:19 PM, LINZ, Arnaud
   al...@bouyguestelecom.fr
   wrote:
  
   Ooops… Seems it was rather a write problem on the conf dir…
  
   Sorry, it works!
  
  
  
   BTW, it’s not really nice to have an application write the
   configuration
   dir ; it’s often a root protected directory in usr/lib/flink. Is
   there
   a
   parameter to put that file elsewhere ?
  
  
  
  
  
   De : Robert Metzger [mailto:rmetz...@apache.org]
   Envoyé : mercredi 26 août 2015 14:42
   À : user@flink.apache.org
   Objet : Re: [0.10-SNAPSHOT ] When naming yarn application
   (yarn-session
   -nm), flink run without -m fails.
  
  
  
   Hi Arnaud,
  
  
  
   usually, you don't have to manually specify the JobManager address
   manually with the -m argument, because it is reading it from the
   conf/.yarn-session.properties file.
  
  
  
   Give me a few minutes to reproduce the issue.
  
  
  
   On Wed, Aug 26, 2015 at 2:39 PM, LINZ, Arnaud
   al...@bouyguestelecom.fr
   wrote:
  
   Hi,
   Using last nightly build, it seems that if you call yarn-session.sh
   with
   -nm option to give a nice application name, then you cannot submit a
   job
   with flink run without specify the ever changing -m jobManager
   address
   since it does not find it any longer.
  
   Regards,
  
   Arnaud
  
  
  
   
  
  
  
  
  
  
 
 




Re: [0.10-SNAPSHOT ] When naming yarn application (yarn-session -nm), flink run without -m fails.

2015-08-26 Thread Maximilian Michels
Can't we write the file to the system's temp directory or the user
home? IMHO this is more standard practice for this type of session
information.
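Just to sketch the lookup order being discussed (purely illustrative; the fallback chain and the configuration hook are assumptions, not Flink's actual behaviour):

import java.io.File;

public class SessionFileLocationSketch {
    // Resolve where a ".yarn-session.properties"-style file could live:
    // an explicitly configured directory first, then the system temp
    // directory, then the user home.
    public static File resolve(String configuredDir) {
        String dir = configuredDir != null
                ? configuredDir
                : System.getProperty("java.io.tmpdir", System.getProperty("user.home"));
        return new File(dir, ".yarn-session.properties");
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));              // e.g. /tmp/.yarn-session.properties
        System.out.println(resolve("/opt/flink/conf")); // explicit location wins
    }
}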

On Wed, Aug 26, 2015 at 3:25 PM, Robert Metzger rmetz...@apache.org wrote:
 Great ;)

 Not yet, but you are the second user to request this.
 I think I'll put the file somewhere else now.

 On Wed, Aug 26, 2015 at 3:19 PM, LINZ, Arnaud al...@bouyguestelecom.fr
 wrote:

 Ooops… Seems it was rather a write problem on the conf dir…

 Sorry, it works!



 BTW, it’s not really nice to have an application write to the configuration
 dir; it’s often a root-protected directory in usr/lib/flink. Is there a
 parameter to put that file elsewhere?





 De : Robert Metzger [mailto:rmetz...@apache.org]
 Envoyé : mercredi 26 août 2015 14:42
 À : user@flink.apache.org
 Objet : Re: [0.10-SNAPSHOT ] When naming yarn application (yarn-session
 -nm), flink run without -m fails.



 Hi Arnaud,



 usually, you don't have to manually specify the JobManager address
 with the -m argument, because it is read from the
 conf/.yarn-session.properties file.



 Give me a few minutes to reproduce the issue.



 On Wed, Aug 26, 2015 at 2:39 PM, LINZ, Arnaud al...@bouyguestelecom.fr
 wrote:

 Hi,
 Using the last nightly build, it seems that if you call yarn-session.sh with
 the -nm option to give a nice application name, then you cannot submit a job
 with flink run without specifying the ever-changing -m JobManager address,
 since it does not find it any longer.

 Regards,

 Arnaud



 








Re: Question on flink and hdfs

2015-09-04 Thread Maximilian Michels
Hi Jerry,

If you don't want to use Hadoop, simply pick _any_ Flink version. We
recommend the Hadoop 1 version because it contains the fewest dependencies,
i.e. you need to download less and the installation occupies less space.
Other than that, it doesn't really matter if you don't use the HDFS
functionality, i.e. you don't access hdfs:// paths.

Cheers,
Max

On Fri, Sep 4, 2015 at 10:08 AM, Welly Tambunan  wrote:

> Hi Jerry,
>
> yes, that's possible. You can download the appropriate version
> https://flink.apache.org/downloads.html
>
> Cheers
>
> On Fri, Sep 4, 2015 at 1:57 AM, Jerry Peng 
> wrote:
>
>> Hello,
>>
>> Does Flink require HDFS to run? I know you can use HDFS to checkpoint and
>> process files in a distributed fashion. So can Flink run standalone
>> without HDFS?
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Usage of Hadoop 2.2.0

2015-09-04 Thread Maximilian Michels
+1 for dropping Hadoop 2.2.0 binary and source-compatibility. The
release is hardly used and complicates the important high-availability
changes in Flink.

On Fri, Sep 4, 2015 at 9:33 AM, Stephan Ewen  wrote:
> I am good with that as well. Mind that we are not only dropping a binary
> distribution for Hadoop 2.2.0, but also the source compatibility with 2.2.0.
>
>
>
> Lets also reconfigure Travis to test
>
>  - Hadoop1
>  - Hadoop 2.3
>  - Hadoop 2.4
>  - Hadoop 2.6
>  - Hadoop 2.7
>
>
> On Fri, Sep 4, 2015 at 6:19 AM, Chiwan Park  wrote:
>>
>> +1 for dropping Hadoop 2.2.0
>>
>> Regards,
>> Chiwan Park
>>
>> > On Sep 4, 2015, at 5:58 AM, Ufuk Celebi  wrote:
>> >
>> > +1 to what Robert said.
>> >
>> > On Thursday, September 3, 2015, Robert Metzger 
>> > wrote:
>> > I think most cloud providers moved beyond Hadoop 2.2.0.
>> > Google's Click-To-Deploy is on 2.4.1
>> > AWS EMR is on 2.6.0
>> >
>> > The situation for the distributions seems to be the following:
>> > MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
>> > CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
>> >
>> > HDP 2.0  (October 2013) is using 2.2.0
>> > HDP 2.1 (April 2014) uses 2.4.0 already
>> >
>> > So both vendors and cloud providers are multiple releases away from
>> > Hadoop 2.2.0.
>> >
>> > Spark does not offer a binary distribution lower than 2.3.0.
>> >
>> > In addition to that, I don't think that the HDFS client in 2.2.0 is
>> > really usable in production environments. Users were reporting
>> > ArrayIndexOutOfBounds exceptions for some jobs, I also had these exceptions
>> > sometimes.
>> >
>> > The easiest approach  to resolve this issue would be  (a) dropping the
>> > support for Hadoop 2.2.0
>> > An alternative approach (b) would be:
>> >  - ship a binary version for Hadoop 2.3.0
>> >  - make the source of Flink still compatible with 2.2.0, so that users
>> > can compile a Hadoop 2.2.0 version if needed.
>> >
>> > I would vote for approach (a).
>> >
>> >
>> > On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann 
>> > wrote:
>> > While working on high availability (HA) for Flink's YARN execution I
>> > stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
>> > 2.3.0, Hadoop introduced new functionality which is required for an
>> > efficient HA implementation. Therefore, I was wondering whether there is
>> > actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively 
>> > used
>> > by someone?
>> >
>> > Cheers,
>> > Till
>> >
>>
>>
>>
>>
>>
>


Re: Bug broadcasting objects (serialization issue)

2015-09-03 Thread Maximilian Michels
Thanks for clarifying the "eager serialization". By serializing and
deserializing explicitly (eagerly) we can raise better Exceptions to
notify the user of non-serializable classes.
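As a rough sketch of that idea (using plain java.io serialization rather than Flink's InstantiationUtil), an eager round trip on the client could look like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class EagerSerializationCheck {
    public static byte[] roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // fails fast if obj is not serializable
        }
        byte[] bytes = bos.toByteArray();
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject(); // surfaces deserialization problems as well
        }
        return bytes;
    }
}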

> BTW: There is an opportunity to fix two problems with one patch: The 
> framesize overflow for the input format, and the serialization.

IMHO this adds another layer of complexity to the job submission
phase. I just had a chat with Robert about this. I wonder, is it
possible to increase the Akka framesize only for the Client
ActorSystem?

On Wed, Sep 2, 2015 at 4:27 PM, Stephan Ewen <se...@apache.org> wrote:
> I see.
>
> Manual serialization implies also manual deserialization (on the workers
> only), which would give a better exception.
>
> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.
>
> On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Ok but that would not prevent the above error, right? Serializing is
>> not the issue here.
>>
>> Nevertheless, it would catch all errors during initial serialization.
>> Deserializing has its own hazards due to possible Classloader issues.
>>
>> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
>> > Yes, even serialize in the constructor. Then the failure (if
>> > serialization
>> > does not work) comes immediately.
>> >
>> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> Nice suggestion. So you want to serialize and deserialize the
>> >> InputFormats
>> >> on the Client to check whether they can be transferred correctly?
>> >> Merely
>> >> serializing is not enough because the above Exception occurs during
>> >> deserialization.
>> >>
>> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <se...@apache.org> wrote:
>> >>>
>> >>> We should try to improve the exception here. More people will run into
>> >>> this issue and the exception should help them understand it well.
>> >>>
>> >>> How about we do eager serialization into a set of byte arrays? Then
>> >>> the
>> >>> serializability issue comes immediately when the program is
>> >>> constructed,
>> >>> rather than later, when it is shipped.
>> >>>
>> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <m...@apache.org>
>> >>> wrote:
>> >>>>
>> >>>> Here's the JIRA issue:
>> >>>> https://issues.apache.org/jira/browse/FLINK-2608
>> >>>>
>> >>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <m...@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi Andreas,
>> >>>>>
>> >>>>> Thank you for reporting the problem and including the code to
>> >>>>> reproduce
>> >>>>> the problem. I think there is a problem with the class serialization
>> >>>>> or
>> >>>>> deserialization. Arrays.asList uses a private ArrayList class
>> >>>>> (java.util.Arrays$ArrayList) which is not the one you would normally
>> >>>>> use
>> >>>>> (java.util.ArrayList).
>> >>>>>
>> >>>>> I'll create a JIRA issue to keep track of the problem and to
>> >>>>> investigate further.
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Max
>> >>>>>
>> >>>>> Here's the stack trace:
>> >>>>>
>> >>>>> Exception in thread "main"
>> >>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>> >>>>> initialize
>> >>>>> task 'DataSource (at main(Test.java:32)
>> >>>>> (org.apache.flink.api.java.io.CollectionInputFormat))':
>> >>>>> Deserializing the
>> >>>>> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block
>> >>>>> data
>> >>>>> at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>> >>>>> at
>> >>>&g

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Thread Maximilian Michels
Hi Andreas,

Thank you for reporting the problem and including the code to reproduce the
problem. I think there is a problem with the class serialization or
deserialization. Arrays.asList uses a private ArrayList class
(java.util.Arrays$ArrayList) which is not the one you would normally use
(java.util.ArrayList).
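A possible workaround on the user side until this is fixed, assuming the private list class is indeed the culprit, is to copy the data into a plain java.util.ArrayList before handing it to Flink:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastWorkaroundSketch {
    public static void main(String[] args) {
        List<Integer> fragile = Arrays.asList(1, 2, 3);               // java.util.Arrays$ArrayList
        List<Integer> safe = new ArrayList<>(Arrays.asList(1, 2, 3)); // plain java.util.ArrayList
        System.out.println(fragile.getClass() + " vs " + safe.getClass());
    }
}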

I'll create a JIRA issue to keep track of the problem and to investigate
further.

Best regards,
Max

Here's the stack trace:

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
task 'DataSource (at main(Test.java:32)
(org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat
([mytests.Test$TestClass@4d6025c5]) failed: unread block data
at
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
... 25 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
... 26 more

On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa 
wrote:

> Hi,
>
> I get a bug when trying to broadcast a list of integers created with the
> primitive "Arrays.asList(...)".
>
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
>
>
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. 

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Thread Maximilian Michels
Nice suggestion. So you want to serialize and deserialize the InputFormats
on the Client to check whether they can be transferred correctly? Merely
serializing is not enough because the above Exception occurs during
deserialization.

On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <se...@apache.org> wrote:

> We should try to improve the exception here. More people will run into
> this issue and the exception should help them understand it well.
>
> How about we do eager serialization into a set of byte arrays? Then the
> serializability issue comes immediately when the program is constructed,
> rather than later, when it is shipped.
>
> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>
>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Andreas,
>>>
>>> Thank you for reporting the problem and including the code to reproduce
>>> the problem. I think there is a problem with the class serialization or
>>> deserialization. Arrays.asList uses a private ArrayList class
>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>> (java.util.ArrayList).
>>>
>>> I'll create a JIRA issue to keep track of the problem and to investigate
>>> further.
>>>
>>> Best regards,
>>> Max
>>>
>>> Here's the stack trace:
>>>
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
>>> task 'DataSource (at main(Test.java:32)
>>> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
>>> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block
>>> data
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at org.apache.flink.runtime.jobmanager.JobManager.org
>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Deserializing the InputFormat
>>> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>> at
>>> org.apach

Re: verbose console

2015-09-02 Thread Maximilian Michels
Hi Michele,

Please supply a log4j.properties file path as a Java VM property like
so: -Dlog4j.configuration=/path/to/log4j.properties

Your IDE should have an option to adjust VM arguments.
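For reference, a minimal log4j.properties along those lines could look like the following (the WARN level for Flink's own packages is only an example; pick whatever level suits you):

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Tone down Flink's internal logging:
log4j.logger.org.apache.flink=WARN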

Cheers,
Max

On Wed, Sep 2, 2015 at 9:10 AM, Michele Bertoni
 wrote:
> Hi everybody, I just found that in version 0.9.1 it is possible to disable
> that verbose console. Can you please explain how to do it, both in the IDE and
> in the local environment?
> Especially in the IDE, I am able to set log4j properties for my own logger, but
> everything I try for the Flink-internal one does not work.
>
>
> thanks
> michele


Re: Bug broadcasting objects (serialization issue)

2015-09-02 Thread Maximilian Michels
Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608

On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Andreas,
>
> Thank you for reporting the problem and including the code to reproduce
> the problem. I think there is a problem with the class serialization or
> deserialization. Arrays.asList uses a private ArrayList class
> (java.util.Arrays$ArrayList) which is not the one you would normally use
> (java.util.ArrayList).
>
> I'll create a JIRA issue to keep track of the problem and to investigate
> further.
>
> Best regards,
> Max
>
> Here's the stack trace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
> task 'DataSource (at main(Test.java:32)
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat
> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
> ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
> at
> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
> ... 26

Re: data flow example on cluster

2015-09-30 Thread Maximilian Michels
Hi Lydia,

Till already pointed you to the documentation. If you want to run the
WordCount example, you can do so by executing the following command:

./bin/flink run -c com.dataartisans.flink.dataflow.examples.WordCount
/path/to/dataflow.jar --input /path/to/input --output /path/to/output

If you want, you can try this command in a local cluster first. You
can start a local cluster by using ./bin/start-local.sh.

Best,
Max

On Wed, Sep 30, 2015 at 10:14 AM, Till Rohrmann  wrote:
> It's described here:
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html#run-example
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2015 at 8:24 AM, Lydia Ickler 
> wrote:
>>
>> Hi all,
>>
>> I want to run the data-flow WordCount example on a Flink cluster.
>> The local execution with „mvn exec:exec -Dinput=kinglear.txt
>> -Doutput=wordcounts.txt“ is already working.
>> What is the command to execute it on the cluster?
>>
>> Best regards,
>> Lydia
>
>


Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Hi Robert,

Just a quick update: The issue has been resolved in the latest Maven
0.10-SNAPSHOT dependency.

Cheers,
Max

On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
<ro.schmid...@gmail.com> wrote:
> Hi Max,
>
> thanks for your quick reply. I found the relevant code and commented it out
> for testing, seems to be working. Happily waiting for the fix. Thanks again.
>
> Robert
>
> On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Robert,
>>
>> This is a regression on the current master due to changes in the way
>> Flink calculates the memory and sets the maximum direct memory size.
>> We introduced these changes when we merged support for off-heap
>> memory. This is not a problem in the way Flink deals with managed
>> memory, just -XX:MaxDirectMemorySize is set too low. By default the
>> maximum direct memory is only used by the network stack. The network
>> library we use, allocates more direct memory than we expected.
>>
>> We'll push a fix to the master as soon as possible. Thank you for
>> reporting and thanks for your patience.
>>
>> Best regards,
>> Max
>>
>> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> <ro.schmid...@gmail.com> wrote:
>> > Hi everyone,
>> >
>> > I'm constantly running into OutOfMemoryErrors and for the life of me I
>> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> > the
>> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> > unfinished implementation of TPC-H Q2
>> >
>> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory
>> > per
>> > machine. This is what I believe to be the relevant section of my
>> > yarn-site.xml:
>> >
>> >
>> > <property>
>> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >   <value>57344</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >   <value>55296</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >   <value>false</value>
>> > </property>
>> >
>> >
>> > And this is how I submit the job:
>> >
>> >
>> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> > .
>> >
>> >
>> > The TMs happily report:
>> >
>> > .
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > -  JVM Options:
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xms24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xmx24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -XX:MaxDirectMemorySize=65m
>> > .
>> >
>> >
>> > I've tried various combinations of YARN and Flink options, to no avail.
>> > I
>> > always end up with the following stacktrace:
>> >
>> >
>> >
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> > java.lang.OutOfMemoryError: Direct buffer memory
>> > at
>> >
>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> &

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
By the way, you might have to use the "-U" flag to force Maven to
update its dependencies:  mvn -U clean install -DskipTests

On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
<ro.schmid...@gmail.com> wrote:
> Sweet! I'll pull it straight away. Thanks!
>
> On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Robert,
>>
>> Just a quick update: The issue has been resolved in the latest Maven
>> 0.10-SNAPSHOT dependency.
>>
>> Cheers,
>> Max
>>
>> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>> <ro.schmid...@gmail.com> wrote:
>> > Hi Max,
>> >
>> > thanks for your quick reply. I found the relevant code and commented it
>> > out
>> > for testing, seems to be working. Happily waiting for the fix. Thanks
>> > again.
>> >
>> > Robert
>> >
>> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> This is a regression on the current master due to changes in the way
>> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> We introduced these changes when we merged support for off-heap
>> >> memory. This is not a problem in the way Flink deals with managed
>> >> memory, just -XX:MaxDirectMemorySize is set too low. By default the
>> >> maximum direct memory is only used by the network stack. The network
>> >> library we use, allocates more direct memory than we expected.
>> >>
>> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> reporting and thanks for your patience.
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >> <ro.schmid...@gmail.com> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I'm constantly running into OutOfMemoryErrors and for the life of me
>> >> > I
>> >> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> >> > the
>> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> >> > unfinished implementation of TPC-H Q2
>> >> >
>> >> >
>> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> > memory
>> >> > per
>> >> > machine. This is what I believe to be the relevant section of my
>> >> > yarn_site.xml:
>> >> >
>> >> >
>> >> > 
>> >> > yarn.nodemanager.resource.memory-mb
>> >> > 57344
>> >> >   
>> >> > 
>> >> >   
>> >> > yarn.scheduler.maximum-allocation-mb
>> >> > 55296
>> >> >   
>> >> >
>> >> >   
>> >> > yarn.nodemanager.vmem-check-enabled
>> >> > false
>> >> >   
>> >> >
>> >> >
>> >> > And this is how I submit the job:
>> >> >
>> >> >
>> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> >> > .
>> >> >
>> >> >
>> >> > The TMs happily report:
>> >> >
>> >> > .
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > -  JVM Options:
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xms24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xmx24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -XX:MaxDirectMemorySize=65m
>> >> > .
>> >> >
>> >> >
>> >> > I've tried various combinations of YARN and Flink options, to no
>> >> > avail.
>> >> > I
>> >> > always end up with the following stacktrace:
>> >> >
>> >>

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Great to hear :)

On Thu, Oct 1, 2015 at 11:21 AM, Robert Schmidtke
<ro.schmid...@gmail.com> wrote:
> I pulled the current master branch and rebuilt Flink completely anyway.
> Works like a charm.
>
> On Thu, Oct 1, 2015 at 11:11 AM, Maximilian Michels <m...@apache.org> wrote:
>>
>> By the way, you might have to use the "-U" flag to force Maven to
>> update its dependencies:  mvn -U clean install -DskipTests
>>
>> On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
>> <ro.schmid...@gmail.com> wrote:
>> > Sweet! I'll pull it straight away. Thanks!
>> >
>> > On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> Just a quick update: The issue has been resolved in the latest Maven
>> >> 0.10-SNAPSHOT dependency.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>> >> <ro.schmid...@gmail.com> wrote:
>> >> > Hi Max,
>> >> >
>> >> > thanks for your quick reply. I found the relevant code and commented
>> >> > it
>> >> > out
>> >> > for testing, seems to be working. Happily waiting for the fix. Thanks
>> >> > again.
>> >> >
>> >> > Robert
>> >> >
>> >> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels <m...@apache.org>
>> >> > wrote:
>> >> >>
>> >> >> Hi Robert,
>> >> >>
>> >> >> This is a regression on the current master due to changes in the way
>> >> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> >> We introduced these changes when we merged support for off-heap
>> >> >> memory. This is not a problem in the way Flink deals with managed
>> >> >> memory, just -XX:MaxDirectMemorySize is set too low. By default the
>> >> >> maximum direct memory is only used by the network stack. The network
>> >> >> library we use, allocates more direct memory than we expected.
>> >> >>
>> >> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> >> reporting and thanks for your patience.
>> >> >>
>> >> >> Best regards,
>> >> >> Max
>> >> >>
>> >> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >> >> <ro.schmid...@gmail.com> wrote:
>> >> >> > Hi everyone,
>> >> >> >
>> >> >> > I'm constantly running into OutOfMemoryErrors and for the life of
>> >> >> > me
>> >> >> > I
>> >> >> > cannot figure out what's wrong. Let me describe my setup. I'm
>> >> >> > running
>> >> >> > the
>> >> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is
>> >> >> > an
>> >> >> > unfinished implementation of TPC-H Q2
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> >> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> >> > memory
>> >> >> > per
>> >> >> > machine. This is what I believe to be the relevant section of my
>> >> >> > yarn_site.xml:
>> >> >> >
>> >> >> >
>> >> >> > 
>> >> >> > yarn.nodemanager.resource.memory-mb
>> >> >> > 57344
>> >> >> >   
>> >> >> > 
>> >> >> >   
>> >> >> > yarn.scheduler.maximum-allocation-mb
>> >> >> > 55296
>> >> >> >   
>> >> >> >
>> >> >> >   
>> >> >> > yarn.nodemanager.vmem-check-enabled
>> >> >> > false
>> >> >> >   
>> >> >> >
>> >> >> >
>> >> >> > And this is how I submit the job:
>> >> >> >

Re: Running Flink on an Amazon Elastic MapReduce cluster

2015-10-05 Thread Maximilian Michels
Hi Hanen,

It appears that the environment variables are not set. Thus, Flink cannot
pick up the Hadoop configuration. Could you please paste the output of
"echo $HADOOP_HOME" and "echo $HADOOP_CONF_DIR" here?

In any case, your problem looks similar to the one discussed here:
http://stackoverflow.com/questions/31991934/cannot-use-apache-flink-in-amazon-emr
Please execute

export HADOOP_CONF_DIR=/etc/hadoop/conf

and you should be good to go.

Cheers,
Max

On Mon, Oct 5, 2015 at 3:37 PM, Hanen Borchani  wrote:

> Hi all,
>
> I tried to start a Yarn session on an Amazon EMR cluster with Hadoop 2.6.0
> following the instructions provided in this link and using Flink 0.9.1 for
> Hadoop 2.6.0
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html
>
> Running the following command line: ./bin/yarn-session.sh -n 2 -jm 1024
> -tm 2048 generated the following error message
>
>  
>
> 12:53:47,633 INFO  org.apache.hadoop.yarn.client.RMProxy
>- Connecting to ResourceManager at /0.0.0.0:8032
>
> 12:53:47,805 WARN
> org.apache.hadoop.util.NativeCodeLoader   - Unable to
> load native-hadoop library for your platform... using builtin-java classes
> where applicable
>
> 12:53:48,226 WARN
> org.apache.flink.yarn.FlinkYarnClient - Neither the
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink
> YARN Client needs one of these to be set to properly load the Hadoop
> configuration for accessing YARN.
>
> 12:53:48,227 INFO
> org.apache.flink.yarn.FlinkYarnClient - Using
> values:
>
> 12:53:48,228 INFO
> org.apache.flink.yarn.FlinkYarnClient - TaskManager
> count = 2
>
> 12:53:48,229 INFO
> org.apache.flink.yarn.FlinkYarnClient - JobManager
> memory = 1024
>
> 12:53:48,229 INFO
> org.apache.flink.yarn.FlinkYarnClient - TaskManager
> memory = 2048
>
> 12:53:48,580 WARN  org.apache.flink.yarn.FlinkYarnClient
>   - The file system scheme is 'file'. This indicates that the
> specified Hadoop configuration path is wrong and the sytem is using the
> default Hadoop configuration values.The Flink YARN client needs to store
> its files in a distributed file system
>
> 12:53:48,593 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/lib/flink-dist-0.9.1.jar to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-dist-0.9.1.jar
>
> 12:53:49,245 INFO
> org.apache.flink.yarn.Utils   - Copying
> from /home/hadoop/flink-0.9.1/conf/flink-conf.yaml to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
>
> 12:53:49,251 INFO  org.apache.flink.yarn.Utils
>- Copying from
> file:/home/hadoop/flink-0.9.1/lib/flink-python-0.9.1.jar to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-python-0.9.1.jar
>
> 12:53:49,278 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/conf/logback.xml to
> file:/home/hadoop/.flink/application_1444046049303_0008/logback.xml
>
> 12:53:49,285 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/conf/log4j.properties to
> file:/home/hadoop/.flink/application_1444046049303_0008/log4j.properties
>
> 12:53:49,304 INFO
> org.apache.flink.yarn.FlinkYarnClient - Submitting
> application master application_1444046049303_0008
>
> 12:53:49,347 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1444046049303_0008
>
> 12:53:49,347 INFO
> org.apache.flink.yarn.FlinkYarnClient - Waiting for
> the cluster to be allocated
>
> 12:53:49,349 INFO
> org.apache.flink.yarn.FlinkYarnClient - Deploying
> cluster, current state ACCEPTED
>
> 12:53:50,351 INFO
> org.apache.flink.yarn.FlinkYarnClient - Deploying
> cluster, current state ACCEPTED
>
> Error while deploying YARN cluster: The YARN application unexpectedly
> switched to state FAILED during deployment.
>
> Diagnostics from YARN: Application application_1444046049303_0008 failed 1
> times due to AM Container for appattempt_1444046049303_0008_01 exited
> with  exitCode: -1000
>
> For more detailed output, check application tracking page:http://
> ip-172-31-10-16.us-west-2.compute.internal:20888/proxy/application_1444046049303_0008/Then,
> click on links to logs of each attempt.
>
> Diagnostics: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
> java.io.FileNotFoundException: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
>  at
> 

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
I mentioned that the exception gets thrown when requesting container
status information. We need this to send a heartbeat to YARN but it is
not very crucial if this fails once for the running job. Possibly, we
could work around this problem by retrying N times in case of an
exception.
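Roughly, such a guard could look like the sketch below (a generic retry helper; the number of attempts and the wrapped YARN call are placeholders, not the actual Flink code):

import java.util.concurrent.Callable;

public final class RetrySketch {
    // Retry a call up to maxAttempts times, rethrowing the last failure.
    public static <T> T retry(Callable<T> call, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // e.g. the InvalidToken failure seen above; try again
            }
        }
        throw last;
    }
}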

Would it be possible for you to deploy a custom Flink 0.10.1 version
we provide and test again?

On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes <ni...@basjes.nl> wrote:
> No, I was just asking.
> No upgrade is possible for the next month or two.
>
> This week is our busiest day of the year ...
> Our shop is doing about 10 orders per second these days ...
>
> So they won't upgrade until next January/February
>
> Niels
>
> On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Niels,
>>
>> You mentioned you have the option to update Hadoop and redeploy the
>> job. Would be great if you could do that and let us know how it turns
>> out.
>>
>> Cheers,
>> Max
>>
>> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > I posted the entire log from the first log line at the moment of failure
>> > to
>> > the very end of the logfile.
>> > This is all I have.
>> >
>> > As far as I understand the Kerberos and Keytab mechanism in Hadoop Yarn
>> > is
>> > that it catches the "Invalid Token" and then (if keytab) gets a new
>> > Kerberos
>> > ticket (or tgt?).
>> > When the new ticket has been obtained it retries the call that
>> > previously
>> > failed.
>> > To me it seemed that this call can fail over the invalid Token yet it
>> > cannot
>> > be retried.
>> >
>> > At this moment I'm thinking a bug in Hadoop.
>> >
>> > Niels
>> >
>> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> Hi Niels,
>> >>
>> Sorry to hear you experienced this exception. From a first glance, it
>> >> looks like a bug in Hadoop to me.
>> >>
>> >> > "Not retrying because the invoked method is not idempotent, and
>> >> > unable
>> >> > to determine whether it was invoked"
>> >>
>> >> That is nothing to worry about. This is Hadoop's internal retry
>> >> mechanism that re-attempts to do actions which previously failed if
>> >> that's possible. Since the action is not idempotent (it cannot be
>> >> executed again without risking to change the state of the execution)
>> >> and it also doesn't track its execution states, it won't be retried
>> >> again.
>> >>
>> >> The main issue is this exception:
>> >>
>> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
>> >> > AMRMToken from >appattempt_1443166961758_163901_01
>> >>
>> >> From the stack trace it is clear that this exception occurs upon
>> >> requesting container status information from the Resource Manager:
>> >>
>> >> >at
>> >> >
>> >> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>> >>
>> >> Are there any more exceptions in the log? Do you have the complete
>> >> logs available and could you share them?
>> >>
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> >> > Hi,
>> >> >
>> >> >
>> >> > We have a Kerberos secured Yarn cluster here and I'm experimenting
>> >> > with
>> >> > Apache Flink on top of that.
>> >> >
>> >> > A few days ago I started a very simple Flink application (just stream
>> >> > the
>> >> > time as a String into HBase 10 times per second).
>> >> >
>> >> > I (deliberately) asked our IT-ops guys to make my account have a max
>> >> > ticket
>> >> > time of 5 minutes and a max renew time of 10 minutes (yes,
>> >> > ridiculously
>> >> > low
>> >> > timeout values because I needed to validate this
>> >> > https://issues.apache.org/jira/browse/FLINK-2977).
>> >> >
>> >> > Thi

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
Hi Niels,

You mentioned you have the option to update Hadoop and redeploy the
job. Would be great if you could do that and let us know how it turns
out.

Cheers,
Max

On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I posted the entire log from the first log line at the moment of failure to
> the very end of the logfile.
> This is all I have.
>
> As far as I understand it, the Kerberos and keytab mechanism in Hadoop YARN
> catches the "Invalid Token" and then (if a keytab is used) gets a new Kerberos
> ticket (or TGT?).
> When the new ticket has been obtained, it retries the call that previously
> failed.
> To me it seemed that this call can fail due to the invalid token, yet it cannot
> be retried.
>
> At this moment I'm thinking a bug in Hadoop.
>
> Niels
>
> On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Niels,
>>
>> Sorry to hear you experienced this exception. From a first glance, it
>> looks like a bug in Hadoop to me.
>>
>> > "Not retrying because the invoked method is not idempotent, and unable
>> > to determine whether it was invoked"
>>
>> That is nothing to worry about. This is Hadoop's internal retry
>> mechanism that re-attempts to do actions which previously failed if
>> that's possible. Since the action is not idempotent (it cannot be
>> executed again without risking to change the state of the execution)
>> and it also doesn't track its execution states, it won't be retried
>> again.
>>
>> The main issue is this exception:
>>
>> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
>> > AMRMToken from >appattempt_1443166961758_163901_01
>>
>> From the stack trace it is clear that this exception occurs upon
>> requesting container status information from the Resource Manager:
>>
>> >at
>> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>>
>> Are there any more exceptions in the log? Do you have the complete
>> logs available and could you share them?
>>
>>
>> Best regards,
>> Max
>>
>> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> >
>> > We have a Kerberos secured Yarn cluster here and I'm experimenting with
>> > Apache Flink on top of that.
>> >
>> > A few days ago I started a very simple Flink application (just stream
>> > the
>> > time as a String into HBase 10 times per second).
>> >
>> > I (deliberately) asked our IT-ops guys to make my account have a max
>> > ticket
>> > time of 5 minutes and a max renew time of 10 minutes (yes, ridiculously
>> > low
>> > timeout values because I needed to validate this
>> > https://issues.apache.org/jira/browse/FLINK-2977).
>> >
>> > This job is started with a keytab file and after running for 31 hours it
>> > suddenly failed with the exception you see below.
>> >
>> > I had the same job running for almost 400 hours until that failed too (I
>> > was
>> > too late to check the logfiles but I suspect the same problem).
>> >
>> >
>> > So in that time span my tickets have expired and new tickets have been
>> > obtained several hundred times.
>> >
>> >
>> > The main error I see is that in the process of a ticket expiring and
>> > being
>> > renewed I see this message:
>> >
>> >  Not retrying because the invoked method is not idempotent, and
>> > unable
>> > to determine whether it was invoked
>> >
>> >
>> > Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
>> >
>> > Flink is version 0.10.1
>> >
>> >
>> > How do I fix this?
>> > Is this a bug (in either Hadoop or Flink) or am I doing something wrong?
>> > Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
>> >
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > 21:30:27,821 WARN  org.apache.hadoop.security.UserGroupInformation
>> > - PriviledgedActionException as:nbasjes (auth:SIMPLE)
>> >
>> > cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> > Invalid AMRMToken from appattempt_1443166961758_163901_01
>> > 21:30:27,861 WARN  org.apache.hadoop.ipc.Client
>> > - Exception encoun

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
Great. Here is the commit to try out:
https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3

If you already have the Flink repository, check it out using

git fetch https://github.com/mxm/flink/ \
f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD

Alternatively, here's a direct download link to the sources with the
fix included:
https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip

Thanks a lot,
Max
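
To build the fetched sources against the Hadoop version discussed in this
thread, the standard Flink build should work (a hedged sketch, not taken from
the thread; adjust the Hadoop version to your cluster):

mvn clean install -DskipTests -Dhadoop.version=2.6.0

The built distribution should then show up under flink-dist/target/.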

On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Sure, just give me the git repo url to build and I'll give it a try.
>
> Niels
>
> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> I mentioned that the exception gets thrown when requesting container
>> status information. We need this to send a heartbeat to YARN but it is
>> not very crucial if this fails once for the running job. Possibly, we
>> could work around this problem by retrying N times in case of an
>> exception.
>>
>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>> we provide and test again?
>>
>> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> > No, I was just asking.
>> > No upgrade is possible for the next month or two.
>> >
>> > This is our busiest week of the year ...
>> > Our shop is doing about 10 orders per second these days ...
>> >
>> > So they won't upgrade until next January/February
>> >
>> > Niels
>> >
>> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> Hi Niels,
>> >>
>> >> You mentioned you have the option to update Hadoop and redeploy the
>> >> job. Would be great if you could do that and let us know how it turns
>> >> out.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> >> > Hi,
>> >> >
>> >> > I posted the entire log from the first log line at the moment of
>> >> > failure
>> >> > to
>> >> > the very end of the logfile.
>> >> > This is all I have.
>> >> >
>> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
>> >> > Yarn
>> >> > is
>> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
>> >> > Kerberos
>> >> > ticket (or tgt?).
>> >> > When the new ticket has been obtained it retries the call that
>> >> > previously
>> >> > failed.
>> >> > To me it seemed that this call can fail because of the invalid Token yet it
>> >> > cannot
>> >> > be retried.
>> >> >
>> >> > At this moment I'm thinking it's a bug in Hadoop.
>> >> >
>> >> > Niels
>> >> >
>> >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels <m...@apache.org>
>> >> > wrote:
>> >> >>
>> >> >> Hi Niels,
>> >> >>
>> >> >> Sorry to hear you experienced this exception. From a first glance,
>> >> >> it
>> >> >> looks like a bug in Hadoop to me.
>> >> >>
>> >> >> > "Not retrying because the invoked method is not idempotent, and
>> >> >> > unable
>> >> >> > to determine whether it was invoked"
>> >> >>
>> >> >> That is nothing to worry about. This is Hadoop's internal retry
>> >> >> mechanism that re-attempts to do actions which previously failed if
>> >> >> that's possible. Since the action is not idempotent (it cannot be
>> >> >> executed again without risking changing the state of the execution)
>> >> >> and it also doesn't track its execution states, it won't be retried
>> >> >> again.
>> >> >>
>> >> >> The main issue is this exception:
>> >> >>
>> >> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken:
>> >> >> > Invalid
>> >> >> > AMRMToken from appattempt_1443166961758_163901_01
>> >> >>
>> >> >> From the stack trace it is clear that this exception occurs upon
>> >> >> requesting co

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
I forgot you're using Flink 0.10.1. The above was for the master.

So here's the commit for Flink 0.10.1:
https://github.com/mxm/flink/commit/a41f3866f4097586a7b2262093088861b62930cd

git fetch https://github.com/mxm/flink/ \
a41f3866f4097586a7b2262093088861b62930cd && git checkout FETCH_HEAD

https://github.com/mxm/flink/archive/a41f3866f4097586a7b2262093088861b62930cd.zip

Thanks,
Max

On Wed, Dec 2, 2015 at 6:39 PM, Maximilian Michels <m...@apache.org> wrote:
> Great. Here is the commit to try out:
> https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3
>
> If you already have the Flink repository, check it out using
>
> git fetch https://github.com/mxm/flink/ \
> f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD
>
> Alternatively, here's a direct download link to the sources with the
> fix included:
> https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip
>
> Thanks a lot,
> Max
>
> On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> Sure, just give me the git repo url to build and I'll give it a try.
>>
>> Niels
>>
>> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels <m...@apache.org> wrote:
>>>
>>> I mentioned that the exception gets thrown when requesting container
>>> status information. We need this to send a heartbeat to YARN but it is
>>> not very crucial if this fails once for the running job. Possibly, we
>>> could work around this problem by retrying N times in case of an
>>> exception.
>>>
>>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>>> we provide and test again?
>>>
>>> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>> > No, I was just asking.
>>> > No upgrade is possible for the next month or two.
>>> >
>>> > This is our busiest week of the year ...
>>> > Our shop is doing about 10 orders per second these days ...
>>> >
>>> > So they won't upgrade until next January/February
>>> >
>>> > Niels
>>> >
>>> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels <m...@apache.org>
>>> > wrote:
>>> >>
>>> >> Hi Niels,
>>> >>
>>> >> You mentioned you have the option to update Hadoop and redeploy the
>>> >> job. Would be great if you could do that and let us know how it turns
>>> >> out.
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > I posted the entire log from the first log line at the moment of
>>> >> > failure
>>> >> > to
>>> >> > the very end of the logfile.
>>> >> > This is all I have.
>>> >> >
>>> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
>>> >> > Yarn
>>> >> > is
>>> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
>>> >> > Kerberos
>>> >> > ticket (or tgt?).
>>> >> > When the new ticket has been obtained it retries the call that
>>> >> > previously
>>> >> > failed.
>>> >> > To me it seemed that this call can fail because of the invalid Token yet it
>>> >> > cannot
>>> >> > be retried.
>>> >> >
>>> >> > At this moment I'm thinking it's a bug in Hadoop.
>>> >> >
>>> >> > Niels
>>> >> >
>>> >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels <m...@apache.org>
>>> >> > wrote:
>>> >> >>
>>> >> >> Hi Niels,
>>> >> >>
>>> >> >> Sorry to hear you experienced this exception. From a first glance,
>>> >> >> it
>>> >> >> looks like a bug in Hadoop to me.
>>> >> >>
>>> >> >> > "Not retrying because the invoked method is not idempotent, and
>>> >> >> > unable
>>> >> >> > to determine whether it was invoked"
>>> >> >>
>>> >> >> That is nothing to worry about. This is Hadoop's internal retry
>>> >> >>

Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-02 Thread Maximilian Michels
Hi Welly,

We still have to decide on the next release date but I would expect
Flink 0.10.2 within the next few weeks. If you can't work around the union
limitation, you may build your own Flink either from the master or the
release-0.10 branch which will eventually be Flink 0.10.2.

Cheers,
Max
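
A possible interim workaround on 0.10.1, sketched below, is to route one side
of the union through an identity map so the two inputs are no longer the same
transformation. This is only a sketch under the assumption that the 0.10.1
check rejects unioning a stream with the very same transformation; it is not
taken from this thread, and the class and variable names are made up.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SelfUnionWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // The identity map creates a new, distinct transformation, so the
        // "A DataStream cannot be unioned with itself" check should no longer trigger.
        DataStream<String> doubled = stream.union(
                stream.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value;
                    }
                }));

        doubled.print();
        env.execute("self-union workaround sketch");
    }
}

For the second issue, unioning streams of different parallelism, explicitly
setting the same parallelism on the operators feeding both inputs should avoid
the restriction until 0.10.2 is out.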

On Tue, Dec 1, 2015 at 12:04 PM, Welly Tambunan  wrote:
> Thanks a lot Aljoscha.
>
> When will it be released?
>
> Cheers
>
> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
> wrote:
>>
>> Hi,
>> I relaxed the restrictions on union. This should make it into an upcoming
>> 0.10.2 bugfix release.
>>
>> Cheers,
>> Aljoscha
>> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
>> >
>> > Hi All,
>> >
>> > After upgrading our system to the latest version from 0.9 to 0.10.1 we
>> > get the following error.
>> >
>> > Exception in thread "main" java.lang.UnsupportedOperationException: A
>> > DataStream cannot be unioned with itself
>> >
>> > Then I found the relevant JIRA for this one.
>> > https://issues.apache.org/jira/browse/FLINK-3080
>> >
>> > Is there a plan for which release this will be in?
>> >
>> >
>> > Another issue I have after upgrading is that I can't union streams with
>> > different levels of parallelism.
>> >
>> > I think we will need to fall back to 0.9 again for the time being.
>> >
>> > Cheers
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: Including option for starting job and task managers in the foreground

2015-12-02 Thread Maximilian Michels
Hi Brian,

I don't recall that Docker requires commands to run in the foreground. Still, if
that is your requirement, simply remove the "&" at the end of this line in
flink-daemon.sh:

$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath
"`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &

Cheers,
Max

On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann  wrote:

> Hi Brian,
>
> as far as I know this is at the moment not possible with our scripts.
> However it should be relatively easy to add by simply executing the Java
> command in flink-daemon.sh in the foreground. Do you want to add this?
>
> Cheers,
> Till
> On Dec 1, 2015 9:40 PM, "Brian Chhun" 
> wrote:
>
>> Hi All,
>>
>> Is it possible to include a command line flag for starting job and task
>> managers in the foreground? Currently, `bin/jobmanager.sh` and
>> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
>> things in the background. I'd like to execute these commands inside a
>> docker container, but it's expected that the process is running in the
>> foreground. I think it might be useful to have it run in the foreground so
>> that it can be hooked into some process supervisors. Any suggestions are
>> appreciated.
>>
>>
>> Thanks,
>> Brian
>>
>


Re: Including option for starting job and task managers in the foreground

2015-12-03 Thread Maximilian Michels
I think the way supervisor is used in the Docker scripts is a bit hacky. It
is simply started in the foreground and does nothing. Supervisor is
actually a really nice utility to start processes in Docker containers and
monitor them.

Nevertheless, supervisor also expects commands to stay in the foreground. A
common way to work around this is to create a script which monitors the
daemon process' pid. Thinking about this, I think we could actually add the
foreground functionality directly in the jobmanager / taskmanager shell
script like you suggested.

In the meantime, you could also use a simple script like this:

#!/usr/bin/env bash
# daemonize the job manager
./bin/jobmanager.sh start cluster
# "wait $!" would return immediately because flink-daemon.sh forks its own JVM,
# so poll until no JobManager process is left (assumes pgrep is available)
while pgrep -f org.apache.flink.runtime.jobmanager.JobManager > /dev/null; do
  sleep 1
done

Cheers,
Max

On Wed, Dec 2, 2015 at 7:16 PM, Brian Chhun <brian.ch...@getbraintree.com>
wrote:

> Thanks, I'm basing the things I'm doing on what I see there. One
> thing that's not clear to me in that example is why supervisor is used to
> keep the container alive, rather than using some simpler means. It doesn't
> look like it's been configured to supervise anything.
>
> On Wed, Dec 2, 2015 at 11:44 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Have you looked at
>> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>> ? This demonstrates how to use Flink with Docker. In particular it
>> states: "Images [..] run Supervisor to stay alive when running
>> containers."
>>
>> Have a look at flink/config-flink.sh.
>>
>> Cheers,
>> Max
>>
>> On Wed, Dec 2, 2015 at 6:29 PM, Brian Chhun
>> <brian.ch...@getbraintree.com> wrote:
>> > Yep, I think this makes sense. I'm currently patching the
>> flink-daemon.sh
>> > script to remove the `&`, but I don't think it's a very robust solution,
>> > particularly when this script changes across versions of Flink. I'm
>> very new
>> > to Docker, but the resources I've found indicate that the process must
>> run
>> > in the foreground, though people seem to get around it with some hacks.
>> >
>> > When I have some time, I can look into refactoring some parts of the
>> scripts
>> > so that it can be started in the foreground.
>> >
>> > Thanks,
>> > Brian
>> >
>> > On Wed, Dec 2, 2015 at 3:22 AM, Maximilian Michels <m...@apache.org>
>> wrote:
>> >>
>> >> Hi Brian,
>> >>
>> >> I don't recall that Docker requires commands to run in the foreground.
>> Still,
>> >> if that is your requirement, simply remove the "&" at the end of this
>> line
>> >> in flink-daemon.sh:
>> >>
>> >> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
>> -classpath
>> >> "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
>> >> ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann <trohrm...@apache.org>
>> >> wrote:
>> >>>
>> >>> Hi Brian,
>> >>>
>> >>> as far as I know this is at the moment not possible with our scripts.
>> >>> However it should be relatively easy to add by simply executing the
>> Java
>> >>> command in flink-daemon.sh in the foreground. Do you want to add this?
>> >>>
>> >>> Cheers,
>> >>> Till
>> >>>
>> >>> On Dec 1, 2015 9:40 PM, "Brian Chhun" <brian.ch...@getbraintree.com>
>> >>> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>> Is it possible to include a command line flag for starting job and
>> task
>> >>>> managers in the foreground? Currently, `bin/jobmanager.sh` and
>> >>>> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts
>> these
>> >>>> things in the background. I'd like to execute these commands inside
>> a docker
>> >>>> container, but it's expected that the process is running in the
>> foreground.
>> >>>> I think it might be useful to have it run in the foreground so that
>> it can
>> >>>> be hooked into some process supervisors. Any suggestions are
>> appreciated.
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Brian
>> >>
>> >>
>> >
>>
>
>


Re: Flink job on secure Yarn fails after many hours

2015-12-03 Thread Maximilian Michels
Hi Niels,

Just got back from our CI. The build above would fail with a
Checkstyle error. I corrected that. Also I have built the binaries for
your Hadoop version 2.6.0.

Binaries:

https://drive.google.com/file/d/0BziY9U_qva1sZ1FVR3RWeVNrNzA/view?usp=sharing

Source:

https://github.com/mxm/flink/tree/kerberos-yarn-heartbeat-fail-0.10.1

git fetch https://github.com/mxm/flink/ \
kerberos-yarn-heartbeat-fail-0.10.1 && git checkout FETCH_HEAD

https://github.com/mxm/flink/archive/kerberos-yarn-heartbeat-fail-0.10.1.zip

Thanks,
Max
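
For readers following along: the branch above contains the actual workaround.
The snippet below is only a generic, illustrative sketch of the "retry N times
on exception" idea discussed earlier in the thread; the class and method names
are made up and this is not the code of the linked commit.

import java.util.concurrent.Callable;

public class BoundedRetry {

    // Retries a call a bounded number of times, rethrowing the last failure.
    public static <T> T retry(int maxAttempts, Callable<T> call) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // e.g. an InvalidToken surfacing right around a ticket renewal
                last = e;
            }
        }
        if (last != null) {
            throw last;
        }
        throw new IllegalArgumentException("maxAttempts must be >= 1");
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Toy usage: the call succeeds on the third attempt.
        String result = retry(5, () -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure " + calls[0]);
            }
            return "ok after " + calls[0] + " attempts";
        });
        System.out.println(result);
    }
}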

On Wed, Dec 2, 2015 at 6:52 PM, Maximilian Michels <m...@apache.org> wrote:
> I forgot you're using Flink 0.10.1. The above was for the master.
>
> So here's the commit for Flink 0.10.1:
> https://github.com/mxm/flink/commit/a41f3866f4097586a7b2262093088861b62930cd
>
> git fetch https://github.com/mxm/flink/ \
> a41f3866f4097586a7b2262093088861b62930cd && git checkout FETCH_HEAD
>
> https://github.com/mxm/flink/archive/a41f3866f4097586a7b2262093088861b62930cd.zip
>
> Thanks,
> Max
>
> On Wed, Dec 2, 2015 at 6:39 PM, Maximilian Michels <m...@apache.org> wrote:
>> Great. Here is the commit to try out:
>> https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3
>>
>> If you already have the Flink repository, check it out using
>>
>> git fetch https://github.com/mxm/flink/ \
>> f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD
>>
>> Alternatively, here's a direct download link to the sources with the
>> fix included:
>> https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip
>>
>> Thanks a lot,
>> Max
>>
>> On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>> Sure, just give me the git repo url to build and I'll give it a try.
>>>
>>> Niels
>>>
>>> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>> I mentioned that the exception gets thrown when requesting container
>>>> status information. We need this to send a heartbeat to YARN but it is
>>>> not very crucial if this fails once for the running job. Possibly, we
>>>> could work around this problem by retrying N times in case of an
>>>> exception.
>>>>
>>>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>>>> we provide and test again?
>>>>
>>>> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>> > No, I was just asking.
>>>> > No upgrade is possible for the next month or two.
>>>> >
>>>> > This is our busiest week of the year ...
>>>> > Our shop is doing about 10 orders per second these days ...
>>>> >
>>>> > So they won't upgrade until next January/February
>>>> >
>>>> > Niels
>>>> >
>>>> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels <m...@apache.org>
>>>> > wrote:
>>>> >>
>>>> >> Hi Niels,
>>>> >>
>>>> >> You mentioned you have the option to update Hadoop and redeploy the
>>>> >> job. Would be great if you could do that and let us know how it turns
>>>> >> out.
>>>> >>
>>>> >> Cheers,
>>>> >> Max
>>>> >>
>>>> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > I posted the entire log from the first log line at the moment of
>>>> >> > failure
>>>> >> > to
>>>> >> > the very end of the logfile.
>>>> >> > This is all I have.
>>>> >> >
>>>> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
>>>> >> > Yarn
>>>> >> > is
>>>> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
>>>> >> > Kerberos
>>>> >> > ticket (or tgt?).
>>>> >> > When the new ticket has been obtained it retries the call that
>>>> >> > previously
>>>> >> > failed.
>>>> >> > To me it seemed that this call can fail because of the invalid Token yet it
>>>> >> > cannot
>>>> >> > be retried.
>>>> >> >
>>>> &

Re: Flink Storm

2015-12-04 Thread Maximilian Michels
Hi Naveen,

Were you using the Maven snapshot artifacts before? Syncing changes from
the master to the snapshot repository always takes a while. The
documentation happened to be updated before the snapshots were
synchronized. Building and installing manually (what you did) solves the
problem.

Strangely, when I run your code on my machine with the latest
1.0-SNAPSHOT I see a lot of output on my console.

Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89

Could you add a bolt which writes the Storm tuples to a file? Is that
file also empty?

builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
   @Override
   public String format(Tuple tuple) {
  return tuple.toString();
   }
}), 1).shuffleGrouping("count");


Thanks,
Max


Re: Documentation for Fold

2015-12-04 Thread Maximilian Michels
Thanks Welly!

We have already corrected that in the snapshot documentation at
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations

I fixed it also for the 0.10 documentation.

Best,
Max
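
For the archive, a minimal runnable sketch of fold in the 0.10 streaming API,
illustrating that it is applied after keyBy, i.e. on a keyed stream. The class
name, keys and values here are made up for illustration only.

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedFoldExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
            // fold is only available after keyBy, i.e. on a KeyedStream
            .keyBy(0)
            .fold("start:", new FoldFunction<Tuple2<String, Integer>, String>() {
                @Override
                public String fold(String accumulator, Tuple2<String, Integer> value) {
                    // running concatenation of the values seen per key
                    return accumulator + " " + value.f1;
                }
            })
            .print();

        env.execute("keyed fold example");
    }
}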

On Fri, Dec 4, 2015 at 6:24 AM, Welly Tambunan  wrote:
>
> Hi All,
>
> Currently I'm going through the documentation for DataStream here and found a
> minor error in the docs. I thought I should inform you.
>
> I think fold only works for keyed data streams.
>
>
>
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the
meantime I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max
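
As a concrete illustration of what "changed slightly" means on the client side:
in Elasticsearch 2.x the TransportClient is created through a builder, and
transport addresses require an InetAddress. The sketch below shows only that
Elasticsearch client change; it is not the upcoming Flink connector, and the
cluster name, host and port are placeholders.

import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class Es2ClientSketch {

    public static void main(String[] args) throws Exception {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "my-cluster")   // placeholder: adjust to your cluster
                .build();

        // Elasticsearch 2.x: clients are created via a builder instead of a
        // constructor, and addresses take an InetAddress instead of a host string.
        TransportClient client = TransportClient.builder()
                .settings(settings)
                .build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("localhost"), 9300));

        System.out.println("connected nodes: " + client.connectedNodes());
        client.close();
    }
}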

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
 wrote:
> Does the current elasticsearch-flink connector support the elasticsearch 2.x version?
>
> -Madhu


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
{
> throw new RuntimeException("Client is not connected to any
> Elasticsearch nodes!");
> } else {
> if (LOG.isDebugEnabled()) {
> LOG.info("Connected to nodes: " + nodes.toString());
> }
> }
> client = transportClient;
>
> BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
> client,
> new BulkProcessor.Listener() {
> public void beforeBulk(long executionId,
>BulkRequest request) {
>
> }
>
> public void afterBulk(long executionId,
>   BulkRequest request,
>   BulkResponse response) {
> if (response.hasFailures()) {
> for (BulkItemResponse itemResp :
> response.getItems()) {
> if (itemResp.isFailed()) {
> LOG.error("Failed to index document in
> Elasticsearch: " + itemResp.getFailureMessage());
> failureThrowable.compareAndSet(null, new
> RuntimeException(itemResp.getFailureMessage()));
> }
> }
> hasFailure.set(true);
> }
> }
>
> public void afterBulk(long executionId,
>   BulkRequest request,
>   Throwable failure) {
> LOG.error(failure.getMessage());
> failureThrowable.compareAndSet(null, failure);
> hasFailure.set(true);
> }
> });
>
> // This makes flush() blocking
> bulkProcessorBuilder.setConcurrentRequests(0);
>
>
>
> if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>
> bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
> }
>
> if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
> bulkProcessorBuilder.setBulkSize(new
> ByteSizeValue(params.getInt(
> CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
> }
>
> if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>
> bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
> }
>
> bulkProcessor = bulkProcessorBuilder.build();
> }
>
>
> @Override
> public void invoke(T element) {
> IndexRequest indexRequest =
> indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>
> if (LOG.isDebugEnabled()) {
> LOG.debug("Emitting IndexRequest: {}", indexRequest);
> }
>
> bulkProcessor.add(indexRequest);
> }
>
> @Override
> public void close() {
> if (bulkProcessor != null) {
> bulkProcessor.close();
> bulkProcessor = null;
> }
>
> if (client != null) {
> client.close();
> }
>
> if (hasFailure.get()) {
> Throwable cause = failureThrowable.get();
> if (cause != null) {
> throw new RuntimeException("An error occured in
> ElasticsearchSink.", cause);
> } else {
> throw new RuntimeException("An error occured in
> ElasticsearchSink.");
>
> }
> }
> }
>
> }
>
>
> In my Main Class:
>
>
> Map<String, String> config = Maps.newHashMap();
>
> //Elasticsearch Parameters
>
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
> parameter.get("elasticsearch.bulk.flush.max.actions","1"));
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
> parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
> config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
> config.put("esHost", parameter.get("elasticsearch.server",
> "localhost:9300"));
>
>
> DataStreamSink elastic = messageStream.rebalance().addSink(new
> ElasticsearchSink<>(config, (IndexRequestBuilder) (element,
> runtimeContext) -> {
> String[] line = element.toLowerCase().split("
> +(?=(?:([^\"]*\"){2})*[^\"]*$)");
> String measureAndTags = line[0];
> String[] kvSplit = line
