Re: difference between reducefunction and GroupReduceFunction

2015-05-22 Thread Maximilian Michels
Like you said, it depends on the use case. The GroupReduceFunction is a
generalization of the traditional reduce. Thus, it is more powerful.
However, it is also executed differently; a GroupReduceFunction requires
the whole group to be materialized and passed at once. If your program
doesn't require that, use the normal reduce function.
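
For illustration, a minimal sketch contrasting the two on the same grouped
DataSet (class name and data are made up for this example; the Java DataSet
API is assumed):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReduceVsGroupReduce {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> data = env.fromElements(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));

    // ReduceFunction: combines two elements at a time into one of the same type.
    data.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                            Tuple2<String, Integer> b) {
        return new Tuple2<>(a.f0, a.f1 + b.f1);
      }
    }).print();

    // GroupReduceFunction: sees the whole group at once and may emit any
    // number of records, possibly of a different type.
    data.groupBy(0).reduceGroup(
        new GroupReduceFunction<Tuple2<String, Integer>, String>() {
      @Override
      public void reduce(Iterable<Tuple2<String, Integer>> values,
                         Collector<String> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> v : values) {
          key = v.f0;
          sum += v.f1;
        }
        out.collect(key + " -> " + sum);
      }
    }).print();
  }
}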

On Thu, May 21, 2015 at 4:42 PM, santosh_rajaguru sani...@gmail.com wrote:

 I am new to Flink and MapReduce. My query is: apart from incrementally
 combining two elements, what are the merits of using ReduceFunction over
 GroupReduceFunction? Which use cases suit which function the most?






 --
 View this message in context:
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/difference-between-reducefunction-and-GroupReduceFunction-tp5768.html
 Sent from the Apache Flink Mailing List archive. mailing list archive at
 Nabble.com.



question please

2015-05-22 Thread Eng Fawzya
Hi,
I want to know: what is the difference between Flink and Hadoop?

-- 
Fawzya Ramadan Sayed,
Teaching Assistant,
Computer Science Department,
Faculty of Computers and Information,
Fayoum University


Re: Package multiple jobs in a single jar

2015-05-22 Thread Maximilian Michels
Hi Matthias,

Thank you for taking the time to analyze Flink's invocation behavior. I
like your proposal. I'm not sure whether it is a good idea to scan the
entire JAR for main methods. Sometimes, main methods are added solely for
testing purposes and don't really serve any practical use. However, if
you're already going through the JAR to find the ProgramDescription
interface, then you might look for main methods as well. As long as it is
just a listing without execution, that should be fine.

Best regards,
Max

On Thu, May 21, 2015 at 3:43 PM, Matthias J. Sax 
mj...@informatik.hu-berlin.de wrote:

 Hi,

 I had a look at Flink's current workflow with regard to the
 processing steps for a jar file.

 If I got it right it works as follows (not sure if this is documented
 somewhere):

 1) check if the -c flag is used to set the program entry point
    if yes, goto 4
 2) try to extract the "program-class" property from the manifest
    (if found, goto 4)
 3) try to extract the "Main-Class" property from the manifest
    - if not found, throw an exception (this happens also if no manifest
 file is found at all)

 4) check if the entry point class implements the "Program" interface
    if yes, goto 6
 5) check if the entry point class provides a "public static void
 main(String[] args)" method
    - if not, throw an exception

 6) execute the program (i.e., show plan/info or really run it)
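
 For illustration (not the actual CliFrontend code), a rough sketch of
 steps 1-3 using the standard java.util.jar API:

 import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;

 public class EntryPointLookup {
   static String findEntryClass(String jarPath, String cFlagValue) throws Exception {
     if (cFlagValue != null) {
       return cFlagValue;                      // 1) -c flag wins
     }
     try (JarFile jar = new JarFile(jarPath)) {
       Manifest manifest = jar.getManifest();
       if (manifest == null) {                 // no manifest file at all
         throw new RuntimeException("No manifest found in " + jarPath);
       }
       Attributes attrs = manifest.getMainAttributes();
       String entry = attrs.getValue("program-class");      // 2)
       if (entry != null) {
         return entry;
       }
       entry = attrs.getValue(Attributes.Name.MAIN_CLASS);  // 3)
       if (entry != null) {
         return entry;
       }
       throw new RuntimeException("Neither program-class nor Main-Class set");
     }
     // steps 4-6 (Program interface vs. main method) happen on the loaded class
   }
 }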


 I also discovered the interface ProgramDescription with a single
 method, String getDescription(). Even though some examples implement this
 interface (and use it in the example itself), Flink basically ignores
 it... From the CLI there is no way to get this info, and the WebUI does
 actually fetch it if present; however, it doesn't show it anywhere...


 I think it would be nice if we extended the following functionality:

  - extend the possibility to specify multiple entry classes in
 "program-class" or "Main-Class" - in this case, the user needs to use the
 -c flag to pick the program to run every time

  - add a CLI option that allows the user to see what entry point classes
 are available
    for this, consider
      a) "program-class" entry
      b) "Main-Class" entry
      c) if neither is found, scan the jar file for classes implementing
 the "Program" interface
      d) if still not found, scan the jar file for classes with a main
 method (see the sketch after this list)

  - if the user looks for entry point classes via CLI, check for the
 "ProgramDescription" interface and show the info

  - extend WebUI to show all available entry-classes (pull request
 already there, for multiple entries in program-class)

  - extend WebUI to show ProgramDescription info
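
 For illustration, a rough sketch of the scan in c) and d) with plain
 reflection (class and method names here are illustrative, not Flink's
 actual implementation):

 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;

 public class JarScanner {
   static List<String> findMainClasses(String jarPath) throws Exception {
     List<String> candidates = new ArrayList<>();
     try (JarFile jar = new JarFile(jarPath);
          URLClassLoader loader =
              new URLClassLoader(new URL[]{new URL("file:" + jarPath)})) {
       Enumeration<JarEntry> entries = jar.entries();
       while (entries.hasMoreElements()) {
         String name = entries.nextElement().getName();
         if (!name.endsWith(".class")) {
           continue;
         }
         String className = name.substring(0, name.length() - 6).replace('/', '.');
         try {
           // load without initializing, so static initializers don't run
           Class<?> clazz = Class.forName(className, false, loader);
           // c) would additionally check Program.class.isAssignableFrom(clazz)
           Method main = clazz.getDeclaredMethod("main", String[].class);
           if (Modifier.isStatic(main.getModifiers())) {
             candidates.add(className);  // d) class with a static main method
           }
         } catch (Throwable ignored) {
           // classes with unresolvable dependencies cannot be inspected; skip
         }
       }
     }
     return candidates;
   }
 }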


 What do you think? I am not too sure about the auto-scan of the jar
 file if no manifest entry is provided. We might get some fat jars, and
 scanning might take some time.


 -Matthias




 On 05/19/2015 10:44 AM, Stephan Ewen wrote:
  We actually had an interface like that before (Program). It is still
  supported, but in all new programs we simply use the Java main method.
 The
  advantage is that
  most IDEs can create executable JARs automatically, setting the JAR
  manifest attributes, etc.
 
  The Program interface still works, though. Most tool classes (like
  PackagedProgram) have a way to figure out whether the code uses
 main()
  or implements Program
  and calls the right method.
 
  You can try and extend the program interface. If you want to consistently
  support multiple programs in one JAR file, you may need to adjust the
 util
  classes as
  well to deal with that.
 
 
 
  On Tue, May 19, 2015 at 10:10 AM, Matthias J. Sax 
  mj...@informatik.hu-berlin.de wrote:
 
  Supporting an interface like this seems to be a nice idea. Any other
  opinions on it?
 
  It seems to be some more work to get it done right. I don't want to
  start working on it, before it's clear that it has a chance to be
  included in Flink.
 
  @Flavio: I moved the discussion to dev mailing list (user list is not
  appropriate for this discussion). Are you subscribed to it or should I
  cc you in each mail?
 
 
  -Matthias
 
 
  On 05/19/2015 09:39 AM, Flavio Pompermaier wrote:
  Nice feature Matthias!
  My suggestion is to create a specific Flink interface to also get a
  description of a job and to standardize parameter passing.
  Then, somewhere (e.g. the Manifest) you could specify the list of packages
  (or also directly the classes) to inspect with reflection to extract the
  list of available Flink jobs.
  Something like:

  public interface FlinkJob {

    /** The name to display in the job submission UI or shell */
    // e.g. "My Flink HelloWorld"
    String getDisplayName();

    // e.g. "This program does this and that etc."
    String getDescription();

    // e.g. (0, Integer, "An integer representing my first param"),
    //      (1, String, "A string representing my second param")
    List<Tuple3<Integer, TypeInfo, String>> getParamDescription();

    /** Set up the Flink job in the passed ExecutionEnvironment */
    ExecutionEnvironment config(ExecutionEnvironment env);
  }
 
  What do you think?
 
 
 
  On Sun, May 17, 2015 at 10:38 PM, 

Re: question please

2015-05-22 Thread Chiwan Park
Hi.

Hadoop is a framework for reliable, scalable, distributed computing, so there
are several components serving this purpose, such as HDFS, YARN and Hadoop
MapReduce. Flink is an alternative to the Hadoop MapReduce component. It also
provides tools for writing MapReduce-style programs and extends that model to
support many more operations.
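
For illustration, a minimal sketch (hypothetical paths) showing that Flink
replaces the MapReduce layer while HDFS can remain the storage layer:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, Integer>> counts = env
        .readTextFile("hdfs:///path/to/input")  // HDFS as the storage layer
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
              if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
              }
            }
          }
        })
        .groupBy(0)  // group by the word
        .sum(1);     // built-in aggregation; no hand-written reduce phase

    counts.writeAsCsv("hdfs:///path/to/output", "\n", " ");
    env.execute("WordCount");
  }
}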

You can find a more detailed description on Flink's homepage [1].

[1] http://flink.apache.org/faq.html#is-flink-a-hadoop-project


Regards.
Chiwan Park


 On May 22, 2015, at 3:02 PM, Eng Fawzya eng.faw...@gmail.com wrote:
 
 Hi,
 I want to know: what is the difference between Flink and Hadoop?
 
 -- 
 Fawzya Ramadan Sayed,
 Teaching Assistant,
 Computer Science Department,
 Faculty of Computers and Information,
 Fayoum University



Re: difference between reducefunction and GroupReduceFunction

2015-05-22 Thread Maximilian Michels
Pardon, what I said is not completely right. Both functions receive their
input incrementally. This seems obvious for the reduce function, but it is
also true for the GroupReduce because it receives the values as an
Iterable which, under the hood, can be constructed incrementally as well.

One other difference is that the traditional reduce always applies a
combiner before shuffling the results. The GroupReduceFunction, on the
other hand, does not do that unless you explicitly specify a combiner using
the RichGroupReduceFunction or perform a GroupCombine operation before the
GroupReduce.
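
For illustration, a sketch of a combinable GroupReduce using the
GroupCombineFunction interface (the interface-based mechanism; the
RichGroupReduceFunction-based variant mentioned above works analogously):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sums per key; because the function also implements GroupCombineFunction,
// the runtime can pre-aggregate each group before shuffling.
public class CombinableSum implements
    GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
    GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

  @Override
  public void reduce(Iterable<Tuple2<String, Integer>> values,
                     Collector<Tuple2<String, Integer>> out) {
    combine(values, out);  // the final reduce does the same aggregation here
  }

  @Override
  public void combine(Iterable<Tuple2<String, Integer>> values,
                      Collector<Tuple2<String, Integer>> out) {
    String key = null;
    int sum = 0;
    for (Tuple2<String, Integer> v : values) {
      key = v.f0;
      sum += v.f1;
    }
    out.collect(new Tuple2<>(key, sum));  // partial sum, shuffled afterwards
  }
}

Used as, e.g., data.groupBy(0).reduceGroup(new CombinableSum()).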

Best regards,
Max


On Fri, May 22, 2015 at 10:03 AM, Maximilian Michels m...@apache.org wrote:

 Like you said, it depends on the use case. The GroupReduceFunction is a
 generalization of the traditional reduce. Thus, it is more powerful.
 However, it is also executed differently; a GroupReduceFunction requires
 the whole group to be materialized and passed at once. If your program
 doesn't require that, use the normal reduce function.

 On Thu, May 21, 2015 at 4:42 PM, santosh_rajaguru sani...@gmail.com
 wrote:

 I am new to Flink and MapReduce. My query is: apart from incrementally
 combining two elements, what are the merits of using ReduceFunction over
 GroupReduceFunction? Which use cases suit which function the most?






 --
 View this message in context:
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/difference-between-reducefunction-and-GroupReduceFunction-tp5768.html
 Sent from the Apache Flink Mailing List archive. mailing list archive at
 Nabble.com.





Re: Package multiple jobs in a single jar

2015-05-22 Thread Robert Metzger
Thank you for working on this.
My responses are inline below:

(Flavio)

 My suggestion is to create a specific Flink interface to also get a
 description of a job and to standardize parameter passing.


I've recently merged the ParameterTool, which solves the standardized
parameter passing problem (at least it establishes a best practice):
http://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application
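
For illustration, the basic usage pattern (parameter names made up):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolExample {
  public static void main(String[] args) throws Exception {
    // e.g. invoked with: --input hdfs:///in --parallelism 4
    ParameterTool params = ParameterTool.fromArgs(args);

    String input = params.getRequired("input");         // fails fast if missing
    int parallelism = params.getInt("parallelism", 1);  // with a default value

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    // make the parameters available to all rich functions and the web UI
    env.getConfig().setGlobalJobParameters(params);

    env.readTextFile(input).print();
  }
}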

Regarding the description: Maybe we can use the ProgramDescription
interface for getting a string describing the program in the web frontend.

(Matthias)

 I don't want to start working on it, before it's clear that it has a
 chance to be
 included in Flink.


I think the changes discussed here won't change the current behavior, but
they add new functionality which
can make the life of our users easier, so I'll vote to include your changes
(given they meet our quality standards).


If multiple classes implement the Program interface, an exception should be
 thrown (I think that would make sense). However, I am not sure what
 good behavior is if a single Program-class is found and an
 additional main-method class.
   - should the Program-class be executed (i.e., override the main-method class)
   - or is it better to throw an exception?


I would give a class implementing Program priority over a random main()
method in a random class.
Maybe print a WARN log message informing the user that the Program
class has been chosen.


If no Program-class is found, but a single main-method class, Flink
 could execute using the main method. But I am not sure either if this is
 good behavior. If multiple main-method classes are present, throwing
 an exception is the only way to go, I guess.


I think the best-effort approach (execute if exactly one class with a main()
method is found) is good. In case of multiple main methods, a helpful
exception is the best approach in my opinion.


 If the manifest contains a program-class or Main-Class entry,
 should we check the jar file right away to see whether the specified class
 is there? Right now, no check is performed, and an error occurs when the
 user tries to execute the job.


I'd say the current approach is sufficient. There is no need to have a
special code path that does the check.
I think the error message will be pretty similar in both cases, and I fear
that this additional code could also introduce new bugs ;)




On Fri, May 22, 2015 at 9:06 PM, Matthias J. Sax 
mj...@informatik.hu-berlin.de wrote:

 Hi,

 two more thoughts on this discussion:

  1) looking at the commit history of CliFrontend, I found the
 following closed issue and the closing pull request
 * https://issues.apache.org/jira/browse/FLINK-1095
 * https://github.com/apache/flink/pull/238
 It stands in opposition to Flavio's request to have a job description. Any
 comment on this? Should a removed feature be re-introduced? If not, I
 would suggest removing the ProgramDescription interface completely.

  2) If the manifest contains a program-class or Main-Class entry,
 should we check the jar file right away to see whether the specified class
 is there? Right now, no check is performed, and an error occurs when the
 user tries to execute the job.


 -Matthias


 On 05/22/2015 12:06 PM, Matthias J. Sax wrote:
  Thanks for your feedback.
 
  I agree on the main method problem. For scanning and listing everything
  that is found, it's fine.
 
  The tricky question is the automatic invocation mechanism if the -c flag
  is not used and no manifest program-class or Main-Class entry is found.
 
  If multiple classes implement the Program interface, an exception should be
  thrown (I think that would make sense). However, I am not sure what
  good behavior is if a single Program-class is found and an
  additional main-method class.
    - should the Program-class be executed (i.e., override the main-method
  class)
    - or is it better to throw an exception?

  If no Program-class is found, but a single main-method class, Flink
  could execute using the main method. But I am not sure either if this is
  good behavior. If multiple main-method classes are present, throwing
  an exception is the only way to go, I guess.
 
  To sum up: Should Flink consider main-method classes for automatic
  invocation, or should main-method classes be required to be listed in the
  program-class or Main-Class manifest parameter (to enable them for
  automatic invocation)?
 
 
  -Matthias
 
 
 
 
  On 05/22/2015 09:56 AM, Maximilian Michels wrote:
  Hi Matthias,
 
  Thank you for taking the time to analyze Flink's invocation behavior. I
  like your proposal. I'm not sure whether it is a good idea to scan the
  entire JAR for main methods. Sometimes, main methods are added solely
 for
  testing purposes and don't really serve any practical use. However, if
  you're already going through the JAR to find the ProgramDescription
  interface, then you might look for main methods as well. As long as it
 is
  just a listing 

Re: Package multiple jobs in a single jar

2015-05-22 Thread Matthias J. Sax
Makes sense to me. :)

One more thing: What about extending the ProgramDescription interface
to have multiple methods, as Flavio suggested (with the config(...)
method that should be handled by the ParameterTool)?

 public interface FlinkJob {

   /** The name to display in the job submission UI or shell */
   // e.g. "My Flink HelloWorld"
   String getDisplayName();

   // e.g. "This program does this and that etc."
   String getDescription();

   // e.g. (0, Integer, "An integer representing my first param"),
   //      (1, String, "A string representing my second param")
   List<Tuple3<Integer, TypeInfo, String>> getParamDescription();

   /** Set up the Flink job in the passed ExecutionEnvironment */
   ExecutionEnvironment config(ExecutionEnvironment env);
 }

Right now, the interface is used only a couple of times in Flink's code
base, so it would not be a problem to update those classes. However, it
could break external code that uses the interface already (even if I
doubt that the interface is well known and used often [or at all]).

I personally don't think that getDisplayName() is too helpful.
Splitting the program description and the parameter description seems to
be useful. For example, if wrong parameters are provided, the parameter
description can be included in the error message. If program+parameter
description is given in a single string, this is not possible. But this
is only a minor issue of course.

Maybe we should also add the interface to the current Flink examples,
to make people more aware of it. Is there any documentation on the web site?


-Matthias



On 05/22/2015 09:43 PM, Robert Metzger wrote:
 Thank you for working on this.
 My responses are inline below:
 
 (Flavio)
 
 My suggestion is to create a specific Flink interface to also get a
 description of a job and to standardize parameter passing.
 
 
 I've recently merged the ParameterTool, which solves the standardized
 parameter passing problem (at least it establishes a best practice):
 http://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application
 
 Regarding the description: Maybe we can use the ProgramDescription
 interface for getting a string describing the program in the web frontend.
 
 (Matthias)
 
 I don't want to start working on it, before it's clear that it has a
 chance to be
 included in Flink.
 
 
 I think the changes discussed here won't change the current behavior, but
 they add new functionality which
 can make the life of our users easier, so I'll vote to include your changes
 (given they meet our quality standards).
 
 
 If multiple classes implement the Program interface, an exception should be
 thrown (I think that would make sense). However, I am not sure what
 good behavior is if a single Program-class is found and an
 additional main-method class.
   - should the Program-class be executed (i.e., override the main-method class)
   - or is it better to throw an exception?
 
 
 I would give a class implementing Program priority over a random main()
 method in a random class.
 Maybe print a WARN log message informing the user that the Program
 class has been chosen.
 
 
 If no Program-class is found, but a single main-method class, Flink
 could execute using the main method. But I am not sure either if this is
 good behavior. If multiple main-method classes are present, throwing
 an exception is the only way to go, I guess.
 
 
 I think the best-effort approach (execute if exactly one class with a
 main() method is found) is good. In case of multiple main methods, a
 helpful exception is the best approach in my opinion.
 
 
  If the manifest contains a program-class or Main-Class entry,
 should we check the jar file right away to see whether the specified class
 is there? Right now, no check is performed, and an error occurs when the
 user tries to execute the job.
 
 
 I'd say the current approach is sufficient. There is no need to have a
 special code path that does the check.
 I think the error message will be pretty similar in both cases, and I fear
 that this additional code could also introduce new bugs ;)
 
 
 
 
 On Fri, May 22, 2015 at 9:06 PM, Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:
 
 Hi,

 two more thoughts on this discussion:

  1) looking at the commit history of CliFrontend, I found the
 following closed issue and the closing pull request
 * https://issues.apache.org/jira/browse/FLINK-1095
 * https://github.com/apache/flink/pull/238
 It stands in opposition to Flavio's request to have a job description. Any
 comment on this? Should a removed feature be re-introduced? If not, I
 would suggest removing the ProgramDescription interface completely.

  2) If the manifest contains a program-class or Main-Class entry,
 should we check the jar file right away to see whether the specified class
 is there? Right now, no check is performed, and an error occurs when the
 user tries to execute the job.


 -Matthias


 On 05/22/2015 12:06 PM, Matthias J. Sax wrote:
 Thanks for your feedback.

 I agree on the main method 

Re: Package multiple jobs in a single jar

2015-05-22 Thread Matthias J. Sax
Thanks for your feedback.

I agree on the main method problem. For scanning and listing everything
that is found, it's fine.

The tricky question is the automatic invocation mechanism if the -c flag
is not used and no manifest program-class or Main-Class entry is found.

If multiple classes implement the Program interface, an exception should be
thrown (I think that would make sense). However, I am not sure what
good behavior is if a single Program-class is found and an
additional main-method class.
  - should the Program-class be executed (i.e., override the main-method class)
  - or is it better to throw an exception?

If no Program-class is found, but a single main-method class, Flink
could execute using the main method. But I am not sure either if this is
good behavior. If multiple main-method classes are present, throwing
an exception is the only way to go, I guess.

To sum up: Should Flink consider main-method classes for automatic
invocation, or should main-method classes be required to be listed in the
program-class or Main-Class manifest parameter (to enable them for
automatic invocation)?


-Matthias




On 05/22/2015 09:56 AM, Maximilian Michels wrote:
 Hi Matthias,
 
 Thank you for taking the time to analyze Flink's invocation behavior. I
 like your proposal. I'm not sure whether it is a good idea to scan the
 entire JAR for main methods. Sometimes, main methods are added solely for
 testing purposes and don't really serve any practical use. However, if
 you're already going through the JAR to find the ProgramDescription
 interface, then you might look for main methods as well. As long as it is
 just a listing without execution, that should be fine.
 
 Best regards,
 Max
 
 On Thu, May 21, 2015 at 3:43 PM, Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:
 
 Hi,

 I had a look at Flink's current workflow with regard to the
 processing steps for a jar file.

 If I got it right it works as follows (not sure if this is documented
 somewhere):

 1) check if the -c flag is used to set the program entry point
    if yes, goto 4
 2) try to extract the "program-class" property from the manifest
    (if found, goto 4)
 3) try to extract the "Main-Class" property from the manifest
    - if not found, throw an exception (this happens also if no manifest
 file is found at all)

 4) check if the entry point class implements the "Program" interface
    if yes, goto 6
 5) check if the entry point class provides a "public static void
 main(String[] args)" method
    - if not, throw an exception

 6) execute the program (i.e., show plan/info or really run it)


 I also discovered the interface ProgramDescription with a single
 method, String getDescription(). Even though some examples implement this
 interface (and use it in the example itself), Flink basically ignores
 it... From the CLI there is no way to get this info, and the WebUI does
 actually fetch it if present; however, it doesn't show it anywhere...


 I think it would be nice if we extended the following functionality:

  - extend the possibility to specify multiple entry classes in
 "program-class" or "Main-Class" - in this case, the user needs to use the
 -c flag to pick the program to run every time

  - add a CLI option that allows the user to see what entry point classes
 are available
    for this, consider
      a) "program-class" entry
      b) "Main-Class" entry
      c) if neither is found, scan the jar file for classes implementing
 the "Program" interface
      d) if still not found, scan the jar file for classes with a main method

  - if the user looks for entry point classes via CLI, check for the
 "ProgramDescription" interface and show the info

  - extend WebUI to show all available entry-classes (pull request
 already there, for multiple entries in program-class)

  - extend WebUI to show ProgramDescription info


 What do you think? I am not too sure about the auto-scan of the jar
 file if no manifest entry is provided. We might get some fat jars, and
 scanning might take some time.


 -Matthias




 On 05/19/2015 10:44 AM, Stephan Ewen wrote:
 We actually had an interface like that before (Program). It is still
 supported, but in all new programs we simply use the Java main method.
 The
 advantage is that
 most IDEs can create executable JARs automatically, setting the JAR
 manifest attributes, etc.

 The Program interface still works, though. Most tool classes (like
 PackagedProgram) have a way to figure out whether the code uses
 main()
 or implements Program
 and calls the right method.

 You can try and extend the program interface. If you want to consistently
 support multiple programs in one JAR file, you may need to adjust the
 util
 classes as
 well to deal with that.



 On Tue, May 19, 2015 at 10:10 AM, Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:

 Supporting an interface like this seems to be a nice idea. Any other
 opinions on it?

 It seems to be some more work to get it done right. I don't want to
 start working on it, before it's clear that it has a chance to be
 included in Flink.

 @Flavio: I moved the 

Re: difference between reducefunction and GroupReduceFunction

2015-05-22 Thread Stephan Ewen
Performance-wise, a GroupReduceFunction with a combiner should right now be
slightly faster than the ReduceFunction, but not by much.

Long term, the ReduceFunction may become faster, because it will use hash
aggregation under the hood.


On Fri, May 22, 2015 at 11:58 AM, santosh_rajaguru sani...@gmail.com
wrote:

 Thanks Maximilian.

 My use case is similar to the example given in the graph analysis.
 In the graph analysis example, the reduce function used is a normal reduce
 function. I executed it in both scenarios and your justification is right:
 the normal reduce function has a combiner before sorting, unlike the
 GroupReduce function.
 My question: how does it affect the performance, given that the result is
 the same in both situations?


 Thanks and Regards,
 Santosh





 --
 View this message in context:
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/difference-between-reducefunction-and-GroupReduceFunction-tp5768p5785.html
 Sent from the Apache Flink Mailing List archive. mailing list archive at
 Nabble.com.



Re: [DISCUSS] Dedicated streaming mode

2015-05-22 Thread Aljoscha Krettek
Hi,
streaming currently does not use any memory manager. All state is kept
in Java objects on the Java heap, for example an ArrayList for the
window buffer.

On Thu, May 21, 2015 at 11:56 PM, Henry Saputra henry.sapu...@gmail.com wrote:
 Hi Stephan, Gyula, Paris,

 How does streaming currently differ in terms of memory management?
 Currently we only have one MemoryManager, which is used by both modes, I
 believe.

 - Henry

 On Thu, May 21, 2015 at 12:34 PM, Stephan Ewen se...@apache.org wrote:
 I discussed a bit via Skype with Gyula and Paris.


 We thought about the following way to do it:

  - We add a dedicated streaming mode for now. The streaming mode supersedes
 the batch mode, so it can run both types of programs.

  - The streaming mode sets the memory manager to lazy allocation.
 - So long as it runs pure streaming jobs, the full heap will be
 available to window buffers and UDFs.
 - Batch programs can still run, so mixed workloads are not prevented.
 Batch programs are a bit less robust there, because the memory manager does
 not pre-allocate memory. UDFs can eat into Flink's memory portion.

  - The streaming mode starts the necessary configured components/services
 for state backups



 Over the next versions, we want to bring these things together:
   - use the managed memory for window buffers
   - on-demand starting of the state backend

 Then, we deprecate the streaming mode, let both modes start the cluster in
 the same way.





 On Thu, May 21, 2015 at 4:01 PM, Aljoscha Krettek aljos...@apache.org
 wrote:

 Would it not be possible to start the snapshot service once the user
 starts the first streaming job? About 2): with checkpointing coming up,
 would it not make sense to shift to managed memory sooner rather than
 later? Then this point would become moot.

 On Thu, May 21, 2015 at 3:47 PM, Matthias J. Sax
 mj...@informatik.hu-berlin.de wrote:
   What would be the consequences for mixed programs? (If there is any
   plan to support those?)

   Would it be necessary to have a third mode? Or would those programs
   simply run in streaming mode?
 
  -Matthias
 
  On 05/21/2015 03:12 PM, Stephan Ewen wrote:
  Hi all!
 
  We discussed a while back about introducing a dedicated streaming mode
 for
  Flink. I would like to take a go at this and implement the changes, but
  discuss them before.
 
 
  Here is a brief summary why we wanted to introduce the dedicated
 streaming
  mode:
  Even though both batch and streaming are executed by the same execution
  engine,
  a streaming setup of Flink varies a bit from a batch setup:
 
  1) The streaming cluster starts an additional service to store the
  distributed state snapshots.
 
   2) Streaming mode uses memory a bit differently, so we should configure
   the memory manager differently. This difference may eventually go away.
 
 
 
  Concretely, to implement this, I was thinking about introducing the
  following externally visible changes
 
   - Additional scripts start-streaming-cluster.sh and
  start-streaming-local.sh
 
   - An execution mode parameter for the TaskManager (batch / streaming)
 
   - An execution mode parameter for the JobManager TaskManager (batch /
  streaming)
 
   - All local executors and mini clusters need a flag that specifies
 whether
  they will start
 a streaming cluster, or a pure batch cluster.
 
 
  Anything else that comes to your minds?
 
 
  Greetings,
  Stephan
 
 



[jira] [Created] (FLINK-2081) Change order of restore state and open for Streaming Operators

2015-05-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2081:
---

 Summary: Change order of restore state and open for Streaming 
Operators
 Key: FLINK-2081
 URL: https://issues.apache.org/jira/browse/FLINK-2081
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.9
Reporter: Aljoscha Krettek


Right now, the order is restoreState -> open. Users often set internal state
in the open method, which would overwrite state that was restored from a
checkpoint. If we change the order to open -> restoreState, this should not
be a problem anymore.
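
For illustration, a hypothetical operator (not Flink's real operator API)
showing how the current order loses the restored value:

public class CountingOperator {
  private long count;

  // called by the runtime when recovering from a checkpoint
  public void restoreState(Long restoredCount) {
    this.count = restoredCount;
  }

  // called by the runtime before processing starts; users often
  // initialize their state here
  public void open() {
    this.count = 0;  // with the order restoreState -> open, this line
                     // silently overwrites the restored count with 0
  }

  public void processElement(Object element) {
    count++;  // state that the checkpoint was supposed to preserve
  }
}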



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Dedicated streaming mode

2015-05-22 Thread Stephan Ewen
Aljoscha is right. There are plans to migrate the streaming state to the
MemoryManager as well, but streaming state is not managed at this point.

What is managed in streaming jobs is the data buffered and cached in the
network stack. But that is a different memory pool than the memory manager.
We keep those pools separate because the network stack is currently more
advanced in terms of dynamically rebalancing memory, compared to the memory
manager.

On Fri, May 22, 2015 at 12:25 PM, Aljoscha Krettek aljos...@apache.org
wrote:

 Hi,
 streaming currently does not use any memory manager. All state is kept
 in Java objects on the Java heap, for example an ArrayList for the
 window buffer.

 On Thu, May 21, 2015 at 11:56 PM, Henry Saputra henry.sapu...@gmail.com
 wrote:
  Hi Stephan, Gyula, Paris,
 
  How does streaming currently differ in terms of memory management?
  Currently we only have one MemoryManager, which is used by both modes, I
  believe.
 
  - Henry
 
  On Thu, May 21, 2015 at 12:34 PM, Stephan Ewen se...@apache.org wrote:
  I discussed a bit via Skype with Gyula and Paris.
 
 
  We thought about the following way to do it:
 
   - We add a dedicated streaming mode for now. The streaming mode supersedes
   the batch mode, so it can run both types of programs.
 
   - The streaming mode sets the memory manager to lazy allocation.
  - So long as it runs pure streaming jobs, the full heap will be
  available to window buffers and UDFs.
  - Batch programs can still run, so mixed workloads are not
 prevented.
  Batch programs are a bit less robust there, because the memory manager
 does
  not pre-allocate memory. UDFs can eat into Flink's memory portion.
 
   - The streaming mode starts the necessary configured
 components/services
  for state backups
 
 
 
  Over the next versions, we want to bring these things together:
- use the managed memory for window buffers
- on-demand starting of the state backend
 
  Then, we deprecate the streaming mode, let both modes start the cluster
 in
  the same way.
 
 
 
 
 
  On Thu, May 21, 2015 at 4:01 PM, Aljoscha Krettek aljos...@apache.org
  wrote:
 
  Would it not be possible to start the snapshot service once the user
  starts the first streaming job? About 2): with checkpointing coming up,
  would it not make sense to shift to managed memory sooner rather than
  later? Then this point would become moot.
 
  On Thu, May 21, 2015 at 3:47 PM, Matthias J. Sax
  mj...@informatik.hu-berlin.de wrote:
    What would be the consequences for mixed programs? (If there is any
    plan to support those?)

    Would it be necessary to have a third mode? Or would those programs
    simply run in streaming mode?
  
   -Matthias
  
   On 05/21/2015 03:12 PM, Stephan Ewen wrote:
   Hi all!
  
   We discussed a while back about introducing a dedicated streaming
 mode
  for
   Flink. I would like to take a go at this and implement the changes,
 but
   discuss them before.
  
  
   Here is a brief summary why we wanted to introduce the dedicated
  streaming
   mode:
   Even though both batch and streaming are executed by the same
 execution
   engine,
   a streaming setup of Flink varies a bit from a batch setup:
  
   1) The streaming cluster starts an additional service to store the
   distributed state snapshots.
  
    2) Streaming mode uses memory a bit differently, so we should configure
    the memory manager differently. This difference may eventually go away.
  
  
  
   Concretely, to implement this, I was thinking about introducing the
   following externally visible changes
  
- Additional scripts start-streaming-cluster.sh and
   start-streaming-local.sh
  
- An execution mode parameter for the TaskManager (batch /
 streaming)
  
- An execution mode parameter for the JobManager TaskManager
 (batch /
   streaming)
  
- All local executors and mini clusters need a flag that specifies
  whether
   they will start
  a streaming cluster, or a pure batch cluster.
  
  
   Anything else that comes to your minds?
  
  
   Greetings,
   Stephan
  
  
 



Re: difference between reducefunction and GroupReduceFunction

2015-05-22 Thread santosh_rajaguru
Thanks Maximilian.

My use case is similar to the example given in the graph analysis.
In the graph analysis example, the reduce function used is a normal reduce
function. I executed it in both scenarios and your justification is right:
the normal reduce function has a combiner before sorting, unlike the
GroupReduce function.
My question: how does it affect the performance, given that the result is
the same in both situations?


Thanks and Regards,
Santosh





--
View this message in context: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/difference-between-reducefunction-and-GroupReduceFunction-tp5768p5785.html
Sent from the Apache Flink Mailing List archive. mailing list archive at 
Nabble.com.


Re: Package multiple jobs in a single jar

2015-05-22 Thread Matthias J. Sax
Hi,

two more thoughts on this discussion:

 1) looking at the commit history of CliFrontend, I found the
following closed issue and the closing pull request
* https://issues.apache.org/jira/browse/FLINK-1095
* https://github.com/apache/flink/pull/238
It stands in opposition to Flavio's request to have a job description. Any
comment on this? Should a removed feature be re-introduced? If not, I
would suggest removing the ProgramDescription interface completely.

 2) If the manifest contains a program-class or Main-Class entry,
should we check the jar file right away to see whether the specified class
is there? Right now, no check is performed, and an error occurs when the
user tries to execute the job.


-Matthias


On 05/22/2015 12:06 PM, Matthias J. Sax wrote:
 Thanks for your feedback.
 
 I agree on the main method problem. For scanning and listing everything
 that is found, it's fine.

 The tricky question is the automatic invocation mechanism if the -c flag
 is not used and no manifest program-class or Main-Class entry is found.
 
 If multiple classes implement the Program interface, an exception should be
 thrown (I think that would make sense). However, I am not sure what
 good behavior is if a single Program-class is found and an
 additional main-method class.
   - should the Program-class be executed (i.e., override the main-method class)
   - or is it better to throw an exception?

 If no Program-class is found, but a single main-method class, Flink
 could execute using the main method. But I am not sure either if this is
 good behavior. If multiple main-method classes are present, throwing
 an exception is the only way to go, I guess.
 
 To sum up: Should Flink consider main-method classes for automatic
 invocation, or should main-method classes be required to be listed in the
 program-class or Main-Class manifest parameter (to enable them for
 automatic invocation)?
 
 
 -Matthias
 
 
 
 
 On 05/22/2015 09:56 AM, Maximilian Michels wrote:
 Hi Matthias,

 Thank you for taking the time to analyze Flink's invocation behavior. I
 like your proposal. I'm not sure whether it is a good idea to scan the
 entire JAR for main methods. Sometimes, main methods are added solely for
 testing purposes and don't really serve any practical use. However, if
 you're already going through the JAR to find the ProgramDescription
 interface, then you might look for main methods as well. As long as it is
 just a listing without execution, that should be fine.

 Best regards,
 Max

 On Thu, May 21, 2015 at 3:43 PM, Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:

 Hi,

 I had a look at Flink's current workflow with regard to the
 processing steps for a jar file.

 If I got it right it works as follows (not sure if this is documented
 somewhere):

 1) check if the -c flag is used to set the program entry point
    if yes, goto 4
 2) try to extract the "program-class" property from the manifest
    (if found, goto 4)
 3) try to extract the "Main-Class" property from the manifest
    - if not found, throw an exception (this happens also if no manifest
 file is found at all)

 4) check if the entry point class implements the "Program" interface
    if yes, goto 6
 5) check if the entry point class provides a "public static void
 main(String[] args)" method
    - if not, throw an exception

 6) execute the program (i.e., show plan/info or really run it)


 I also discovered the interface ProgramDescription with a single
 method, String getDescription(). Even though some examples implement this
 interface (and use it in the example itself), Flink basically ignores
 it... From the CLI there is no way to get this info, and the WebUI does
 actually fetch it if present; however, it doesn't show it anywhere...


 I think it would be nice if we extended the following functionality:

  - extend the possibility to specify multiple entry classes in
 "program-class" or "Main-Class" - in this case, the user needs to use the
 -c flag to pick the program to run every time

  - add a CLI option that allows the user to see what entry point classes
 are available
    for this, consider
      a) "program-class" entry
      b) "Main-Class" entry
      c) if neither is found, scan the jar file for classes implementing
 the "Program" interface
      d) if still not found, scan the jar file for classes with a main method

  - if the user looks for entry point classes via CLI, check for the
 "ProgramDescription" interface and show the info

  - extend WebUI to show all available entry-classes (pull request
 already there, for multiple entries in program-class)

  - extend WebUI to show ProgramDescription info


 What do you think? I am not too sure about the auto-scan of the jar
 file if no manifest entry is provided. We might get some fat jars, and
 scanning might take some time.


 -Matthias




 On 05/19/2015 10:44 AM, Stephan Ewen wrote:
 We actually had an interface like that before (Program). It is still
 supported, but in all new programs we simply use the Java main method.
 The
 advantage is that
 most IDEs can create executable JARs automatically,