Note #2 on the list
http://www.datanami.com/2015/06/12/8-new-big-data-projects-to-watch/
Re: Flink Runtime Exception
I think Andra wrote that there is *no deadlock*. On Fri, Jun 19, 2015 at 3:18 PM Fabian Hueske fhue...@gmail.com wrote: Hi Andra, The system should never deadlock. There is a bug somewhere if that happens. Can you check if the program is really stuck? Cheers, Fabian 2015-06-19 15:08 GMT+02:00 Till Rohrmann trohrm...@apache.org: What does forever mean? Usually you see a steep decline in performance once the system starts spilling data to disk, because disk I/O becomes the bottleneck. The system always starts spilling to disk when it has no more memory left for its operations. For example, if you want to sort data which cannot be kept completely in memory, then the system has to employ external sorting. If you can give Flink more memory, then you can avoid this problem, depending on the actual data size. You can observe disk I/O by using the `iotop` command, for example. If you see Flink having high I/O usage, this might be an indicator of spilling. Cheers, Till On Fri, Jun 19, 2015 at 2:54 PM Andra Lungu lungu.an...@gmail.com wrote: Another problem that I encountered during the same set of experiments (sorry if I am asking too many questions, I am eager to get things fixed): - for the same configuration, a piece of code runs perfectly on 10GB of input, then for 38GB it runs forever (no deadlock). I believe that may occur because Flink spills information to disk every time it runs out of memory... Is this fixable by increasing the number of buffers? That's the last question for today, promise :) Thanks! On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann trohrm...@apache.org wrote: Yes, it was an issue for the milestone release. On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu lungu.an...@gmail.com wrote: Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this version? I'll just fetch the latest master if this is the case. 
On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann trohrm...@apache.org wrote: Hi Andra, the problem seems to be that the deployment of some tasks takes longer than 100s. From the stack trace it looks as if you're not using the latest master. We had problems with previous versions where the deployment call waited for the TM to completely download the user code jars. For large setups the BlobServer became a bottleneck and some of the deployment calls timed out. We updated the deployment logic so that the TM sends an immediate ACK back to the JM when it receives a new task. Could you verify which version of Flink you're running and, in case it's not the latest master, could you please try to run your example with the latest code? Cheers, Till On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu lungu.an...@gmail.com wrote: Hi everyone, I ran a job this morning on 30 wally nodes. DOP 224. Worked like a charm. Then, I ran a similar job, on the exact same configuration, on the same input data set. The only difference between them is that the second job computes the degrees per vertex and, for vertices with degree higher than a user-defined threshold, it does a bit of magic (roughly a bunch of coGroups). 
The problem is that, even before the extra functions get called, I get the following type of exception: 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:171)) - Combine(Distinct at fromDataSet(Graph.java:171))(222/224) switched to FAILED java.lang.IllegalStateException: Update task on instance 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: akka.tcp:// flink@130.149.249.14:44528/user/taskmanager failed due to: at org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) at akka.dispatch.OnFailure.internal(Future.scala:228) at akka.dispatch.OnFailure.internal(Future.scala:227) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) at scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:136) at scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:134) at
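Till's point about external sorting in the thread above can be illustrated with a toy sketch. This is not Flink's implementation (Flink manages serialized binary data in its own memory segments); it is just a minimal, self-contained example of the spill-and-merge pattern that makes disk I/O the bottleneck once data no longer fits in memory:

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

/**
 * Toy external (out-of-core) sort: sort runs that fit the memory budget,
 * spill each run to a temp file, then k-way merge the spilled runs.
 * Illustrative only -- not how Flink's sorter is actually implemented.
 */
public class ExternalSortSketch {

    // Sort 'input' while holding at most 'maxInMemory' values at a time.
    static List<Integer> externalSort(List<Integer> input, int maxInMemory) throws IOException {
        List<Path> runs = new ArrayList<>();
        // Phase 1: produce sorted runs and spill them to disk.
        for (int i = 0; i < input.size(); i += maxInMemory) {
            List<Integer> chunk =
                new ArrayList<>(input.subList(i, Math.min(i + maxInMemory, input.size())));
            Collections.sort(chunk);
            Path run = Files.createTempFile("run", ".tmp");
            try (BufferedWriter w = Files.newBufferedWriter(run)) {
                for (int v : chunk) { w.write(Integer.toString(v)); w.newLine(); }
            }
            runs.add(run);
        }
        // Phase 2: k-way merge of the spilled runs via a min-heap of [value, runIndex].
        PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt((int[] a) -> a[0]));
        List<BufferedReader> readers = new ArrayList<>();
        for (int r = 0; r < runs.size(); r++) {
            BufferedReader br = Files.newBufferedReader(runs.get(r));
            readers.add(br);
            String line = br.readLine();
            if (line != null) heap.add(new int[]{Integer.parseInt(line), r});
        }
        List<Integer> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            result.add(head[0]);
            String line = readers.get(head[1]).readLine();
            if (line != null) heap.add(new int[]{Integer.parseInt(line), head[1]});
        }
        for (BufferedReader br : readers) br.close();
        for (Path p : runs) Files.deleteIfExists(p);
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> data = new ArrayList<>(Arrays.asList(9, 4, 7, 1, 8, 3, 2, 6, 5, 0));
        // With a budget of 3 values, this spills four runs before merging.
        System.out.println(externalSort(data, 3));
    }
}
```

Every element is written to and read back from disk at least once, which is why a job that fits in memory at 10GB can slow down dramatically at 38GB.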
Re: Flink Runtime Exception
woops, sorry! Whenever I read the word deadlock I get a bit nervous and distracted ;-) 2015-06-19 15:21 GMT+02:00 Till Rohrmann trohrm...@apache.org: I think Andra wrote that there is *no deadlock*.
Re: Flink Runtime Exception
On 19 Jun 2015, at 14:53, Andra Lungu lungu.an...@gmail.com wrote: Another problem that I encountered during the same set of experiments (sorry if I am asking too many questions, I am eager to get things fixed): - for the same configuration, a piece of code runs perfectly on 10GB of input, then for 38GB it runs forever (no deadlock). I believe that may occur because Flink spills information to disk every time it runs out of memory... Is this fixable by increasing the number of buffers? If you are referring to the number of network buffers configuration key, that should be unrelated. If this really is the issue, you can increase the heap size for the task managers. Have you confirmed your suspicion via iotop, as Till suggested? :) – Ufuk
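In flink-conf.yaml terms, Ufuk's distinction looks roughly like the following sketch (key names as in the Flink 0.9 configuration docs; the values here are placeholders to tune per machine, not recommendations):

```yaml
# flink-conf.yaml -- illustrative values only
taskmanager.heap.mb: 6144                  # raise the TaskManager heap so less data spills to disk
taskmanager.network.numberOfBuffers: 2048  # network buffers -- unrelated to spilling
```

Increasing the number of network buffers affects data shipping between tasks, while spilling is governed by the memory the TaskManager's operators get from the heap.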
Re: Flink Runtime Exception
Hi Andra, the problem seems to be that the deployment of some tasks takes longer than 100s. From the stack trace it looks as if you're not using the latest master. We had problems with previous versions where the deployment call waited for the TM to completely download the user code jars. For large setups the BlobServer became a bottleneck and some of the deployment calls timed out. We updated the deployment logic so that the TM sends an immediate ACK back to the JM when it receives a new task. Could you verify which version of Flink you're running and, in case it's not the latest master, could you please try to run your example with the latest code? Cheers, Till On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu lungu.an...@gmail.com wrote: Hi everyone, I ran a job this morning on 30 wally nodes. DOP 224. Worked like a charm. Then, I ran a similar job, on the exact same configuration, on the same input data set. The only difference between them is that the second job computes the degrees per vertex and, for vertices with degree higher than a user-defined threshold, it does a bit of magic (roughly a bunch of coGroups). 
The problem is that, even before the extra functions get called, I get the following type of exception:

06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:171)) - Combine(Distinct at fromDataSet(Graph.java:171))(222/224) switched to FAILED
java.lang.IllegalStateException: Update task on instance 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: akka.tcp://flink@130.149.249.14:44528/user/taskmanager failed due to:
	at org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860)
	at akka.dispatch.OnFailure.internal(Future.scala:228)
	at akka.dispatch.OnFailure.internal(Future.scala:227)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25)
	at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
	at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@130.149.249.14:44528/user/taskmanager#82700874]] after [10 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
	at java.lang.Thread.run(Thread.java:722)

At first I thought, okay, maybe wally004 is down; then I ssh'd into it. Works fine. The full output can be found here: https://gist.github.com/andralungu/d222b75cb33aea57955d Does anyone have any idea about what may have triggered this? :( Thanks! Andra
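The AskTimeoutException in the trace is Akka giving up on the JobManager-to-TaskManager deployment call. Besides moving to the fixed master as Till suggests, raising the ask timeout in flink-conf.yaml is a possible stopgap on affected versions (key name as in the 0.9 configuration docs; the value is an illustrative placeholder):

```yaml
# flink-conf.yaml -- workaround sketch, not a fix for the underlying BlobServer bottleneck
akka.ask.timeout: 300 s   # raise from the 100 s Till mentions, for large deployments
```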
[jira] [Created] (FLINK-2245) Programs that contain collect() reported as multiple jobs in the Web frontend
Theodore Vasiloudis created FLINK-2245: -- Summary: Programs that contain collect() reported as multiple jobs in the Web frontend Key: FLINK-2245 URL: https://issues.apache.org/jira/browse/FLINK-2245 Project: Flink Issue Type: Bug Components: Webfrontend Reporter: Theodore Vasiloudis Priority: Minor Currently, if we submit a program (job) that contains calls to collect(), we get a new job reported in the web frontend for each call to collect(). The expected behaviour when submitting a job is for all the steps in the program to be grouped and reported under one job, regardless of the actions inside the job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2248) Allow disabling of stdout logging output
Theodore Vasiloudis created FLINK-2248: -- Summary: Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Currently, when a job is submitted through the CLI, we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2249) ExecutionEnvironment: Ignore calls to execute() if no data sinks defined
Maximilian Michels created FLINK-2249: - Summary: ExecutionEnvironment: Ignore calls to execute() if no data sinks defined Key: FLINK-2249 URL: https://issues.apache.org/jira/browse/FLINK-2249 Project: Flink Issue Type: Improvement Components: Java API, Scala API Affects Versions: 0.9 Reporter: Maximilian Michels The basic skeleton of a Flink program looks like this:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// bootstrap DataSet
DataSet<Integer> ds = env.fromElements(1, 2, 3, 4);
// perform transformations ...
// define sinks, e.g.
ds.writeAsText("/some/path");
// execute
env.execute();
{code}
The first thing users do is change {{ds.writeAsText("/some/path");}} into {{ds.print();}}. But that fails with an exception ("No new data sinks defined..."). In FLINK-2026 we made this exception message easier to understand. However, users still don't understand what is happening, especially because they see Flink executing and then failing. I propose to ignore calls to execute() when no sinks are defined. Instead, we should just print a warning: "Detected call to execute without any data sinks. Not executing." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2250) Backtracking of intermediate results
Maximilian Michels created FLINK-2250: - Summary: Backtracking of intermediate results Key: FLINK-2250 URL: https://issues.apache.org/jira/browse/FLINK-2250 Project: Flink Issue Type: New Feature Components: Distributed Runtime, Java API, Scala API Reporter: Maximilian Michels Assignee: Maximilian Michels With intermediate results available in the distributed runtime as of FLINK-986, we could now incrementally resume failed jobs if we cached the results (FLINK-1404). Moreover, Flink users could build incremental Flink jobs using count/collect/print and, ultimately, also continue from old job results in an interactive shell environment like the scala-shell. The following tasks need to be completed for that to happen:
- Cache the results
- Keep the ExecutionGraph in the JobManager
- Change the scheduling mechanism to track back the results from the sinks
- Implement session management to eventually discard old results and ExecutionGraphs from the TaskManagers/JobManager
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2244) Add ability to start and stop persistent IaaS cluster
Theodore Vasiloudis created FLINK-2244: -- Summary: Add ability to start and stop persistent IaaS cluster Key: FLINK-2244 URL: https://issues.apache.org/jira/browse/FLINK-2244 Project: Flink Issue Type: New Feature Reporter: Theodore Vasiloudis Priority: Minor Being able to launch a cluster on GCE/EC2, run some jobs, stop it and restart it later, while having persistent storage attached is very useful for people/companies that need to run jobs only occasionally. Currently we can launch a cluster, but it's not possible to stop the instances when we don't need them, and then restart them to pick up where we left off. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Reduce combiner not chained
Hey all, on the current master, running the WordCount example with text file input/output and a manual reduce function (instead of sum(1)) results in a combiner which is not chained. The corresponding issue is here: https://issues.apache.org/jira/browse/FLINK-2246 Can someone please confirm this? If it is an issue, we should fix it soon. The serialization overhead is noticeable on larger inputs. – Ufuk
Re: Flink Runtime Exception
Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this version? I'll just fetch the latest master if this is the case.
Re: Failing Builds on Travis
+1 On 06/19/2015 10:35 AM, Ufuk Celebi wrote: On 19 Jun 2015, at 10:29, Maximilian Michels m...@apache.org wrote: Bringing this up again because of a recently discovered issue: https://issues.apache.org/jira/browse/FLINK-2235 Flink 0.9.0 will still support Java 6. Just wanted to clarify again that we drop support for Java 6 for any further major releases. Do we have a consensus? +1
Re: Reduce combiner not chained
This is not a bug. Chained combiners are not supported for ReduceFunctions yet. :-( I updated the JIRA accordingly. 2015-06-19 13:04 GMT+02:00 Ufuk Celebi u...@apache.org: Hey all, on the current master running the WordCount example with a text file input/output results and a manual reduce function (instead of the sum(1)) results in a combiner, which is not chained. The corresponding issue is here: https://issues.apache.org/jira/browse/FLINK-2246 Can someone please confirm this? If it is an issue, we should fix it soon. The serialization overhead is noticeable on larger inputs. – Ufuk
execute() and collect()/print()/count()
Dear Flink community,

I have stopped counting how many people on the user list and during Flink trainings have asked why their Flink program throws an exception when they just want to print a DataSet. The reason for this is that print() now executes eagerly, thus executes the Flink program. Subsequent calls to execute() need to define new DataSinks and throw an exception otherwise.

We have recently introduced a flag in the ExecutionEnvironment that checks whether the user executed before (explicitly via execute() or implicitly through collect()/print()/count()). That enabled us to print a nicer exception message. However, users either do not read the exception message or do not understand it. They ask this question a lot.

That's why I propose to ignore calls to execute() entirely if no sinks are defined. That will get rid of one of the core annoyances for Flink users. I know that this is painful for us programmers because we understand how Flink works internally, but let's step back once and see that it wouldn't be so bad if execute() didn't do anything in case of no new sinks.

What would be the downside of this change? Users might call execute() and wonder why nothing happens. We would then simply print a warning that their program didn't define any sinks. That is a big difference to the behavior before, because users are scared of exceptions. If they just get a warning, they will double-check their program and investigate why nothing happens. In most cases they actually have defined sinks but simply left in a call to execute() from when they were printing a DataSet.

What are your opinions on this issue? I have opened a JIRA for this as well: https://issues.apache.org/jira/browse/FLINK-2249

Best, Max
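The proposed behavior can be sketched with a stripped-down stand-in for Flink's ExecutionEnvironment. The class and member names here are illustrative inventions, not Flink's actual internals; the point is only the control flow: eager actions execute immediately, and execute() degrades to a warning when no new sink has been defined since the last run.

```java
/**
 * Sketch of the FLINK-2249 proposal with a hypothetical mini environment:
 * execute() warns instead of throwing when there is nothing new to run.
 */
public class LazyExecuteSketch {
    static class MiniEnvironment {
        private int pendingSinks = 0;      // sinks defined since the last execution
        private boolean wasExecuted = false;

        // A regular (lazy) sink: writeAsText-style, consumed by the next execute().
        void defineSink() { pendingSinks++; }

        // Eager actions like print()/collect()/count() add a sink and run right away.
        void print() {
            pendingSinks++;
            runPlan();
        }

        // Proposed behavior: no exception when there is nothing to execute.
        void execute() {
            if (pendingSinks == 0) {
                System.out.println(wasExecuted
                    ? "WARN: no new data sinks defined since the last execution. Not executing."
                    : "WARN: no data sinks defined. Not executing.");
                return;
            }
            runPlan();
        }

        private void runPlan() {
            System.out.println("executing " + pendingSinks + " pending sink(s)...");
            pendingSinks = 0;
            wasExecuted = true;
        }
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        env.print();    // eager action: executes immediately
        env.execute();  // previously an exception; under the proposal, only a warning
    }
}
```

The trade-off Max describes is visible in the sketch: a forgotten defineSink() now produces a warning the user can ignore, rather than an exception that stops the program.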
Re: Flink Runtime Exception
Yes, it was an issue for the milestone release. On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu lungu.an...@gmail.com wrote: Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this version? I'll just fetch the latest master if this is the case.
[jira] [Created] (FLINK-2251) Implement job session management
Maximilian Michels created FLINK-2251: - Summary: Implement job session management Key: FLINK-2251 URL: https://issues.apache.org/jira/browse/FLINK-2251 Project: Flink Issue Type: Sub-task Reporter: Maximilian Michels Assignee: Maximilian Michels This implements session management to determine how long ExecutionGraphs should be cached in the JobManager before they are archived. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: execute() and collect()/print()/count()
+1 for ignoring the execute() call with a warning. But I'm concerned about how the user catches the error in a program without any data sinks. By the way, eager execution is not well documented in the data sinks section, only in the program skeleton section. [1] This adds to the users' confusion; we should clean up the documentation. There are also many code samples that call execute() after print(). [2][3] We should add a description of the count() method to the documentation too. [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#data-sinks [2] http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#parallel-execution [3] http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#iteration-operators Regards, Chiwan Park
[jira] [Created] (FLINK-2247) Improve the way memory is reported in the web frontend
Theodore Vasiloudis created FLINK-2247: -- Summary: Improve the way memory is reported in the web frontend Key: FLINK-2247 URL: https://issues.apache.org/jira/browse/FLINK-2247 Project: Flink Issue Type: Improvement Components: Webfrontend Reporter: Theodore Vasiloudis Priority: Trivial Currently, in the taskmanager view of the web frontend, the memory available to Flink is reported in a slightly confusing manner. In the worker summary, we get a report of the Flink managed memory available, and we get warnings when that is set too low. The warnings, however, do not seem to take the memory available to the JVM into account when being issued. For example, on a machine with 7.5GB of memory available, it is normal to assign ~6GB to the JVM, which under default settings gives ~4GB to Flink managed memory. In this case, however, we get a warning that 7500MB of memory is available but only ~4000MB is available to Flink, disregarding the total amount available to the JVM. The reporting could be improved by reporting both the total amount available to the JVM and the amount available as Flink's managed memory, and by issuing warnings only when the settings are actually low compared to the available memory. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
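The arithmetic behind the ticket's numbers can be made explicit. This sketch assumes the default managed-memory fraction of 0.7 (the `taskmanager.memory.fraction` setting; check your flink-conf.yaml, as the default may differ across versions):

```java
// Illustrates the memory math from FLINK-2247: managed memory is a fraction
// of the TaskManager JVM heap, not of the machine's total memory.
public class ManagedMemoryEstimate {
    // Assumed default for taskmanager.memory.fraction; verify in flink-conf.yaml.
    static final double MANAGED_FRACTION = 0.7;

    static long managedMemoryMb(long jvmHeapMb) {
        return (long) (jvmHeapMb * MANAGED_FRACTION);
    }

    public static void main(String[] args) {
        long machineMb = 7500;  // total machine memory from the ticket
        long jvmHeapMb = 6000;  // heap assigned to the TaskManager JVM
        // prints 4200: the "~4GB" from the ticket, which is perfectly normal
        // relative to the 6GB heap, even though it looks low next to 7500MB.
        System.out.println(managedMemoryMb(jvmHeapMb));
    }
}
```

This is why the warning is misleading: comparing ~4000MB of managed memory against 7500MB of machine memory ignores the intermediate JVM heap figure that actually determines the managed-memory pool.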
Re: Failing Builds on Travis
On 19 Jun 2015, at 10:29, Maximilian Michels m...@apache.org wrote: Bringing this up again because of a recently discovered issue: https://issues.apache.org/jira/browse/FLINK-2235 Flink 0.9.0 will still support Java 6. Just wanted to clarify again that we will drop support for Java 6 in any further major releases. Do we have a consensus? +1
Re: Quickstart POMs
On 19 Jun 2015, at 10:19, Maximilian Michels m...@apache.org wrote: @Robert @Stephan Thanks for clarifying! Of course it would be better to have a concise quickstart pom.xml but the necessary workarounds justify the current state. +1
Re: Quickstart POMs
@Robert @Stephan Thanks for clarifying! Of course it would be better to have a concise quickstart pom.xml but the necessary workarounds justify the current state. On Thu, Jun 18, 2015 at 11:48 PM, Stephan Ewen se...@apache.org wrote: I remember we had a similar discussion a while back. The current solution is the best out-of-the-box experience for the quickstarts across Eclipse, IntelliJ, and command-line Maven. This is more important than nice and compact POM files, IMHO. On Thu, Jun 18, 2015 at 11:26 AM, Robert Metzger rmetz...@apache.org wrote: Hi, I'm against cancelling a release for something that is not nice ;) It has to be at least broken to cancel :) I agree that the pom looks complicated and I would love to do better, but in my opinion the current approach gives our users the best out-of-the-box experience. The right approach to creating a Flink fat jar would be using the maven-shade-plugin with the Flink dependencies set to provided. This way we tell the shade plugin that it can assume the core Flink code to be available, so there is no need to package those classes into the fat jar. The problem is that IntelliJ does not add provided classes to the classpath when importing the pom, so IntelliJ users would not be able to run Flink jobs out of the IDE. That's why the Flink dependencies are in the default scope. The exclusions are in the maven-shade-plugin to reduce the jar size as much as possible. But there is also a Maven profile to set the Flink dependencies to provided, making the resulting jar as small as possible. By the way, it is not possible to just exclude everything from org.apache.flink, because a) users sometimes put their code into that package, so we would exclude their code, and b) libraries like Gelly or Flink ML are also in the org.apache.flink namespace but not provided on the server.
There is an ongoing discussion in IntelliJ's issue tracker whether to change the behavior: https://youtrack.jetbrains.com/issue/IDEA-107048 Best, Robert On Thu, Jun 18, 2015 at 6:46 PM, Chiwan Park chiwanp...@icloud.com wrote: Is it okay if the user runs the built jar in LocalEnvironment? (Just run with the `java -jar` command.) I know that it is a special case, but it is a possible scenario for local testing. If we change the Quickstart POM to use provided scope for dependencies, we should add a guide about this to the documentation. Regards, Chiwan Park On Jun 19, 2015, at 12:53 AM, Aljoscha Krettek aljos...@apache.org wrote: I'm also for simplification but let's hear what those who put the build-jar profile there have to say about it. On Thu, 18 Jun 2015 at 17:25 Ufuk Celebi u...@apache.org wrote: On 18 Jun 2015, at 16:58, Fabian Hueske fhue...@gmail.com wrote: Why? mvn package builds the program correctly, no? Yes, but: - Dependencies not specified by the user may be included (Metrics, javaassist) - Dependencies specified by the user may be excluded - If you use the build-jar profile you have to understand what the difference to the default profile is, and then you have to include your dependencies again for the profile - The pom comments are confusing
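The profile-based approach Robert describes can be sketched roughly as follows. This is a simplified illustration, not the actual quickstart pom.xml; the profile id, artifact name, and version are assumptions:

```xml
<!-- Simplified sketch of a "build-jar" style profile: when activated, Flink
     core dependencies are marked provided so the shade plugin leaves them out
     of the fat jar. In the default profile (no -P flag) the same dependencies
     stay in compile scope so IntelliJ can run jobs from the IDE. -->
<profiles>
  <profile>
    <id>build-jar</id>
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>0.9.0</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </profile>
</profiles>
```

Building with `mvn clean package -Pbuild-jar` would then produce the small jar for cluster submission, while a plain `mvn package` keeps the IDE-friendly default scope. This illustrates the trade-off in the thread: the duplication exists because one pom has to serve both workflows.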
Re: Flink Runtime Exception
Hi Andra, I would try increasing the memory per task manager, i.e. on a machine with 8 CPUs and 16GB of memory, instead of spawning 8 TMs with 2GB each, I would try to spawn 2 TMs with 8GB each. This might help with the spilling problem (in case the CPU is not your bottleneck, this might even speed up the computations by avoiding spilling) and get you unstuck. Cheers, Asterios On Fri, Jun 19, 2015 at 4:16 PM, Ufuk Celebi u...@apache.org wrote: On 19 Jun 2015, at 14:53, Andra Lungu lungu.an...@gmail.com wrote: Another problem that I encountered during the same set of experiments (sorry if I am asking too many questions, I am eager to get things fixed): - for the same configuration, a piece of code runs perfectly on 10GB of input, then for 38GB it runs forever (no deadlock). I believe that may occur because Flink spills information to disk every time it runs out of memory... Is this fixable by increasing the number of buffers? If you are referring to the number of network buffers configuration key, that should be unrelated. If this really is the issue, you can increase the heap size for the task managers. Have you confirmed your suspicion as Till suggested via iotop? :) – Ufuk
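Asterios's suggestion amounts to a configuration change along these lines. The key names assume the Flink 0.9-era configuration; verify them against the documentation for your version:

```yaml
# Sketch: two larger TaskManagers per 16GB machine instead of eight small ones.
# Key names assume Flink 0.9-era configuration; check your version's docs.
taskmanager.heap.mb: 8192          # heap per TaskManager (instead of 2048)
taskmanager.numberOfTaskSlots: 4   # keep total slots per machine equal to CPU cores
```

With only two TaskManager processes per machine, each one gets a larger heap (and thus a larger managed-memory pool), so operators like external sorts can keep more data in memory before spilling to disk.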
[jira] [Created] (FLINK-2253) ALS fails if the data flow is split up and the input is non-deterministic
Till Rohrmann created FLINK-2253: Summary: ALS fails if the data flow is split up and the input is non-deterministic Key: FLINK-2253 URL: https://issues.apache.org/jira/browse/FLINK-2253 Project: Flink Issue Type: Bug Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann If the {{temporaryPath}} of Flink's {{ALS}} implementation is set, then the ALS job is executed one part after the other. This also includes the routing information calculation for the user and item blocks. If the input {{DataSet}} is calculated non-deterministically, e.g. if it's the result of a {{first}} operation, then the resulting routing information won't be consistent and ALS fails. A solution would be to pin the input {{DataSet}} so that it will only be executed once. Until we have this functionality, I propose to simply execute the user and item routing information at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] Release Apache Flink 0.9.0 (release-0.9.0-rc4)
Signature file looks good
Checksum files look good
LICENSE file looks good
NOTICE file looks good
Code compiles and passes tests
No 3rd party exes in the source artifacts
Ran local mode for examples
Started standalone mode
Deployed in YARN for session and non-session

+1 (binding) - Henry

On Thu, Jun 18, 2015 at 4:46 AM, Maximilian Michels m...@apache.org wrote: Please vote on releasing the following candidate as Apache Flink version 0.9.0. - The commit to be voted on: 0a35271dba4675311eb3094d8669a876efd285a9 Branch: release-0.9.0-rc4 ( https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git;a=shortlog;h=refs/heads/release-0.9.0-rc4 ) The release artifacts to be voted on can be found at: http://people.apache.org/~mxm/flink-0.9.0-rc4/ Release artifacts are signed with the key with fingerprint C2909CBF: http://www.apache.org/dist/flink/KEYS The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapacheflink-1042 - Please vote on releasing this package as Apache Flink 0.9.0. The vote is open for the next 72 hours and passes if a majority of at least three +1 PMC votes are cast. The vote ends on Tuesday (June 23, 2015). [ ] +1 Release this package as Apache Flink 0.9.0 [ ] -1 Do not release this package because ... The following commits have been added since the last release candidate (release-0.9.0-rc3):
61f17f8 [core] Fix typo in DataSinkTask error message
d47405b [streaming] Fix out-of-sync class-level docs of SourceFunction
bbd08f3 Revert [FLINK-2203] handling null values for RowSerializer
d6ad294 Revert [FLINK-2210] Table API support for aggregation on columns with null values