RE: Gather a distributed dataset
Hi everyone,

I just wanted to give you a pointer to FLINK-1038 (https://github.com/apache/flink/pull/94). This is an output format that can send DataSet contents via Java RMI to, e.g., the driver. I am currently using it a lot, and it seems to scale pretty well.

Cheers,
Sebastian
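(For context, a minimal Scala sketch of the idea behind such an RMI output format: each parallel sink task looks up a collector stub that the driver has bound into an RMI registry and pushes records to it. All names here -- RemoteCollector, registryUrl -- are illustrative assumptions, not the actual FLINK-1038 implementation; see the pull request for the real code.)

    import java.rmi.{Naming, Remote, RemoteException}

    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration

    // The driver binds an implementation of this interface into its RMI
    // registry, backed by e.g. a local collection.
    trait RemoteCollector[T] extends Remote {
      @throws[RemoteException]
      def collect(element: T): Unit
    }

    class RemoteCollectorOutputFormat[T](registryUrl: String) extends OutputFormat[T] {

      @transient private var collector: RemoteCollector[T] = _

      override def configure(parameters: Configuration): Unit = ()

      // Each parallel sink task looks up the driver-side collector stub once.
      override def open(taskNumber: Int, numTasks: Int): Unit = {
        collector = Naming.lookup(registryUrl).asInstanceOf[RemoteCollector[T]]
      }

      // Every record written by the sink is shipped to the driver via RMI.
      override def writeRecord(record: T): Unit = collector.collect(record)

      override def close(): Unit = ()
    }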
Re: Gather a distributed dataset
Thanks, I will have a look at your comments tomorrow and create a PR which should supersede 210. BTW, is there already a test case where I can see the suggested way to do staged execution with the new ExecutionEnvironment API?

I thought about your second remark as well. The following lines summarize some of these thoughts and can (or cannot) be used for future improvements in the API and the runtime design.

Ideally, a runtime for parallel collection processing should support at least two types of values:

- collections of type DataSet[T]; these are consumed and produced by the parallel dataflows;
- simple values of any type T; these are used to represent broadcast variables as well as the results of global aggregates.

By "simple" I don't mean that T has to be a scalar -- it could also be a complex type like a POJO or a Tuple. Rather, it means that the system does not understand and cannot make use of its internal type structure.

At the moment, the runtime only supports values of type DataSet[T]. This is inconvenient because:

- you have to wrap simple typed values into a DataSet[T] in order to expose them to your UDFs as broadcast variables; this is not visible to the user, but makes for some confusing code in the internals;
- global aggregates produce a value of type DataSet[T] rather than T, which is inconsistent with the result type of the fold operator (and its variants) as known from other programming languages.

I think that the ideas in your email go in that direction. I suggest to have the following hierarchy of types (see the sketch at the end of this message):

- Value[T] -- an abstract base for all values;
- Singleton[T] extends Value[T] -- a container for exactly one value of type T;
- DataSet[T] extends Value[T] -- a container for a (parallelizable) homogeneous collection of values of type T.

We should then rethink which of the runtime operators can consume or produce both value types, and which can only consume/produce a Singleton[T] or a DataSet[T], and adapt their signatures accordingly.

2015-01-16 15:24 GMT+01:00 Stephan Ewen se...@apache.org:

> @Alex That sounds great. I added a few inline comments to PR 210, and then it is good to merge. If you want, feel free to fix it up and we will merge it.
>
> Feel free to also add (or suggest and stub) more of such functions. Is that what you meant by "designing interfaces"?
>
> Here is a thought that crossed my mind: should functions like reduce() and aggregate() (in their ungrouped version) produce a SingleValuedDataSet (or ScalarDataSet) that is known to have only a single value? That data set could offer an additional method get() that directly grabs that value (rather than collect() getting a list).
>
> Stephan
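(For illustration, a minimal Scala sketch of the hierarchy proposed above, combined with Stephan's SingleValuedDataSet/get() idea. These types and signatures are purely hypothetical -- they do not exist in Flink.)

    // Abstract base for all values the runtime can produce or consume.
    sealed trait Value[T]

    // A container for exactly one value of type T, e.g. a broadcast
    // variable or the result of a global aggregate.
    trait Singleton[T] extends Value[T] {
      // Directly grab the single value, rather than collect() returning a list.
      def get(): T
    }

    // A (parallelizable) homogeneous collection of values of type T.
    trait DataSet[T] extends Value[T] {
      def collect(): Seq[T]

      // An ungrouped reduce would then naturally produce a Singleton[T]
      // instead of a one-element DataSet[T].
      def reduce(f: (T, T) => T): Singleton[T]
    }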
Re: Gather a distributed dataset
On 13 Jan 2015, at 16:50, Stephan Ewen se...@apache.org wrote:

> - For streaming results continuously back, we need another mechanism than the accumulators. Let's create a design doc or thread and get working on that. Probably involves adding another set of akka messages from TM -> JM -> Client. Or something like an extension to the BLOB manager for streams?

For streaming results back, we can use the same mechanisms used by the task managers. Let me add documentation (FLINK-1373) for the network stack this week.
Re: Gather a distributed dataset
@Stephan: yes, I would like to contribute (e.g., I can design the interfaces and merge PR 210). Please reply with more information once you have the branch. I can find some time for that next week (at the expense of FLINK-1347, https://issues.apache.org/jira/browse/FLINK-1347, which hopefully can wait 3-4 more weeks).

Regards,
Alex
Re: Gather a distributed dataset
Hi!

To follow up on what Ufuk explained:

- Ufuk is right, the problem is not getting the data set. https://github.com/apache/flink/pull/210 does that for anything that is not too gigantic, which is a good start. I think we should merge this as soon as we agree on the signature and names of the API methods. We can swap the internal realization for something more robust later.

- For anything that just issues a program and wants the result back, this is actually perfectly fine.

- For true interactive programs, we need to back track to intermediate results (rather than to the source) to avoid re-executing large parts. This is the biggest missing piece, next to the persistent materialization of intermediate results (Ufuk is working on this). The logic is the same as for fault tolerance, so it is part of that development. @alexander: I want to create the feature branch for that on Thursday. Are you interested in contributing to that feature?

- For streaming results continuously back, we need another mechanism than the accumulators. Let's create a design doc or thread and get working on that. Probably involves adding another set of akka messages from TM -> JM -> Client. Or something like an extension to the BLOB manager for streams?

Greetings,
Stephan

On Mon, Jan 12, 2015 at 12:25 PM, Alexander Alexandrov alexander.s.alexand...@gmail.com wrote:

> Thanks, I am currently looking at the new ExecutionEnvironment API.
>
> > I think Stephan is working on the scheduling to support this kind of program.
>
> @Stephan: is there a feature branch for that somewhere?
Re: Gather a distributed dataset
Hey Alexander,

On 12 Jan 2015, at 11:42, Alexander Alexandrov alexander.s.alexand...@gmail.com wrote:

> Hi there,
>
> I wished for intermediate datasets, and Santa Ufuk made my wishes come true (thank you, Santa)! Now that FLINK-986 is in the mainline, I want to ask some practical questions.
>
> In Spark, there is a way to put a value from the local driver to the distributed runtime via
>
>   val x = env.parallelize(...) // expose x to the distributed runtime
>   val y = dataflow(env, x)     // y is produced by a dataflow which reads from x
>
> and also to get a value from the distributed runtime back to the driver
>
>   val z = env.collect(y)
>
> As far as I know, in Flink we have an equivalent for parallelize
>
>   val x = env.fromCollection(...)
>
> but not for collect. Is this still the case?

Yes, but this will change soon.

> If yes, how hard would it be to add this feature at the moment? Can you give me some pointers?

There is an alpha version/hack of this using accumulators. See https://github.com/apache/flink/pull/210. The problem is that each collect call results in a new program being executed from the sources. I think Stephan is working on the scheduling to support this kind of program.

From the runtime perspective, it is not a problem to transfer the produced intermediate results back to the job manager. The job manager can basically use the same mechanism that the task managers use. Even the accumulator version should be fine as an initial version.
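(For illustration, a minimal Scala sketch of what the accumulator-based collect hack boils down to: funnel every element of a DataSet into a named accumulator and read the merged list from the JobExecutionResult on the driver. This is not the actual code of PR 210; the accumulator name and the discarding sink are illustrative assumptions.)

    import org.apache.flink.api.common.accumulators.ListAccumulator
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.java.io.DiscardingOutputFormat
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    object CollectViaAccumulators {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Some dataflow whose result we want back in the driver.
        val y: DataSet[Int] = env.fromCollection(1 to 100).map(_ * 2)

        y.map(new RichMapFunction[Int, Int] {
          private val acc = new ListAccumulator[Int]

          override def open(parameters: Configuration): Unit =
            // Register the per-task accumulator; the job manager merges
            // all task instances registered under the same name.
            getRuntimeContext.addAccumulator("collected", acc)

          override def map(value: Int): Int = {
            acc.add(value)
            value
          }
        }).output(new DiscardingOutputFormat[Int])

        // The merged accumulator travels back with the execution result --
        // but note that each such call re-executes the program from the sources.
        val result = env.execute("collect via accumulators")
        val z: java.util.ArrayList[Int] = result.getAccumulatorResult("collected")
        println(z)
      }
    }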