Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-15 Thread Martin Neumann
I think this is easier done in a straw poll than in an email conversation.
I created one at: http://www.strawpoll.me/12535073
(Note that you can select multiple choices.)


Though I prefer Java 8, most of the time I have to work with Java 7. A lot of the
infrastructure I work on still runs Java 7; one of the companies I built a
prototype for a while back only updated to Java 7 two years ago. I doubt we can
ditch Java 7 support any time soon if we want to make it easy for companies to
use Flink.

cheers Martin

//PS sorry if this gets sent twice; we just migrated to a new mail system and a
lot of things are broken


From: Stephan Ewen 
Sent: Wednesday, March 15, 2017 12:30:24 PM
To: u...@flink.apache.org; dev@flink.apache.org
Subject: [POLL] Who still uses Java 7 with Flink ?

Hi all!

I would like to get a feeling for how much Java 7 is still being used among Flink
users.

At some point, it would be great to drop Java 7 support and make use of Java
8's new features, but first we would need to get a feeling for how much Java 7 is
still used.

I would be happy if users on Java 7 responded here, or even users that have some
insight into how widespread they think Java 7 still is.

Thanks,
Stephan






Re: Some thoughts about the lower-level Flink APIs

2016-08-17 Thread Martin Neumann
I agree with Vasia that for data scientists it's likely easier to learn the
high-level API. I like the material from
http://dataartisans.github.io/flink-training/ but all of it focuses on the
high-level API.

Maybe we could have a guide (blog post, lecture, whatever) on how to get
into Flink as a Storm guy. That would allow people with that background to
dive directly into the lower-level API, working with models similar to what
they are used to. I would volunteer, but I'm not familiar with Storm.

I, for my part, always had to use some lower-level API in all of my
applications, most of the time pestering Aljoscha about the details. So
either I'm the exception or there is a need for more complex examples
showcasing the lower-level API methods.
One of the things I have been using in several pipelines so far is
extracting the start and end timestamps from a window and adding them to the
window aggregate. Maybe something like this could be a useful example to
include in the training.
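
Something along these lines is what I have in mind (a rough sketch against
the 1.x WindowFunction API; the tuple types and names are just illustrative):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// emits (key, sum, windowStart, windowEnd) for each evaluated window
public class SumWithBounds implements
        WindowFunction<Tuple2<String, Long>, Tuple4<String, Long, Long, Long>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window,
                      Iterable<Tuple2<String, Long>> input,
                      Collector<Tuple4<String, Long, Long, Long>> out) {
        long sum = 0;
        String k = null;
        for (Tuple2<String, Long> t : input) {
            k = t.f0;
            sum += t.f1;
        }
        // the window itself carries its start and end timestamps
        out.collect(Tuple4.of(k, sum, window.getStart(), window.getEnd()));
    }
}

// usage: stream.keyBy(0).timeWindow(Time.minutes(1)).apply(new SumWithBounds())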

Side question:
I assume there are recurring design patterns in the stream applications users
develop. Is there any chance we will be able to identify or create some
design patterns (similar to Java design patterns)? That would make it easier
to use the lower-level API and might help people avoid pitfalls like the
one Aljoscha mentioned.

cheers Martin
PS: I hope it's fine for me to butt into the discussion like this.

On Tue, Aug 16, 2016 at 4:34 PM, Wright, Eron  wrote:

> Jamie,
> I think you raise a valid concern but I would hesitate to accept the
> suggestion that the low-level API be promoted to app developers.
>
> Higher-level abstractions tend to be more constrained and more optimized,
> whereas lower-level abstractions tend to be more powerful, be more
> laborious to use and provide the system with less knowledge.   It is a
> classic tradeoff.
>
> I think it is important to consider what the important/distinguishing
> characteristics of the Flink framework are: exactly-once guarantees,
> event-time support, support for job upgrade without data loss, fault
> tolerance, etc. I'm speculating that the high-level abstraction provided
> to app developers is probably needed to retain those characteristics.
>
> I think Vasia makes a good point that SQL might be a good alternative way
> to ease into Flink.
>
> Finally, I believe the low-level API is primarily intended for extension
> purposes (connectors, operations, etc.), not app development. It could use
> better documentation to ensure that third-party extensions support those
> key characteristics.
>
> -Eron
>
> > On Aug 16, 2016, at 3:12 AM, Vasiliki Kalavri 
> wrote:
> >
> > Hi Jamie,
> >
> > thanks for sharing your thoughts on this! You're raising some interesting
> > points.
> >
> > Whether users find the lower-level primitives more intuitive depends on
> > their background I believe. From what I've seen, if users are coming from
> > the S4/Storm world and are used to the "compositional" way of streaming,
> > then indeed it's easier for them to think and operate on that level.
> These
> > are usually people who have seen/built streaming things before trying out
> > Flink.
> > But if we're talking about analysts and people coming from the "batch"
> way
> > of thinking or people used to working with SQL/python, then the
> > higher-level declarative API is probably easier to understand.
> >
> > I do think that we should make the lower-level API more visible and
> > document it properly, but I'm not sure if we should teach Flink on this
> > level first. I think that presenting it as a set of "advanced" features
> > makes more sense actually.
> >
> > Cheers,
> > -Vasia.
> >
> > On 16 August 2016 at 04:24, Jamie Grier  wrote:
> >
> >> You lost me at lattice, Aljoscha ;)
> >>
> >> I do think something like the more powerful N-way FlatMap w/ Timers
> >> Aljoscha is describing here would probably solve most of the problem.
> >> Often Flink's higher level primitives work well for people and that's
> >> great.  It's just that I also spend a fair amount of time discussing
> with
> >> people how to map what they know they want to do onto operations that
> >> aren't a perfect fit and it sometimes liberates them when they realize
> they
> >> can just implement it the way they want by dropping down a level.  They
> >> usually don't go there themselves, though.
> >>
> >> I mention teaching this "first" and then the higher layers I guess
> because
> >> that's just a matter of teaching philosophy.  I think it's good to
> see
> >> the basic operations that are available first and then understand that
> the
> >> other abstractions are built on top of that.  That way you're not
> afraid to
> >> drop down to basics when you know what you want to get done.
> >>
> >> -Jamie
> >>
> >>
> >> On Mon, Aug 15, 2016 at 2:11 AM, Aljoscha Krettek 
> >> wrote:
> >>
> >>> Hi All,
> >>> I also thought about this recently. 

Re: Opening a discussion on FlinkML

2016-02-14 Thread Martin Neumann
I think the focus of this discussion should be how we proceed not what to
do. The what comes from the committers anyway.

There are several people who would like to contribute, including people from the
Streamline project. Having pull requests that are older than 6 months is not
good for any project.
The main question is how we can develop the library further with high
standards but without creating a bottleneck that holds things back too much.

In my opinion it would be best if we find enough resources to keep things
inside Flink. However, if we have to depend on people who are
already stretched for time, splitting it out might be the better option
(path 1 from Theo's original mail).

cheers Martin




On Fri, Feb 12, 2016 at 3:54 PM, Suneel Marthi  wrote:

> On Fri, Feb 12, 2016 at 9:40 AM, Simone Robutti <
> simone.robu...@radicalbit.io> wrote:
>
> > @Suneel
> >
> > 1) Totally agree, as I wrote before.
> >
> > 2)I agree that support for PMML is premature but we shouldn't
> underestimate
> > the variety and complexity of the uses of ML models in the industry. The
> > adoption of Flink, hopefully, will grow and reach less innovative
> realities
> > where Random Forests and SVMs are still the main algorithms in use. In
> > these same realities there are legacies that justify the use of PMML to
> > port models. That said, FlinkML is still in an early stage, so as you said, it
> > doesn't make sense to spend time right now on such a feature.
> >
>
> +1, as I mentioned earlier the PMML spec only supports classification and
> clustering (I last checked this in Aug 2015, pretty sure it would not have
> changed since then); hence 'Yes', it has some limited uses; 'No', it's too
> premature to even talk about it given the present state of FlinkML.
>
> >
> > 3)This would be really interesting. How do you imagine that the
> integration
> > with a distributed processing engine would work?
> >
>
> I am not sure yet, we are still exploring this on the Mahout project to add to
> Mahout-Samsara - most of the statistics and probabilistic modeling would
> then be supported by Figaro (Bayesian, MCMC etc) and hence can be external
> to FlinkML.
>
> Figaro is Scala based. See https://github.com/p2t2/figaro
>
> I believe there are a few other similar DSLs out there; I need to dig up my old
> emails.
>
> (Not sure if it's an ASLv2 license, needs verification here)
>
>
> >
> > 5) Agree on this one too. To my knowledge it would be the best option
> > together with SAMOA (for the streaming part).
> >
>
> There's already Flink - Samoa integration in place IIRC.
>
>
> >
> > 2016-02-12 15:25 GMT+01:00 Suneel Marthi :
> >
> > > My 2 cents as someone who's done ML over the years - having worked on
> > Oryx
> > > 2.0 and Mahout and having used Spark MlLib (read as "had no choice due
> to
> > > strict workplace enforcement") and understands well their limitations.
> > >
> > > 1. FlinkML in its present form seems like "do it like how Spark did
> it".
> > >
> > > 2. The recent discussion about PMML support in Flink to my mind is a
> > clear
> > > example of putting the cart before the horse.  Why are we even talking
> > PMML
> > > when there ain't much ML algos in FlinkML?
> > >
> > > For a real good implementation of PMML and how its being used (with
> > jPMML),
> > > suggest look at the Oryx 2.0 project. The PMML implementation in Oryx
> 2.0
> > > predates Spark and is a clean example of separating PMML from the
> > > underlying framework (Spark or Flink).
> > >
> > > We have had PMML discussions on the Mahout project in the past, but the
> > > idea never gained any traction in large part due to PMML spec
> limitations
> > > (mostly for clustering and classification algorithms) and the lack of
> > > adoption within the community.
> > >
> > > See the discussion here and specifically Ted Dunning's comment on PMML
> -
> > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/mahout-dev/201503.mbox/%3CCAJwFCa1%3DAw%2B3G54FgkYdTH%3DoNQBRqfeU-SS19iCFKMWbAfWzOQ%40mail.gmail.com%3E
> > >
> > > Most of the ML in practice (deployed in production) today is
> > Recommenders
> > > and Deep Learning - both of which are not supported by the PMML spec.
> > >
> > > 3. Leveraging a probabilistic programming language like Figaro might
> be a
> > > good way to go (just my thought) - that way most of the ML groundwork
> > would
> > > be external to Flink.
> > >
> > > 4. Within the Mahout community, we had been talking (and are working)
> on
> > > redoing the Samsara Distributed linear algebra framework to support
> Flink
> > > (in large part we realized that Flink is a better platform than the
> more
> > > popular one out there that Slim wouldn't wanna talk about :) ).
> > >
> > >  We should be having a release out in the next few weeks (depending on
> > > committers' availability). It would be great if FlinkML had something
> > like
> > > it.
> > >
> > > There was a good audience to Sebastian's talk on this subject at #FF15
> in
> > > October.
> > >

User Feedback

2016-02-09 Thread Martin Neumann
During this year's FOSDEM, Martin Junghans and I sat together and gathered
some feedback for the Flink project. It is based on our personal experience
as well as the feedback and questions from people we taught the system to.
This is going to be a longer email, therefore I have split things into
categories:


*Website and Documentation:*

   1. *Outdated Google search results*: Google searches lead to outdated
   website versions (e.g. “flink transformations” or “flink iterations”
   return the 0.7 versions of the corresponding pages).
   2. *Invalid links on website: *Links are confusing / broken (e.g. the
   Gelly/ML links on the start page lead to the top of the feature page,
   which starts with streaming) *-> maybe this can be validated
   automatically?*


*Batch API:*

   1. *.reduceGroup(GroupReduceFunction) and
   .groupCombine(CombineGroupFunction): *In other functions such as
   .flatMap(FlatMapFunction) the function call matches the naming of the
   operator. This structure is quite convenient for new users since they can
   make use of the autocompletion features of the IDE: basically, start typing
   the function call and you get the correct class. This does not work for
   .reduceGroup() and .groupCombine() since the names are switched around. *->
   maybe the functions can be renamed*
   2. *.print() and env.execute(): *Often .print() is used for debugging
   and developing programs, replacing regular data sinks. Such a project will
   not run until the env.execute() is removed. It's very easy to forget to add
   it back in once you change the .print() back to a proper sink. The project
   will now compile fine but will not produce any output since .execute() is
   missing. This is a very difficult bug to find, especially since there is no
   warning or error when running the job. It's common that people use more
   than one .print() statement during debugging and development. This can lead
   to confusion since each .print() forces the program to execute, so the
   execution behavior is different than without the print. This is especially
   important if the program contains non-deterministic data generation (like
   generating IDs). In the streaming API, .print() would not require removing
   .execute(); as a result, the behavior of the two interfaces is
   inconsistent. (See the sketch after this list.)
   3. *calling new when applying an operator, e.g. .reduceGroup(new
   GroupReduceFunction()): *Some of the people I taught the APIs to were
   confused by this. They knew it was a distributed system and they were
   wondering where the constructor would actually be called. They expected to
   hand a class to the function that would be initialized on each of the
   worker nodes. *-> maybe have a section about this in the documentation*
   4. *.project() loses type information / does not support .returns(..): *The
   project transformation currently loses type information, which affects
   chained calls with other transformations. One workaround is the definition
   of an intermediate dataset. However, to be consistent with other operators,
   project should support .returns() to define type information if needed.
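
A minimal sketch of the .print()/env.execute() pitfall from point 2 above
(against the Java DataSet API; names and paths are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintPitfall {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");

        // debugging variant: print() triggers execution itself,
        // so there must be no env.execute() call afterwards
        data.print();

        // production variant: a real sink needs the explicit env.execute() --
        // forgetting to re-add it means the job silently produces no output
        // data.writeAsText("/tmp/out");
        // env.execute();
    }
}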


*Stream API:*

   1. *.keyBy(): *Currently .keyBy() creates a KeyedDataStream, but every
   operator that consumes a KeyedDataStream produces a DataStream. This means
   it is not possible to create a program that uses a keyBy() followed by a
   sequence of transformations for each key without having to reapply keyBy()
   after each of those operators. (This was a common problem in my work for
   Ericsson and Spotify; see the sketch after this list.)
   2. *split() operator with multiple output types: *It's common to have to
   split a single stream into different streams. For example, a stream
   containing different system events might need to be broken into a stream
   for each type. The current split() operator requires all outputs to have
   the same data type. In cases where there are no direct type hierarchies, the
   user needs to implement a wrapper type to make use of this function. An
   operator similar to split that allows output streams to have different
   types would greatly simplify those use cases.
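
A minimal sketch of the re-keying issue from point 1 above (against the Java
streaming API; types and values are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReKeying {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        events.keyBy(0)                     // keyed by the first tuple field
              .sum(1)                       // ...but the result is a plain DataStream again
              .keyBy(0)                     // so the key has to be reapplied
              .timeWindow(Time.seconds(10))
              .max(1)
              .print();

        env.execute("re-keying sketch");
    }
}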


cheers Martin


Re: maven dependency problem when building stream job

2016-01-20 Thread Martin Neumann
It's using the maven-enforcer-plugin (see config below). The project itself
does not use netty.
I just discovered that it still compiles fine from within IntelliJ; I just
cannot build it externally with Maven (which I need to package the jar to
deploy on YARN).

cheers Martin


<plugin>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>1.4.1</version>
  <executions>
    <execution>
      <id>enforce</id>
      <configuration>
        <rules>
          <requireUpperBoundDeps/>
        </rules>
        <fail>true</fail>
      </configuration>
      <goals>
        <goal>enforce</goal>
      </goals>
    </execution>
  </executions>
</plugin>


On Wed, Jan 20, 2016 at 2:04 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Martin,
>
> is the logproc-flink project using the maven-enforcer-plugin to enforce a
> minimum version of netty?
> We recently downgraded (a minor version) of netty because of an issue.
> Maybe that's the issue.
>
> Can you check the enforcer rules of your project?
>
> On Wed, Jan 20, 2016 at 1:48 PM, Martin Neumann <mneum...@sics.se> wrote:
>
> > Hi,
> >
> > I have a weird problem. Yesterday I had to clean my local maven cache
> for a
> > different application.
> > Since then, one of my Flink streaming jobs does not compile
> anymore. I
> > didn't change any code, just made Maven pull all dependencies again.
> >
> > I'm totally stumped by this, please help me!
> >
> > Here is the error I get from maven:
> >
> > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps
> > failed with message:
> > Failed while enforcing RequireUpperBoundDeps. The error(s) are [
> > Require upper bound dependencies error for io.netty:netty:3.7.0.Final
> paths
> > to dependency are:
> > +-com.spotify:logproc-flink:0.0.0-SNAPSHOT
> >   +-org.apache.flink:flink-java:0.10.1
> > +-org.apache.flink:flink-shaded-hadoop2:0.10.1
> >   +-org.apache.zookeeper:zookeeper:3.4.6
> > +-io.netty:netty:3.7.0.Final
> > and
> > +-com.spotify:logproc-flink:0.0.0-SNAPSHOT
> >   +-org.apache.flink:flink-streaming-java:0.10.1
> > +-org.apache.flink:flink-runtime:0.10.1
> >   +-com.typesafe.akka:akka-remote_2.10:2.3.7
> > +-io.netty:netty:3.8.0.Final
> > ]
> > [ERROR] Failed to execute goal
> > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce) on
> > project logproc-flink: Some Enforcer rules have failed. Look above for
> > specific messages explaining why the rule failed. -> [Help 1]
> >
>


maven dependency problem when building stream job

2016-01-20 Thread Martin Neumann
Hi,

I have a weird problem. Yesterday I had to clean my local Maven cache for a
different application.
Since then, one of my Flink streaming jobs does not compile anymore. I
didn't change any code, just made Maven pull all dependencies again.

I'm totally stumped by this, please help me!

Here is the error I get from maven:

[WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireUpperBoundDeps
failed with message:
Failed while enforcing RequireUpperBoundDeps. The error(s) are [
Require upper bound dependencies error for io.netty:netty:3.7.0.Final paths
to dependency are:
+-com.spotify:logproc-flink:0.0.0-SNAPSHOT
  +-org.apache.flink:flink-java:0.10.1
+-org.apache.flink:flink-shaded-hadoop2:0.10.1
  +-org.apache.zookeeper:zookeeper:3.4.6
+-io.netty:netty:3.7.0.Final
and
+-com.spotify:logproc-flink:0.0.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java:0.10.1
+-org.apache.flink:flink-runtime:0.10.1
  +-com.typesafe.akka:akka-remote_2.10:2.3.7
+-io.netty:netty:3.8.0.Final
]
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce) on
project logproc-flink: Some Enforcer rules have failed. Look above for
specific messages explaining why the rule failed. -> [Help 1]


Re: maven dependency problem when building stream job

2016-01-20 Thread Martin Neumann
It's a fork of a different project, so I didn't add it myself. I disabled it
for now and so far it works. I will try to find out why it was there in the
first place and, if necessary, find a workaround.
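
If the rule has to stay, excluding the lower-bound netty from the Flink
dependency that pulls it in should also work, roughly like this (a sketch;
which artifact carries the conflicting netty may differ):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>0.10.1</version>
  <exclusions>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>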

thanks for the help
Martin

On Wed, Jan 20, 2016 at 2:32 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Martin.
>
> can you try to exclude the netty dependency from your Flink dependencies?
> Another approach would be to disable the check, or add an exception to it
> ;)
>
> Why did you add the check in the first place?
>
>
> On Wed, Jan 20, 2016 at 2:13 PM, Martin Neumann <mneum...@sics.se> wrote:
>
> > It's using the maven-enforcer-plugin (see config below). The project itself
> > does not use netty.
> > I just discovered that it still compiles fine from within IntelliJ; I just
> > cannot build it externally with Maven (which I need to package the jar to
> > deploy on YARN).
> >
> > cheers Martin
> >
> > <plugin>
> >   <artifactId>maven-enforcer-plugin</artifactId>
> >   <version>1.4.1</version>
> >   <executions>
> >     <execution>
> >       <id>enforce</id>
> >       <configuration>
> >         <rules>
> >           <requireUpperBoundDeps/>
> >         </rules>
> >         <fail>true</fail>
> >       </configuration>
> >       <goals>
> >         <goal>enforce</goal>
> >       </goals>
> >     </execution>
> >   </executions>
> > </plugin>
> >
> >
> >
> >
> > On Wed, Jan 20, 2016 at 2:04 PM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> >
> > > Hi Martin,
> > >
> > > is the logproc-flink project using the maven-enforcer-plugin to
> enforce a
> > > minimum version of netty?
> > > We recently downgraded (a minor version) of netty because of an issue.
> > > Maybe that's the issue.
> > >
> > > Can you check the enforcer rules of your project?
> > >
> > > On Wed, Jan 20, 2016 at 1:48 PM, Martin Neumann <mneum...@sics.se>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a weird problem. Yesterday I had to clean my local maven cache
> > > for a
> > > > different application.
> > > > Since then, one of my Flink streaming jobs does not compile
> > > anymore. I
> > > > didn't change any code, just made Maven pull all dependencies again.
> > > >
> > > > I'm totally stumped by this, please help me!
> > > >
> > > > Here is the error I get from maven:
> > > >
> > > > [WARNING] Rule 0:
> > org.apache.maven.plugins.enforcer.RequireUpperBoundDeps
> > > > failed with message:
> > > > Failed while enforcing RequireUpperBoundDeps. The error(s) are [
> > > > Require upper bound dependencies error for io.netty:netty:3.7.0.Final
> > > paths
> > > > to dependency are:
> > > > +-com.spotify:logproc-flink:0.0.0-SNAPSHOT
> > > >   +-org.apache.flink:flink-java:0.10.1
> > > > +-org.apache.flink:flink-shaded-hadoop2:0.10.1
> > > >   +-org.apache.zookeeper:zookeeper:3.4.6
> > > > +-io.netty:netty:3.7.0.Final
> > > > and
> > > > +-com.spotify:logproc-flink:0.0.0-SNAPSHOT
> > > >   +-org.apache.flink:flink-streaming-java:0.10.1
> > > > +-org.apache.flink:flink-runtime:0.10.1
> > > >   +-com.typesafe.akka:akka-remote_2.10:2.3.7
> > > > +-io.netty:netty:3.8.0.Final
> > > > ]
> > > > [ERROR] Failed to execute goal
> > > > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce
> (enforce)
> > on
> > > > project logproc-flink: Some Enforcer rules have failed. Look above
> for
> > > > specific messages explaining why the rule failed. -> [Help 1]
> > > >
> > >
> >
>


Streaming statefull operator with hashmap

2015-11-11 Thread Martin Neumann
Hej,

What is the correct way of initializing a stateful operator that is using
a HashMap? modelMapInit.getClass() does not work, and neither does
HashMap.class. Do I have to implement my own TypeInformation class or is
there a simpler way?

cheers Martin

private OperatorState<HashMap<InputType,MicroModel>> microModelMap;

@Override
public void open(Configuration parameters) throws Exception {
    HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
    this.microModelMap =
        getRuntimeContext().getKeyValueState("microModelMap",
            modelMapInit.getClass(), modelMapInit);
}


Re: Streaming statefull operator with hashmap

2015-11-11 Thread Martin Neumann
Thanks for the help.

TypeExtractor.getForObject(modelMapInit) did the job. It's possible that it's
an IDE problem that .getClass() did not work. IntelliJ is a bit fiddly with
those things.
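
For reference, the working version looks roughly like this (a sketch based on
the snippet from my original mail; TypeExtractor is
org.apache.flink.api.java.typeutils.TypeExtractor):

@Override
public void open(Configuration parameters) throws Exception {
    HashMap<InputType, MicroModel> modelMapInit = new HashMap<>();
    this.microModelMap = getRuntimeContext().getKeyValueState(
            "microModelMap",
            TypeExtractor.getForObject(modelMapInit),
            modelMapInit);
}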

1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.


What do you mean by initializing manually? Can I do that directly in the open
function, or are we talking about checking for null in the FlatMap and
initializing there? In general the program is supposed to run constantly
once deployed, so I can get away with a slightly slower setup.

2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.


Can you point me to an example on how to do this?
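
(From a quick look, it seems to be done on the execution environment; a
sketch, assuming the InputType/MicroModel classes from the snippets above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// manual Kryo registration for the type-erased generic parameters
env.registerType(InputType.class);
env.registerType(MicroModel.class);

Is that the intended way?)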

cheers Martin


On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen <se...@apache.org> wrote:

> It should suffice to do something like
>
> "getRuntimeContext().getKeyValueState("microModelMap", new
> HashMap<InputType,MicroModel>().getClass(), null);"
>
> Two more comments:
>
> 1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.
>
> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.
>
> Greetings,
> Stephan
>
>
>
> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hey,
> >
> > Yes what you wrote should work. You can alternatively use
> > TypeExtractor.getForObject(modelMapInit) to extract the type information.
> >
> > I also like to implement my custom type info for Hashmaps and the other
> > types and use that.
> >
> > Cheers,
> > Gyula
> >
> > Martin Neumann <mneum...@sics.se> wrote (on Wed, Nov 11, 2015 at
> > 16:30):
> >
> > > Hej,
> > >
> > > What is the correct way of initializing a stateful operator that is
> > using
> > > a HashMap? modelMapInit.getClass() does not work, and neither does
> > > HashMap.class. Do I have to implement my own TypeInformation class or
> is
> > > there a simpler way?
> > >
> > > cheers Martin
> > >
> > > private OperatorState<HashMap<InputType,MicroModel>> microModelMap;
> > >
> > > @Override
> > > public void open(Configuration parameters) throws Exception {
> > > HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
> > > this.microModelMap =
> > > getRuntimeContext().getKeyValueState("microModelMap",
> > > modelMapInit.getClass() , modelMapInit);
> > > }
> > >
> >
>


0.10 streaming state documentation out of date

2015-11-05 Thread Martin Neumann
Hej,

I'm working with some stateful streaming operators at the moment and I
noticed that the documentation is out of date.

The documentation says:

@Override
public void open(Configuration config) {
    counter = getRuntimeContext().getOperatorState("counter", 0L, false);
}


getOperatorState does not exist anymore; instead it should be getKeyValueState.
Also, the signature of the function has changed, so the description text
might need some updates too.
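
For the counter example above, the updated call would look roughly like this
(a sketch against the 0.10 API as I understand it):

@Override
public void open(Configuration config) {
    counter = getRuntimeContext().getKeyValueState("counter", Long.class, 0L);
}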

cheers Martin


Re: [gelly] Spargel model rework

2015-11-03 Thread Martin Neumann
I tried out Spargel during my work with Spotify and have implemented
several algorithms using it. In all implementations I ended up storing
additional data and flags on the vertex to carry them over from one UDF to
the next one. It definitely makes the code harder to write and maintain.

I wonder how much overhead these additional constructs cost in computation
and memory consumption. Maybe going for a less optimized single-UDF version
would not be much of a performance hit for most applications.



On Tue, Nov 3, 2015 at 8:43 AM, Stephan Ewen  wrote:

> Actually GAS was not known when we did the iterations work (and Spargel),
> but the intuition that led to Spargel is similar to the intuition that
> led to GAS.
>
> On Mon, Nov 2, 2015 at 4:35 PM, Stephan Ewen  wrote:
>
> > When creating the original version of Spargel I was pretty much thinking
> > in GSA terms, more than in Pregel terms. There are some fundamental
> > differences between Spargel and Pregel. Spargel is in between GAS and
> > Pregel in some way, that is how I have always thought about it.
> >
> > The main reason for the form is that it fits the dataflow paradigm
> easier:
> >
> >   - If one function emits the new state of the vertex and the messages,
> it
> > has two different return types, which means you need a union type and
> > filter/split type of operation on the result, which also adds overhead. In
> > the current model, each function has one return type, which makes it
> easy.
> >
> >  - The workset is also the feedback channel, which is materialized at the
> > superstep boundaries, so keeping it small at O(vertices), rather than
> > O(edges) is a win for performance.
> >
> > There is no reason to not add a Pregel model, but I would not kill
> Spargel
> > for it. It will be tough to get the Pregel variant to the same
> efficiency.
> > Unless you want to say, for efficiency, go with GSA, for convenience with
> > Pregel.
> >
> > There are some nice things about the Spargel model. The fact that
> messages
> > are first generated and then consumed makes the generation of initial
> messages
> > simpler in many cases, I think. It was always a bit weird to me in Pregel
> > that you had to check whether you are in superstep one, in which case you
> > would expect no message, and generate initial value messages.
> >
> >
> >
> > On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske 
> wrote:
> >
> >> We can of course inject an optional ReduceFunction (or GroupReduce, or
> >> combinable GroupReduce) to reduce the size of the work set.
> >> I suggested to remove the GroupReduce function, because it did only
> >> collect
> >> all messages into a single record by emitting the input iterator which
> is
> >> quite dangerous. Applying a combinable reduce function could improve
> >> the
> >> performance considerably.
> >>
> >> The good news is that it would come "for free" because the necessary
> >> partitioning and sorting can be reused (given the forwardField
> annotations
> >> are correctly set):
> >> - The partitioning of the reduce can be reused for the join with the
> >> solution set
> >> - The sort of the reduce is preserved by the join with the in-memory
> >> hash-table of the solution set and can be reused for the coGroup.
> >>
> >> Best,
> >> Fabian
> >>
> >> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri  >:
> >>
> >> > Hi Fabian,
> >> >
> >> > thanks so much for looking into this so quickly :-)
> >> >
> >> > One update I have to make is that I tried running a few experiments
> with
> >> > this on a 6-node cluster. The current implementation gets stuck at
> >> > "Rebuilding Workset Properties" and never finishes a single iteration.
> >> > Running the plan of one superstep without a delta iteration terminates
> >> > fine. I didn't have access to the cluster today, so I couldn't debug
> >> this
> >> > further, but I will do as soon as I have access again.
> >> >
> >> > The rest of my comments are inline:
> >> >
> >> > On 30 October 2015 at 17:53, Fabian Hueske  wrote:
> >> >
> >> > > Hi Vasia,
> >> > >
> >> > > I had a look at your new implementation and have a few ideas for
> >> > > improvements.
> >> > > 1) Sending out the input iterator as you do in the last GroupReduce
> is
> >> > > quite dangerous and does not give a benefit compared to collecting
> all
> >> > > elements. Even though it is an iterator, it needs to be completely
> >> > > materialized in-memory whenever the record is touched by Flink or
> user
> >> > > code.
> >> > > I would propose to skip the reduce step completely and handle all
> >> > messages
> > > > separately and only collect them in the CoGroup function before
> giving
> >> > them
> >> > > into the VertexComputeFunction. Be careful, to only do that with
> >> > > objectReuse disabled or take care to properly copy the messages. If
> >> you
> >> > > collect the messages in the CoGroup, you don't need the GroupReduce,
> >> have

Re: [gelly] Spargel model rework

2015-11-03 Thread Martin Neumann
The problem with having many different graph models in Gelly is that it
might get quite confusing for a user.
Maybe this can be fixed with good documentation so that it's clear how each
model works and what its benefits are (and maybe when it's better to use it
over a different model).

On Tue, Nov 3, 2015 at 3:29 PM, Andra Lungu  wrote:

> I also think a Giraph-like model could be added, but we shouldn't remove
> Spargel in favour of it!
>
> On Tue, Nov 3, 2015 at 2:35 AM, Stephan Ewen  wrote:
>
> > When creating the original version of Spargel I was pretty much thinking
> in
> > GSA terms, more than in Pregel terms. There are some fundamental
> > differences between Spargel and Pregel. Spargel is in between GAS and
> > Pregel in some way, that is how I have always thought about it.
> >
> > The main reason for the form is that it fits the dataflow paradigm
> easier:
> >
> >   - If one function emits the new state of the vertex and the messages,
> it
> > has two different return types, which means you need a union type and
> > filter/split type of operation on the result, which also adds overhead. In
> > the current model, each function has one return type, which makes it
> easy.
> >
> >  - The workset is also the feedback channel, which is materialized at the
> > superstep boundaries, so keeping it small at O(vertices), rather than
> > O(edges) is a win for performance.
> >
> > There is no reason to not add a Pregel model, but I would not kill
> Spargel
> > for it. It will be tough to get the Pregel variant to the same
> efficiency.
> > Unless you want to say, for efficiency, go with GSA, for convenience with
> > Pregel.
> >
> > There are some nice things about the Spargel model. The fact that
> messages
> > are first generated and then consumed makes the generation of initial
> messages
> > simpler in many cases, I think. It was always a bit weird to me in Pregel
> > that you had to check whether you are in superstep one, in which case you
> > would expect no message, and generate initial value messages.
> >
> >
> >
> > On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske 
> wrote:
> >
> > > We can of course inject an optional ReduceFunction (or GroupReduce, or
> > > combinable GroupReduce) to reduce the size of the work set.
> > > I suggested to remove the GroupReduce function, because it did only
> > collect
> > > all messages into a single record by emitting the input iterator which
> is
> > > quite dangerous. Applying a combinable reduce function could improve
> > the
> > > performance considerably.
> > >
> > > The good news is that it would come "for free" because the necessary
> > > partitioning and sorting can be reused (given the forwardField
> > annotations
> > > are correctly set):
> > > - The partitioning of the reduce can be reused for the join with the
> > > solution set
> > > - The sort of the reduce is preserved by the join with the in-memory
> > > hash-table of the solution set and can be reused for the coGroup.
> > >
> > > Best,
> > > Fabian
> > >
> > > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri  >:
> > >
> > > > Hi Fabian,
> > > >
> > > > thanks so much for looking into this so quickly :-)
> > > >
> > > > One update I have to make is that I tried running a few experiments
> > with
> > > > this on a 6-node cluster. The current implementation gets stuck at
> > > > "Rebuilding Workset Properties" and never finishes a single
> iteration.
> > > > Running the plan of one superstep without a delta iteration
> terminates
> > > > fine. I didn't have access to the cluster today, so I couldn't debug
> > this
> > > > further, but I will do as soon as I have access again.
> > > >
> > > > The rest of my comments are inline:
> > > >
> > > > On 30 October 2015 at 17:53, Fabian Hueske 
> wrote:
> > > >
> > > > > Hi Vasia,
> > > > >
> > > > > I had a look at your new implementation and have a few ideas for
> > > > > improvements.
> > > > > 1) Sending out the input iterator as you do in the last GroupReduce
> > is
> > > > > quite dangerous and does not give a benefit compared to collecting
> > all
> > > > > elements. Even though it is an iterator, it needs to be completely
> > > > > materialized in-memory whenever the record is touched by Flink or
> > user
> > > > > code.
> > > > > I would propose to skip the reduce step completely and handle all
> > > > messages
> > > > > separately and only collect them in the CoGroup function before
> giving
> > > > them
> > > > > into the VertexComputeFunction. Be careful, to only do that with
> > > > > objectReuse disabled or take care to properly copy the messages. If
> > you
> > > > > collect the messages in the CoGroup, you don't need the
> GroupReduce,
> > > have
> > > > > smaller records and you can remove the MessageIterator class
> > > completely.
> > > > >
> > > >
> > > > I see. The idea was to expose a message combiner that the user could
> > > > implement if the 

Re: streaming GroupBy + Fold

2015-10-06 Thread Martin Neumann
The window is actually part of the workaround we are currently using (I should
have commented it out), where we use a window and a MapFunction instead of a
Fold.
Originally I was running fold without a window, facing the same problems.

The workaround works for now, so there is no urgency on that one. I just
wanted to make sure I was not doing something stupid and that it was a bug
you guys were aware of.

cheers Martin

On Tue, Oct 6, 2015 at 8:09 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> If you are using a fold you are using none of the new code paths. I will
> add support for Fold to the new windowing implementation today, though.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Oct 2015 at 23:49 Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
> > Martin, I have looked at your code and you are running a fold in a
> window,
> > that is a very important distinction - the code paths are separate.
> > Those code paths have been recently touched by Aljoscha if I am not
> > mistaken.
> >
> > I have mocked up a simple example and could not reproduce your problem
> > unfortunately. [1] Could you maybe produce a minimalistic example that we
> > can actually execute? :)
> >
> > [1]
> >
> >
> https://github.com/mbalassi/flink/commit/9f1f02d05e2bc2043a8f514d39fbf7753ea7058d
> >
> > On Mon, Oct 5, 2015 at 10:06 PM, Márton Balassi <
> balassi.mar...@gmail.com>
> > wrote:
> >
> > > Thanks, I am checking it out tomorrow morning.
> > >
> > > On Mon, Oct 5, 2015 at 9:59 PM, Martin Neumann <mneum...@sics.se>
> wrote:
> > >
> > >> Hej,
> > >>
> > >> Sorry it took so long to respond; I needed to check if I was actually
> > >> allowed to share the code since it uses internal datasets.
> > >>
> > >> In the appendix of this email you will find the main class of this job
> > >> without the supporting classes or the actual dataset. If you want to
> > run it
> > >> you need to replace the dataset by something else but that should be
> > >> trivial.
> > >> If you just want to see the problem itself, have a look at the
> appended
> > >> log in conjunction with the code. Each ERROR printout in the log
> > relates to
> > >> an accumulator receiving wrong values.
> > >>
> > >> cheers Martin
> > >>
> > >> On Sat, Oct 3, 2015 at 11:29 AM, Márton Balassi <
> > balassi.mar...@gmail.com
> > >> > wrote:
> > >>
> > >>> Hey,
> > >>>
> > >>> Thanks for reporting the problem, Martin. I have not merged the PR
> > >>> Stephan
> > >>> is referring to yet. [1] There I am cleaning up some of the internals
> > >>> too.
> > >>> Just out of curiosity, could you share the code for the failing test
> > >>> please?
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/1155
> > >>>
> > >>> On Fri, Oct 2, 2015 at 8:26 PM, Martin Neumann <mneum...@sics.se>
> > wrote:
> > >>>
> > >>> > One of my colleagues found it today when we were hunting bugs.
> > >>> We
> > >>> > were using the latest 0.10 version pulled from Maven this morning.
> > >>> > The program we were testing is new code so I can't tell you if the
> > >>> behavior
> > >>> > has changed or if it was always like this.
> > >>> >
> > >>> > On Fri, Oct 2, 2015 at 7:46 PM, Stephan Ewen <se...@apache.org>
> > wrote:
> > >>> >
> > >>> > > I think these operations were recently moved to the internal
> state
> > >>> > > interface. Did the behavior change then?
> > >>> > >
> > >>> > > @Marton or Gyula, can you comment? Is it per chance not mapped to
> > the
> > >>> > > partitioned state?
> > >>> > >
> > >>> > > On Fri, Oct 2, 2015 at 6:37 PM, Martin Neumann <mneum...@sics.se
> >
> > >>> wrote:
> > >>> > >
> > >>> > > > Hej,
> > >>> > > >
> > >>> > > > In one of my Programs I run a Fold on a GroupedDataStream. The
> > aim
> > >>> is
> > >>> > to
> > >>> > > > aggregate the values in each group.
> > >>> > > > It seems the aggregator in the Fold function is shared on
> > operator
> > >>> > level,
> > >>> > > > so all groups that end up on the same operator get mashed
> > together.
> > >>> > > >
> > >>> > > > Is this the wanted behavior? If so, what do I have to do to
> > >>> separate
> > >>> > > them?
> > >>> > > >
> > >>> > > >
> > >>> > > > cheers Martin
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>


Re: streaming GroupBy + Fold

2015-10-05 Thread Martin Neumann
Hej,

Sorry it took so long to respond; I needed to check if I was actually
allowed to share the code since it uses internal datasets.

In the appendix of this email you will find the main class of this job
without the supporting classes or the actual dataset. If you want to run it
you need to replace the dataset by something else, but that should be
trivial.
If you just want to see the problem itself, have a look at the appended log
in conjunction with the code. Each ERROR printout in the log relates to an
accumulator receiving wrong values.

cheers Martin

On Sat, Oct 3, 2015 at 11:29 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hey,
>
> Thanks for reporting the problem, Martin. I have not merged the PR Stephan
> is referring to yet. [1] There I am cleaning up some of the internals too.
> Just out of curiosity, could you share the code for the failing test
> please?
>
> [1] https://github.com/apache/flink/pull/1155
>
> On Fri, Oct 2, 2015 at 8:26 PM, Martin Neumann <mneum...@sics.se> wrote:
>
> > One of my colleagues found it today when we were hunting bugs. We
> > were using the latest 0.10 version pulled from Maven this morning.
> > The program we were testing is new code so I can't tell you if the
> behavior
> > has changed or if it was always like this.
> >
> > On Fri, Oct 2, 2015 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > I think these operations were recently moved to the internal state
> > > interface. Did the behavior change then?
> > >
> > > @Marton or Gyula, can you comment? Is it per chance not mapped to the
> > > partitioned state?
> > >
> > > On Fri, Oct 2, 2015 at 6:37 PM, Martin Neumann <mneum...@sics.se>
> wrote:
> > >
> > > > Hej,
> > > >
> > > > In one of my Programs I run a Fold on a GroupedDataStream. The aim is
> > to
> > > > aggregate the values in each group.
> > > > It seems the aggregator in the Fold function is shared on operator
> > level,
> > > > so all groups that end up on the same operator get mashed together.
> > > >
> > > > Is this the wanted behavior? If so, what do I have to do to separate
> > > them?
> > > >
> > > >
> > > > cheers Martin
> > > >
> > >
> >
>
import com.ericsson.config.Configuration;
import com.ericsson.pojos.DataPojo;
import com.ericsson.pojos.out.TimeDrift;
import com.ericsson.timedrift.TimeShiftWindowMapFunction;
import com.ericsson.utils.DataPojoSerializer;
import com.ericsson.utils.PojoSerializer;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedDataStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.concurrent.TimeUnit;


/**
 * Time drift host and time based.
 */
public class TimeDriftKafkaExampleTst {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> in = env.readTextFile("./datasets/first1.json");
        DataStream<DataPojo> events = in.map(new DeserialiseMap()).returns(DataPojo.class);

        KeyedDataStream<DataPojo, String> groupedEvents = events.groupBy(t -> {
            return t.getHost();
        });
        WindowedDataStream<TimeDrift> drift = groupedEvents
                //.window(Count.of(50)).every(Count.of(10))
                .window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(20, TimeUnit.SECONDS))
                .foldWindow(new TimeDrift(), new TimeShiftFold());
                //.mapWindow(new TimeShiftWindowMapFunction());

        DataStream<TimeDrift> out = drift.flatten();
        out.print();

        env.execute();
    }

    private static class DeserialiseMap implements MapFunction<String, DataPojo> {
        private static final DataPojoSerializer deserializer = new DataPojoSerializer();

        @Override
        public DataPojo map(String value) throws Exception {
            return deserializer.deserialize(value.getBytes());
        }
    }

    private static class TimeShiftFold implements FoldFunction<DataPojo, TimeDrift> {

        @Override
        public TimeDrift fold(TimeDrift accumulator, DataPojo value) throws Exception {
            long t

Re: Rethink the "always copy" policy for streaming topologies

2015-10-02 Thread Martin Neumann
It seems like I'm one of the few people who run into the mutable-elements
trap in the Batch API from time to time. At the moment I always clone when
I'm not 100% sure, to avoid hunting the bugs later. So far I was happy to
learn that this is not a problem in Streaming, but that's just me.

When working with groupBy and partition functions, it's easy to forget that
there is one function instance per operator, not per partition. So if you write
your code in the state of mind that each partition is a separate object, reduce
on operator level becomes really annoying.
Since the mapping between partitions and operators is usually hidden, this makes
debugging harder, especially in cases where the test data produces a
single partition per operator and the real deployment does not.

*To summarize:*
I'm not against reusing objects as long as there is something that helps
ease the pitfalls. This could be coding guidelines, debugging tools or best
practices.
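
To illustrate the trap (a hypothetical sketch; Event is a made-up mutable
type with a copy constructor):

import org.apache.flink.api.common.functions.RichMapFunction;

import java.util.ArrayList;
import java.util.List;

public class CollectingMap extends RichMapFunction<Event, Event> {
    private final List<Event> seen = new ArrayList<>();

    @Override
    public Event map(Event value) {
        seen.add(value);              // trap: with object reuse, all entries may alias one instance
        // seen.add(new Event(value)); // safe: defensive copy
        return value;
    }
}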


On Fri, Oct 2, 2015 at 5:53 PM, Stephan Ewen  wrote:

> Hi all!
>
> Now that we are coming to the next release, I wanted to make sure we
> finalize the decision on that point, because it would be nice to not break
> the behavior of system afterwards.
>
> Right now, when tasks are chained together, the system always copies the
> elements between different tasks in the same chain.
>
> I think this policy was established under the assumption that copies do not
> cost anything, given our own test examples, which mainly use immutable
> types like Strings, boxed primitives, ..
>
> In practice, a lot of data types are actually quite expensive to copy.
>
> For example, a rather common data type in the event analysis of web-sources
> is JSON Object.
> Flink treats this as a generic type. Depending on its concrete
> implementation, Kryo may have perform a serialization copy, which means
> encoding into bytes (JSON encoding, charset encoding) and decoding again.
>
> This has a massive impact on the out-of-the-box performance of the system.
> Given that, I was wondering whether we should set the default policy to "not
> copying".
>
> That is basically the behavior of the batch API, and there has so far never
> been an issue with that (people running into the trap of overwritten
> mutable elements).
>
> What do you think?
>
> Stephan
>


streaming GroupBy + Fold

2015-10-02 Thread Martin Neumann
Hej,

In one of my programs I run a Fold on a GroupedDataStream. The aim is to
aggregate the values in each group.
It seems the aggregator in the Fold function is shared on operator level,
so all groups that end up on the same operator get mashed together.

Is this the wanted behavior? If so, what do I have to do to separate them?


cheers Martin


Re: streaming GroupBy + Fold

2015-10-02 Thread Martin Neumann
One of my colleagues found it today when we were hunting bugs. We
were using the latest 0.10 version pulled from Maven this morning.
The program we were testing is new code, so I can't tell you if the behavior
has changed or if it was always like this.

On Fri, Oct 2, 2015 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:

> I think these operations were recently moved to the internal state
> interface. Did the behavior change then?
>
> @Marton or Gyula, can you comment? Is it per chance not mapped to the
> partitioned state?
>
> On Fri, Oct 2, 2015 at 6:37 PM, Martin Neumann <mneum...@sics.se> wrote:
>
> > Hej,
> >
> > In one of my Programs I run a Fold on a GroupedDataStream. The aim is to
> > aggregate the values in each group.
> > It seems the aggregator in the Fold function is shared on operator level,
> > so all groups that end up on the same operator get mashed together.
> >
> > Is this the wanted behavior? If so, what do I have to do to separate
> them?
> >
> >
> > cheers Martin
> >
>


EventTime in streaming

2015-09-17 Thread Martin Neumann
After some work experience with the current solution I want to give some
feedback and maybe start a discussion about event time in streaming. This
is not about watermarks or any of the upcoming improvements, just some
observations from the current code.


*Starttime for EventTime:*

In the current implementation you can specify a start time; if you don't, it
defaults to 0.
The default is not feasible when using the typical milliseconds since 1970.
The *TimeTriggerPolicy* has the following implementation of
*preNotifyTrigger*:

@Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
> LinkedList<Object> fakeElements = new LinkedList<Object>();
> // check if there is more then one window border missed
> // use > here. In case >= would fit, the regular call will do the job.
> while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity)
> {
> startTime += granularity;
> fakeElements.add(startTime - 1);
> }
> return (Object[]) fakeElements.toArray();
> }


In practice this means using the default start time will crash the program
(running out of memory), since it will create fake elements to close every
possible window since 1970.
So you need to set a start time to make it run, which is not that simple. In
production you could use the system time to initialize, but this might lead
to some problems when consuming events from e.g. Kafka with an older
timestamp. When debugging using old streams you need to know the lowest
timestamp of the stream to initialize.

What is the purpose of the fake elements? Is there a way to avoid the
memory problem of creating enormous amounts of empty windows?
Could we just use the timestamp of the first event processed as the start time
instead of having it as a parameter? I'm testing the following modification
of the above code at the moment; do you see any problem with that?

@Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
> LinkedList<Object> fakeElements = new LinkedList<Object>();
> // check if there is more then one window border missed
> // use > here. In case >= would fit, the regular call will do the job.
> // TODO modified here
> if(startTime == 0) startTime = timestampWrapper.getTimestamp(datapoint);
> while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity)
> {
> startTime += granularity;
> fakeElements.add(startTime - 1);
> }
> return (Object[]) fakeElements.toArray();
> }



*EventTime API confusion:*

I found several ways to use event time in my program, but I find them not
very intuitive. Compare the two following lines of code, both using the
Time.of helper, one with event time and one with system time:

ds.window(Time.of(long windowSize, TimeUnit))
ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long
startTime))

It's weird that you cannot specify the TimeUnit when using the event
timestamp. It would feel more natural if it looked like this (also without
the start time):

ds.window(Time.of(long windowSize, TimeUnit, Timestamp
yourTimeStampExtractor))


At the moment I'm using the modified TimeTriggerPolicy directly, leading to
this ugly piece of code:

.window(new TimeTriggerPolicyHack(10l, new
TimestampWrapper(new EventTimeStampExtractor(), 0l)), new
TimeEvictionPolicy(2, new TimestampWrapper(new
EventTimeStampExtractor(), 0l)))



cheers Martin


broadcast set sizes

2015-04-09 Thread Martin Neumann
Hej,

Up to what sizes are broadcast sets a good idea?

I have a large dataset (~5 GB) and I'm only interested in lines with a
certain ID that I have in a file. The file has ~10k entries.
I could either join the dataset with the ID list or I could broadcast the ID
list and do the filtering in a mapper.

What would be the better solution given the data sizes described above?
Is there a good rule of thumb for when to switch from one solution to the other?
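
For context, the broadcast variant I have in mind looks roughly like this (a
sketch; lines and ids stand for the 5 GB input and the ID list):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.util.HashSet;
import java.util.Set;

public class IdFilter extends RichFilterFunction<String> {
    private Set<String> ids;

    @Override
    public void open(Configuration parameters) {
        // materialize the broadcast variable once per task instance
        ids = new HashSet<>(getRuntimeContext().<String>getBroadcastVariable("ids"));
    }

    @Override
    public boolean filter(String line) {
        // assumes the ID is the first CSV field of each line
        return ids.contains(line.split(",")[0]);
    }
}

// usage: lines.filter(new IdFilter()).withBroadcastSet(ids, "ids")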

cheers Martin


Gelly graph algorithms

2015-02-26 Thread Martin Neumann
Hej,

I was busy with other stuff for a while but I hope I will have more time to
work on Flink and Graphs again now.

I need to do some basic analytics on a large graph set (stuff like degree
distribution, triangle count, component size distribution, etc.).
Is there anything implemented in Gelly already that I can use? Is there
some basic documentation about Gelly somewhere?

If I need to program those algorithms myself, I could do it in a way that we
can integrate them into Gelly. But I might need some help to make sure I
design them in a way that fits, so I might need some guidance/help.
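
For the degree distribution, I imagine something roughly like this (a sketch
against the current Gelly API; types are approximate):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Long>> edges = env.fromElements(
        Tuple2.of(1L, 2L), Tuple2.of(2L, 3L), Tuple2.of(1L, 3L));
Graph<Long, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
// (vertexId, degree) pairs; grouping by the degree field then gives the distribution
graph.getDegrees().print();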

cheers Martin


Re: Stale Synchronous Parallel iterations in Flink

2015-02-25 Thread Martin Neumann
Hej,

Very interesting discussion.
I hadn't heard of the SSP model before; it looks like something I want to look
into.
I wonder if any of the algorithms that would work in that model would not
work in an asynchronous model, since asynchronous is basically an SSP model
with infinite slack. Iterative convergent algorithms such as connected
components would run the same in both environments.
Formulated differently: are slack parameters arbitrary, or are they defined by
the structure of the algorithm, as is the case in ABSP?

For the implementation issues, maybe we can learn something from Microsoft
Naiad (http://research.microsoft.com/en-us/projects/naiad/), which has some
similarities to this.


cheers Martin

On Mon, Feb 23, 2015 at 4:55 PM, Nam-Luc Tran namluc.t...@euranova.eu
wrote:

 Hello guys,

 Thank you for your replies.

 (1) How to checkpoint such computations in order to recover them
 upon failures.
 Approximate snapshots are possible with SSP. In this case, a snapshot
 (solution set and/or working set; what exactly is snapshotted can be
 discussed) is triggered once a clock=t message has been received
 from each thread. The snapshot will not be pure, meaning that it does not
 contain strictly the updates from 1...t, as some faster threads may be
 ahead of t and their results may already be incorporated by slower
 threads. However, iterative convergent algorithms suitable for SSP are
 able to tolerate this situation.

 (2) How to check for the termination of the computation without a
 sync barrier. Can the SSP model help with that?
 From what I have seen the SSP evaluates the convergence of the global
 model in order to terminate the iterations. In our case, I guess this
 would require to evaluate the convergence on the solution set and/or
 emptiness of the working set. Is it possible to interrupt Flink
 operators while they are running?

 We signal the end of a superstep by pushing a special message from
 the head of the loop to the tail. This can be thought of like a
 clock cycle. Currently, the new superstep starts on each
 parallel thread once all tails have received the message, thus
 forming the BSP barrier. For delta iterations, this tail is
 the next workset; the solution set is completely independent of that.
 Could you point me out the classes in the code that realise this
 behaviour?

 I am very interested in digging deeper and implementing this in Flink.
 What would be the best point to start on from here according to you?

 Best regards,

 Tran Nam-Luc

 At Monday, 23/02/2015 on 10:32 Stephan Ewen wrote:

 Hey Tran Nam-Luc!

 Great post with some really cool thoughts.
 I just posted this answer to your LinkedIN post.

 Greetings,
 Stephan

 =

 Nice post, very cool idea! Your understanding of Flink in that respect
 is
 really good. I had not heard of SSP before,
 but it seems to be a good compromise between bulk synchronous and
 asynchronous iterations.

 Here are some comments and thoughts about how Apache Flink realizes
 iterations and how that mechanism can be extended
 to support SSP:

 - The loop in Flink is standing, operators are not re-created and
 re-deployed in every iteration.

 - We signal the end of a superstep by pushing a special message from
 the
 head of the loop to the tail. This can be thought of
like a clock cycle. Currently, the new superstep starts on each
 parallel
 thread once all tails have received the message,
thus forming the BSP barrier. For delta iterations, this tail
 is the
 next workset; the solution set is completely independent of that.

 - We can probably interpret the end-of-superstep messages as clock
 messages. We could then allow threads to start their
next superstep if all tails have seen clock messages of at least
 their
 own clock time minus the slack.

 If you are looking to implement this in Flink, or dig deeper into
 this, let
 me know, I would be happy to help.

 On Fri, Feb 20, 2015 at 5:27 PM, Nam-Luc Tran
 wrote:

  Hello Everyone,
 
  I am Nam-Luc Tran, research Engineer at EURA NOVA [1]. Our research
  subjects cover distributed machine learning and we have been working
  on dataflow graph processing for a while now. We have been reading
  from you since Stratosphere :-)
 
  Our current research focuses on Stale Synchronous Parallelism and we
  are currently considering Apache Flink as a good candidate for
  implementing and delivering the best results among the existing
  processing solutions. I have written a post about it here:
 
 

 https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran
 
 
  What do you guys think about the approach? Does it seem feasible, or
  do you have anything similar in your roadmap?
 
  Best regards,
 
  Tran Nam-Luc
 
 
 
  Links:
  --
  [1] http://euranova.eu