RE: Gather a distributed dataset

2015-01-27 Thread Kruse, Sebastian
Hi everyone,

I just wanted to give you a pointer to FLINK-1038:
https://github.com/apache/flink/pull/94
This is an output format that can send DataSet contents via Java RMI to, e.g., 
the driver. I am currently using it a lot and it seems to scale pretty well.
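
For anyone curious how such an output format can work in principle, here is a
minimal, hypothetical Scala sketch of the idea -- not the code from the PR.
It assumes the driver has exported a collector object in an RMI registry; all
names and signatures below are illustrative, and records must be serializable
to cross the RMI boundary:

  import java.rmi.{Remote, RemoteException}
  import java.rmi.registry.LocateRegistry
  import org.apache.flink.api.common.io.OutputFormat
  import org.apache.flink.configuration.Configuration

  // Hypothetical driver-side contract; the PR may define a different one.
  trait RemoteCollector[T] extends Remote {
    @throws[RemoteException]
    def collect(element: T): Unit
  }

  // Each parallel instance looks up the driver's collector and pushes
  // records to it as they are produced.
  class RmiOutputFormat[T](host: String, port: Int, name: String)
      extends OutputFormat[T] {

    @transient private var collector: RemoteCollector[T] = _

    override def configure(parameters: Configuration): Unit = ()

    override def open(taskNumber: Int, numTasks: Int): Unit = {
      val registry = LocateRegistry.getRegistry(host, port)
      collector = registry.lookup(name).asInstanceOf[RemoteCollector[T]]
    }

    override def writeRecord(record: T): Unit = collector.collect(record)

    override def close(): Unit = ()
  }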

Cheers,
Sebastian

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org] 
Sent: Monday, 12 January 2015 12:06
To: dev@flink.apache.org
Subject: Re: Gather a distributed dataset

Hey Alexander,

On 12 Jan 2015, at 11:42, Alexander Alexandrov 
alexander.s.alexand...@gmail.com wrote:

 Hi there,
 
 I wished for intermediate datasets, and Santa Ufuk made my wishes come 
 true (thank you, Santa)!
 
 Now that FLINK-986 is in the mainline, I want to ask some practical 
 questions.
 
 In Spark, there is a way to put a value from the local driver to the 
 distributed runtime via
 
 val x = env.parallelize(...) // expose x to the distributed runtime
 val y = dataflow(env, x) // y is produced by a dataflow which reads from x
 
 and also to get a value from the distributed runtime back to the driver
 
 val z = env.collect(y)
 
 As far as I know, in Flink we have an equivalent for parallelize
 
 val x = env.fromCollection(...)
 
 but not for collect. Is this still the case?

Yes, but this will change soon.

 If yes, how hard would it be to add this feature at the moment? Can 
 you give me some pointers?

There is an alpha version/hack of this using accumulators. See
https://github.com/apache/flink/pull/210. The problem is that each collect call
results in a new program being executed from the sources. I think Stephan is
working on the scheduling to support this kind of program. From the runtime
perspective, it is not a problem to transfer the produced intermediate results
back to the job manager. The job manager can basically use the same mechanism
that the task managers use. Even the accumulator version should be fine as an
initial version.
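
To make that concrete, here is a minimal sketch of how a driver-side collect
can be emulated with accumulators today. It illustrates the general technique
only -- it is not the API from PR 210, and the accumulator name is arbitrary:

  import org.apache.flink.api.common.accumulators.ListAccumulator
  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.api.java.io.DiscardingOutputFormat
  import org.apache.flink.api.scala._
  import org.apache.flink.configuration.Configuration

  val ACC = "collected-elements" // arbitrary name for the accumulator

  val env = ExecutionEnvironment.getExecutionEnvironment
  val y = env.fromCollection(1 to 100).map(_ * 2)

  // Side effect: every element also goes into a list accumulator.
  y.map(new RichMapFunction[Int, Int] {
    private var acc: ListAccumulator[Int] = _
    override def open(parameters: Configuration): Unit = {
      acc = new ListAccumulator[Int]
      getRuntimeContext.addAccumulator(ACC, acc)
    }
    override def map(value: Int): Int = { acc.add(value); value }
  }).output(new DiscardingOutputFormat[Int])

  // Running the job ships the accumulated elements back to the driver.
  val z = env.execute("collect via accumulator")
    .getAccumulatorResult[java.util.List[Int]](ACC)

Note that this re-executes the whole program from the sources on every run,
which is exactly the limitation described above.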


Re: Gather a distributed dataset

2015-01-16 Thread Alexander Alexandrov
Thanks, I will have a look at your comments tomorrow and create a PR which
should supersede 210. BTW, is there already a test case where I can see the
suggested way to do staged execution with the new ExecutionEnvironment
API?

I thought about your second remark as well. The following lines summarize
some of these thoughts and may (or may not) be useful for future
improvements in the API and the runtime design.

Ideally, a runtime for parallel collection processing should support at
least two types of values:

- collections of type DataSet[T]; these are consumed and produced by the
parallel dataflows;
- simple values of any type T; these are used to represent broadcast
variables as well as results of global aggregates.

By simple values I don't mean that T has to be a scalar -- it could also be
a complex type like a POJO or a Tuple; it rather means that the system does
not understand and cannot make use of its internal type structure.

At the moment, the runtime only supports values of type DataSet[T]. This is
inconvenient because

- you have to wrap simple values into a DataSet[T] in order to expose them
to your UDFs as broadcast variables (see the sketch after this list); this
is not visible to the user but makes for some confusing code in the
internals;
- global aggregates produce a value of type DataSet[T] rather than T; this
is inconsistent with the result type of the fold operator (and its
variants) as seen in other programming languages.
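
As a sketch of the wrapping the first point refers to, using the current
broadcast-variable API (the standard withBroadcastSet / getBroadcastVariable
pattern; the name "threshold" and the data are made up):

  import org.apache.flink.api.common.functions.RichFilterFunction
  import org.apache.flink.api.scala._
  import org.apache.flink.configuration.Configuration

  val env = ExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements(0.1, 0.4, 0.9)

  // A single value must be wrapped into a one-element DataSet ...
  val threshold: DataSet[Double] = env.fromElements(0.5)

  val filtered = data
    .filter(new RichFilterFunction[Double] {
      private var t: Double = _
      override def open(parameters: Configuration): Unit = {
        // ... and unwrapped again from a one-element broadcast "collection".
        t = getRuntimeContext.getBroadcastVariable[Double]("threshold").get(0)
      }
      override def filter(value: Double): Boolean = value > t
    })
    .withBroadcastSet(threshold, "threshold")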

I think that the ideas in your email go in that direction. I suggest
having the following hierarchy of types:

- Value[T] - an abstract base for all values
- Singleton[T] extends Value[T] - a container for exactly one value of type T
- DataSet[T] extends Value[T] - a container for a (parallelizable)
homogeneous collection of values of type T

We should then rethink which of the runtime operators can consume or
produce both value types and which can only consume/produce a Singleton[T]
or a DataSet[T], and adapt their signatures accordingly.
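
A rough Scala sketch of that hierarchy, purely illustrative (the names are
the ones proposed above, not an existing Flink API):

  sealed trait Value[T]

  // Exactly one value, e.g. a broadcast variable or a global aggregate.
  trait Singleton[T] extends Value[T] {
    def get: T
  }

  // A (parallelizable) homogeneous collection of values of type T.
  trait DataSet[T] extends Value[T] {
    // An ungrouped reduce would naturally yield a Singleton[T] rather
    // than a one-element DataSet[T].
    def reduce(f: (T, T) => T): Singleton[T]
  }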

2015-01-16 15:24 GMT+01:00 Stephan Ewen se...@apache.org:

 @Alex That sounds great. I added a few inline comments to PR 210 and then
 it is good to merge. If you want, feel free to fix it up and we will merge
 it.

 Feel free to also add (or suggest and stub) more of such functions. Is that
 what you meant by "designing interfaces"?

 Here is a thought that crossed my mind:
  - Should functions like reduce() and aggregate() (in their ungrouped
 version) produce a SingleValuedDataSet (or ScalarDataSet) that is known
 to have only a single value? That data set could offer an additional method
 get() that directly grabs that value (rather than collect() getting a
 list).
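
 A tiny usage sketch of that idea (SingleValuedDataSet and get() are the
 hypothetical names suggested above, not an existing API, and numbers stands
 for some DataSet[Int]):

   val sum: SingleValuedDataSet[Int] = numbers.reduce(_ + _) // ungrouped
   val n: Int = sum.get()               // grab the single value directly
   val asList: Seq[Int] = sum.collect() // still possible, but a one-element list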

 Stephan




 On Thu, Jan 15, 2015 at 11:30 AM, Ufuk Celebi u...@apache.org wrote:

 
  On 13 Jan 2015, at 16:50, Stephan Ewen se...@apache.org wrote:
 
   Hi!
  
   To follow up on what Ufuk explained:
  
   - Ufuk is right, the problem is not getting the data set.
    https://github.com/apache/flink/pull/210 does that for anything that is
    not too gigantic, which is a good start. I think we should merge this as
    soon as we agree on the signature and names of the API methods. We can
    swap the internal realization for something more robust later.
  
   - For anything that just issues a program and wants the result back,
    this is actually perfectly fine.
  
   - For true interactive programs, we need to back track to intermediate
   results (rather than to the source) to avoid re-executing large parts.
    This is the biggest missing piece, next to the persistent materialization
    of intermediate results (Ufuk is working on this). The logic is the same
    as for fault tolerance, so it is part of that development.
  
   @alexander: I want to create the feature branch for that on Thursday.
    Are you interested in contributing to that feature?
  
    - For streaming results continuously back, we need another mechanism
    than the accumulators. Let's create a design doc or thread and get
    working on that. Probably involves adding another set of Akka messages
    from TM -> JM -> Client. Or something like an extension to the BLOB
    manager for streams?
 
  For streaming results back, we can use the same mechanisms used by the
  task managers. Let me add documentation (FLINK-1373) for the network
   stack this week.



Re: Gather a distributed dataset

2015-01-15 Thread Ufuk Celebi

On 13 Jan 2015, at 16:50, Stephan Ewen se...@apache.org wrote:

 Hi!
 
 To follow up on what Ufuk explained:
 
 - Ufuk is right, the problem is not getting the data set.
 https://github.com/apache/flink/pull/210 does that for anything that is not
 too gigantic, which is a good start. I think we should merge this as soon
 as we agree on the signature and names of the API methods. We can swap the
 internal realization for something more robust later.
 
 - For anything that just issues a program and wants the result back, this
 is actually perfectly fine.
 
 - For true interactive programs, we need to back track to intermediate
 results (rather than to the source) to avoid re-executing large parts. This
 is the biggest missing piece, next to the persistent materialization of
 intermediate results (Ufuk is working on this). The logic is the same as
 for fault tolerance, so it is part of that development.
 
 @alexander: I want to create the feature branch for that on Thursday. Are
 you interested in contributing to that feature?
 
 - For streaming results continuously back, we need another mechanism than
 the accumulators. Let's create a design doc or thread and get working on
 that. Probably involves adding another set of Akka messages from TM -> JM
 -> Client. Or something like an extension to the BLOB manager for streams?

For streaming results back, we can use the same mechanisms used by the task 
managers. Let me add documentation (FLINK-1373) for the network stack this week.

Re: Gather a distributed dataset

2015-01-15 Thread Alexander Alexandrov
@Stephan: yes, I would like to contribute (e.g. I can design the interfaces
and merge 210).

Please reply with more information once you have the branch, I can find
some time for that next week (at the expense of FLINK-1347
https://issues.apache.org/jira/browse/FLINK-1347 which hopefully can wait
3-4 more weeks).

Regards,
Alex


2015-01-13 16:50 GMT+01:00 Stephan Ewen se...@apache.org:

 Hi!

 To follow up on what Ufuk explained:

  - Ufuk is right, the problem is not getting the data set.
 https://github.com/apache/flink/pull/210 does that for anything that is not
 too gigantic, which is a good start. I think we should merge this as soon
 as we agree on the signature and names of the API methods. We can swap the
 internal realization for something more robust later.

  - For anything that just issues a program and wants the result back, this
 is actually perfectly fine.

  - For true interactive programs, we need to back track to intermediate
 results (rather than to the source) to avoid re-executing large parts. This
 is the biggest missing piece, next to the persistent materialization of
 intermediate results (Ufuk is working on this). The logic is the same as
 for fault tolerance, so it is part of that development.

 @alexander: I want to create the feature branch for that on Thursday. Are
 you interested in contributing to that feature?

  - For streaming results continuously back, we need another mechanism than
 the accumulators. Let's create a design doc or thread and get working on
 that. Probably involves adding another set of Akka messages from TM -> JM
 -> Client. Or something like an extension to the BLOB manager for streams?

 Greetings,
 Stephan


 On Mon, Jan 12, 2015 at 12:25 PM, Alexander Alexandrov 
 alexander.s.alexand...@gmail.com wrote:

  Thanks, I am currently looking at the new ExecutionEnvironment API.
 
   I think Stephan is working on the scheduling to support this kind of
  program.
 
  @Stephan: is there a feature branch for that somewhere?
 
  2015-01-12 12:05 GMT+01:00 Ufuk Celebi u...@apache.org:
 
   Hey Alexander,
  
   On 12 Jan 2015, at 11:42, Alexander Alexandrov 
   alexander.s.alexand...@gmail.com wrote:
  
Hi there,
   
 I wished for intermediate datasets, and Santa Ufuk made my wishes come
 true (thank you, Santa)!
   
Now that FLINK-986 is in the mainline, I want to ask some practical
questions.
   
In Spark, there is a way to put a value from the local driver to the
distributed runtime via
   
val x = env.parallelize(...) // expose x to the distributed runtime
 val y = dataflow(env, x) // y is produced by a dataflow which reads from x
   
 and also to get a value from the distributed runtime back to the driver
   
val z = env.collect(y)
   
As far as I know, in Flink we have an equivalent for parallelize
   
val x = env.fromCollection(...)
   
but not for collect. Is this still the case?
  
   Yes, but this will change soon.
  
 If yes, how hard would it be to add this feature at the moment? Can you
 give me some pointers?
  
   There is an alpha version/hack of this using accumulators. See
   https://github.com/apache/flink/pull/210. The problem is that each
   collect call results in a new program being executed from the sources. I
   think Stephan is working on the scheduling to support this kind of
   program. From the runtime perspective, it is not a problem to transfer
   the produced intermediate results back to the job manager. The job
   manager can basically use the same mechanism that the task managers use.
   Even the accumulator version should be fine as an initial version.
 



Re: Gather a distributed dataset

2015-01-13 Thread Stephan Ewen
Hi!

To follow up on what Ufuk explained:

 - Ufuk is right, the problem is not getting the data set.
https://github.com/apache/flink/pull/210 does that for anything that is not
too gigantic, which is a good start. I think we should merge this as soon
as we agree on the signature and names of the API methods. We can swap the
internal realization for something more robust later.

 - For anything that just issues a program and wants the result back, this
is actually perfectly fine.

 - For true interactive programs, we need to back track to intermediate
results (rather than to the source) to avoid re-executing large parts. This
is the biggest missing piece, next to the persistent materialization of
intermediate results (Ufuk is working on this). The logic is the same as
for fault tolerance, so it is part of that development.

@alexander: I want to create the feature branch for that on Thursday. Are
you interested in contributing to that feature?

 - For streaming results continuously back, we need another mechanism than
the accumulators. Let's create a design doc or thread and get working on
that. Probably involves adding another set of Akka messages from TM -> JM
-> Client. Or something like an extension to the BLOB manager for streams?

Greetings,
Stephan


On Mon, Jan 12, 2015 at 12:25 PM, Alexander Alexandrov 
alexander.s.alexand...@gmail.com wrote:

 Thanks, I am currently looking at the new ExecutionEnvironment API.

  I think Stephan is working on the scheduling to support this kind of
 program.

 @Stephan: is there a feature branch for that somewhere?

 2015-01-12 12:05 GMT+01:00 Ufuk Celebi u...@apache.org:

  Hey Alexander,
 
  On 12 Jan 2015, at 11:42, Alexander Alexandrov 
  alexander.s.alexand...@gmail.com wrote:
 
   Hi there,
  
    I wished for intermediate datasets, and Santa Ufuk made my wishes come
    true (thank you, Santa)!
  
   Now that FLINK-986 is in the mainline, I want to ask some practical
   questions.
  
   In Spark, there is a way to put a value from the local driver to the
   distributed runtime via
  
   val x = env.parallelize(...) // expose x to the distributed runtime
   val y = dataflow(env, x) // y is produced by a dataflow which reads from x
  
   and also to get a value from the distributed runtime back to the driver
  
   val z = env.collect(y)
  
   As far as I know, in Flink we have an equivalent for parallelize
  
   val x = env.fromCollection(...)
  
   but not for collect. Is this still the case?
 
  Yes, but this will change soon.
 
   If yes, how hard would it be to add this feature at the moment? Can you
   give me some pointers?
 
  There is an alpha version/hack of this using accumulators. See
  https://github.com/apache/flink/pull/210. The problem is that each
  collect call results in a new program being executed from the sources. I
  think Stephan is working on the scheduling to support this kind of
  program. From the runtime perspective, it is not a problem to transfer
  the produced intermediate results back to the job manager. The job
  manager can basically use the same mechanism that the task managers use.
  Even the accumulator version should be fine as an initial version.



Re: Gather a distributed dataset

2015-01-12 Thread Ufuk Celebi
Hey Alexander,

On 12 Jan 2015, at 11:42, Alexander Alexandrov 
alexander.s.alexand...@gmail.com wrote:

 Hi there,
 
 I wished for intermediate datasets, and Santa Ufuk made my wishes come true
 (thank you, Santa)!
 
 Now that FLINK-986 is in the mainline, I want to ask some practical
 questions.
 
 In Spark, there is a way to put a value from the local driver to the
 distributed runtime via
 
 val x = env.parallelize(...) // expose x to the distributed runtime
 val y = dataflow(env, x) // y is produced by a dataflow which reads from x
 
 and also to get a value from the distributed runtime back to the driver
 
 val z = env.collect(y)
 
 As far as I know, in Flink we have an equivalent for parallelize
 
 val x = env.fromCollection(...)
 
 but not for collect. Is this still the case?

Yes, but this will change soon.

 If yes, how hard would it be to add this feature at the moment? Can you
 give me some pointers?

There is an alpha version/hack of this using accumulators. See
https://github.com/apache/flink/pull/210. The problem is that each collect call
results in a new program being executed from the sources. I think Stephan is
working on the scheduling to support this kind of program. From the runtime
perspective, it is not a problem to transfer the produced intermediate results
back to the job manager. The job manager can basically use the same mechanism
that the task managers use. Even the accumulator version should be fine as an
initial version.