Re: Flink streaming job with iterations gets stuck waiting for network buffers

2017-04-03 Thread Gábor Hermann

Hi Andrey,

As Paris has explained it, this is a known issue and there are ongoing 
efforts to solve it.


I can suggest a workaround: manually limit the amount of messages sent into the 
iteration. You can do this with e.g. a Map operator that 
limits records per second and simply forwards what it has received. You 
can check at every incoming record whether the limit has been reached, 
and if so, Thread.sleep until the next second. You could place this Map 
operator before the operator that ingests data into the iteration 
(the operator with ID 9 in your dataflow graph). This way you can avoid 
overloading the network inside the iteration, and thus avoid the deadlock 
caused by backpressure.
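
A minimal sketch of such a throttling operator (just illustrative; the limit, the type parameter and the operator name are placeholders you would have to adapt):

import org.apache.flink.api.common.functions.RichMapFunction;

// Pass-through map that sleeps once its per-second record budget is used up.
public class ThrottleMap<T> extends RichMapFunction<T, T> {

    private final long maxRecordsPerSecond; // has to be tuned manually
    private long windowStart;
    private long count;

    public ThrottleMap(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public T map(T value) throws Exception {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
            windowStart = now;   // a new one-second window starts
            count = 0;
        }
        if (++count > maxRecordsPerSecond) {
            Thread.sleep(1000 - (now - windowStart)); // wait for the next second
            windowStart = System.currentTimeMillis();
            count = 0;
        }
        return value; // simply forward what was received
    }
}

You would add it e.g. as input.map(new ThrottleMap<MyEvent>(1000)) right before the ingesting operator. Note that each parallel instance applies the limit independently, so the effective rate is the limit times the parallelism.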


This approach is, of course, a bit hacky. Also, it does not eliminate 
the possibility of a deadlock entirely. Another disadvantage is that you 
have to tune the ingestion rate manually. The right rate could depend on a lot of 
things: the data load, the number of operator instances, the placement 
of operator instances, etc. But I have used something like this as a 
temporary workaround until we see more progress with FLIP-15.


Cheers,
Gabor


On 2017-04-03 13:33, Paris Carbone wrote:

Hi Andrey,

If I am not mistaken, this sounds like a known deadlock case, caused by the combination 
of Flink's backpressure mechanism with iterations (more likely when there is heavy 
feedback load).
Keep in mind that iterations are currently perhaps the only feature that is not stable 
to use. The good news is that there is a complete redesign planned for them (partly 
FLIP-15 [1]) that is meant to entirely address this pending flow control issue as well.

Increasing network buffers or feedback queue capacity to a really high number 
decreases the possibility of the deadlock but does not eliminate it.
I really cannot think of a quick solution to the problem that does not involve 
some deep changes.

I am CCing dev since this seems like a very relevant use case to revive the 
discussion for the loops redesign and also keep you in the loop (no pun 
intended) regarding this specific issue.
Will also update FLIP-15 with several interesting proposals under discussion 
from Stephan to tackle this issue.

cheers,
Paris

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination


On 3 Apr 2017, at 12:54, Andrey Melentyev wrote:

Hi,

I have a Flink 1.2.0 streaming job using a number of stateful operators and an 
iteration loop with a RichFlatMapFunction inside. On the high level, the app 
reads some data, massages it and feeds into an iterative algorithm which 
produces some output and feedback while keeping the state. All stateful 
operators are on KeyedStreams. Input is some data on file system and output is 
stdout.

The implementation passes functional tests, but when tested with noticeable 
amounts of input data (tens of thousands of records, dozens of MB of raw data), after 
a few seconds of good throughput backpressure kicks in and the application 
essentially gets stuck: most of the threads are blocked waiting for buffers, and an 
occasional message gets processed every few minutes. There's nothing strange in 
the log files.

The behaviour is reproducible both in the local execution environment and in a Flink 
standalone cluster (started using jobmanager.sh and taskmanager.sh).

The problematic part is likely in the iterations, since the part of the job 
before the iterations works fine with the same data.

I would appreciate pointers as to how to debug this. 
taskmanager.network.numberOfBuffers from the config sounds relevant but the 
default value of 2048 is already much higher than slots-per-TM^2 * #TMs * 4 = 
4^2 * 1 * 4 = 64.

Attaching the Flink config, job execution plan and a thread dump with some sensitive 
parts redacted.

flink-conf.yml

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 4
jobmanager.web.port: 8081
state.backend: rocksdb
state.backend.fs.checkpointdir: 
file:///Users/andrey.melentyev/tmp/flink-checkpoints

Job execution plan

{
  "nodes": [
    {
      "contents": "IterationSource-10",
      "id": -1,
      "pact": "Data Source",
      "parallelism": 8,
      "type": "IterationSource-10"
    },
    {
      "contents": "Source: Custom File Source",
      "id": 1,
      "pact": "Data Source",
      "parallelism": 1,
      "type": "Source: Custom File Source"
    },
    {
      "contents": "Split Reader: Custom File Source",
      "id": 2,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ],
      "type": "Split Reader: Custom File Source"
    },
    {
      "contents": "Parse JSON",
      "id": 3,
      "pact": "Operator",
   

Re: Machine Learning on Flink - Next steps

2017-03-20 Thread Gábor Hermann

Hi all,

@Theodore:
+1 for the CTR use-case. Thanks for the suggestion!

@Katherin:
+1 for reflecting the choices made here and contributor commitment in Gdoc.

@Tao, @Ventura:
It's great to hear you have been working on ML on Flink :)
I hope we can all aggregate our efforts somehow. It would be best if you 
could contribute some of your work.



I've started putting together a Gdoc specifically for 
*Offline/incremental learning on Streaming API*:

https://docs.google.com/document/d/18BqoFTQ0dPkbyO-PWBMMpW5Nl0pjobSubnWpW0_r8yA/
Right now you can comment/give suggestions there. I'd like to start a 
separate mailing list discussion as soon as there are enough 
contributors volunteering for this direction. For now, I'm trying to 
reflect the relevant part of the discussion here and the initial Gdoc [1].



[1] 
https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/


Cheers,
Gabor

On 2017-03-20 14:27, Ventura Del Monte wrote:


Hello everyone,

Here at DFKI, we are currently working on a project that involves developing
open-source online machine learning algorithms on top of Flink.
So far, we have simple moments, sampling (e.g. simple reservoir sampling)
and sketches (e.g. Frequent Directions) built on top of scikit-like
abstractions and Flink's DataStream/KeyedStream.
Moreover, we have a few industrial use cases, and we are going to validate our
implementation using real industrial data.
We plan to implement more advanced algorithms in the future, as well as to
share our results with you and contribute, in case you are interested.
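
For illustration only (this is a sketch, not our actual implementation): a simple reservoir-sampling operator of this kind can be expressed directly as a flatMap on a DataStream, here with classic Algorithm R and a snapshot of the reservoir emitted per element:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

// Keeps a uniform random sample of fixed size over everything seen so far.
public class ReservoirSample<T> extends RichFlatMapFunction<T, List<T>> {

    private final int reservoirSize;
    private final List<T> reservoir = new ArrayList<>();
    private final Random random = new Random();
    private long seen;

    public ReservoirSample(int reservoirSize) {
        this.reservoirSize = reservoirSize;
    }

    @Override
    public void flatMap(T value, Collector<List<T>> out) {
        seen++;
        if (reservoir.size() < reservoirSize) {
            reservoir.add(value);
        } else {
            long j = (long) (random.nextDouble() * seen); // uniform index in [0, seen)
            if (j < reservoirSize) {
                reservoir.set((int) j, value);            // replace with probability k/seen
            }
        }
        out.collect(new ArrayList<>(reservoir)); // a real operator would emit less often
    }
}

The reservoir here lives only on the heap; a production version would keep it in operator state so it survives checkpoints and restarts.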

Best,
Ventura





On Mon, Mar 20, 2017 at 12:26 PM, Tao Meng <oatg...@gmail.com> wrote:


Hi All,

Sorry for joining this discussion late.
My graduation thesis is about an online learning system. I will build it on
Flink in the next three months.

I'd like to contribute on:
  - Online learning




On Mon, Mar 20, 2017 at 6:51 PM Katherin Eri <katherinm...@gmail.com>
wrote:

Hello, Theodore

Could you please move vectors of development and their prioritized
positions from *## Executive summary* to the google doc?



Could you please also create some table in the google doc representing
the selected directions and the persons who would like to drive or participate
in each particular topic, in order to make this process transparent for the
community and sum up the current state of contributor commitment?

There we could simply sign ourselves up for a topic.



And +1 for the CTR prediction case.

Sun, 19 Mar 2017 at 16:49, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com>:


Hello Stavros,

The way I thought we'd do it is that each shepherd would be responsible for
organizing the project: that includes setting up a Google doc, sending an
email to the dev list to inform the wider community, and if possible,
personally contacting the people who expressed interest in the project.

Would you be willing to lead that effort for the model serving project?

Regards,
Theodore

--
Sent from a mobile device. May contain autocorrect errors.

On Mar 19, 2017 3:49 AM, "Stavros Kontopoulos" <st.kontopou...@gmail.com> wrote:


Hi all...

I agree about the TensorFlow integration; it seems to be important from what I hear.
Should we sign up somewhere for the working groups (gdocs)?
I would like to start helping with the model serving feature.

Best Regards,
Stavros

On Fri, Mar 17, 2017 at 10:34 PM, Gábor Hermann <m...@gaborhermann.com> wrote:


Hi Chen,

Thanks for the input! :)

There is already a project [1] for using TensorFlow models in Flink, and
Theodore has suggested to contact the author, Eron Wright for the model serving direction.

[1] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/

Cheers,
Gabor


On 2017-03-17 19:41, Chen Qin wrote:


[1] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/




--

*Yours faithfully, *

*Kate Eri.*





Re: Machine Learning on Flink - Next steps

2017-03-17 Thread Gábor Hermann

Hi Chen,

Thanks for the input! :)

There is already a project [1] for using TensorFlow models in Flink, and 
Theodore has suggested to contact the author, Eron Wright for the model serving direction.


[1] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/

Cheers,
Gabor


On 2017-03-17 19:41, Chen Qin wrote:

[1] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/




Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-17 Thread Gábor Hermann
Thanks for demonstrating the windowed side-input case. I completely 
agree that handling windowed side-input separately would simply 
complicate the implementation. The triggering mechanism for the upstream 
window could define when the windowed input is ready.


I would gladly contribute to a low-level requirement. If there's a 
somewhat well defined JIRA issue, I'm happy to start working on it.


Cheers,
Gabor


On 2017-03-17 16:03, Aljoscha Krettek wrote:

Yes, I agree! The implementation stuff we talked about so far is only
visible at the operator level. A user function that uses the (future)
side input API would not be aware of whether buffering or blocking is used. It
would simply know that it is invoked and that side input is ready.

I'll also quickly try to elaborate on my comment about why I think
windowing/triggering in the side-input operator itself is not necessary.
I created a figure: http://imgur.com/a/aAlw7 It is enough for the
side-input operator simply to consider side input for a given window as
ready when we have seen some data for that window. The WindowOperator
that is upstream of the side input will take care of
windowing/triggering.

I'll create Jira issues for implementing the low-level requirements for
side inputs (n-ary operator, broadcast state and buffering) and update
this thread. If anyone is interested on working on one of those we might
have a chance of getting this ready for Flink 1.3. Time is a bit tight
for me because I'm going to be on vacation for 1.5 weeks starting next
week Wednesday and after that we have Flink Forward.

Best,
Aljoscha

On Thu, Mar 16, 2017, at 23:52, Gábor Hermann wrote:

Regarding the CoFlatMap workaround,
- For keyed streams, do you suggest that having a per-key buffer stored
as keyed state would have a large memory overhead? That must be true,
although a workaround could be partitioning the data and using a
non-keyed stream. Of course that seems hacky, as we have a keyed stream
abstraction, so I agree with you.
- I agree that keeping a broadcast side-input in the operator state is
not optimal. That's a good point I have not thought about. First we have
a separate abstraction for broadcast state, then we can optimize e.g.
checkpointing it (avoiding checkpointing it at every operator).


Regarding blocking/backpressuring inputs, it should not only be useful
for static side-inputs, but also for periodically updated (i.e. slowly
changing) ones. E.g. when a machine learning model is updated and loaded
every hour, it makes sense to prioritize loading the model on the side
input. But I see the limitations of the underlying runtime.

Exposing a buffer could be useful for now. Although, the *API* for
blocking could even be implemented by simply buffering. So the buffering
could be hidden from the user, and later maybe optimized to not only
buffer, but also apply backpressure. What do you think? Again, for the
prototype, exposing the buffer should be fine IMHO. API and
implementation for blocking inputs could be a separate issue, but let's
not forget about it.

Cheers,
Gabor


On 2017-03-15 16:14, Aljoscha Krettek wrote:

Hi,
thanks for your input! :-)

Regarding 1)
I don't see the benefit of integrating windowing into the side-input
logic. Windowing can happen upstream and whenever that emits new data
then the operator will notice because there is new input. Having windowing
inside the side-input of an operator as well would just make the
implementation more complex without adding benefit, IMHO.

Regarding 2)
That's a very good observation! I think we are fine, though, because
checkpoint barriers never "overtake" elements. It's only elements that
can overtake checkpoint barriers. If the broadcast state on different
parallel instances differs in a checkpoint then it only differs because
some parallel instances have reflected changes in their state from
elements that they shouldn't have "seen" yet in the exactly-once mode.
If we pick the state of an arbitrary instance as the de-facto state we
don't break guarantees any more than turning on at-least-once mode does.

Regarding 3)
We need the special buffer support for keyed operations because there we
need to make sure that data is restored on the correct operator that is
responsible for the key of the data while also allowing us to iterate
over all the buffered data (for when we are ready to process the data).
This iteration over elements is not possible when simply storing data in
keyed state.

What do you think?

On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:

Hi Aljoscha, I just went through your prototype. I like the design of the
SideInputReader, which makes it flexible to determine when we can get the
side input.

I agree that side inputs are API sugar on top of the three
components (n-ary inputs, broadcast state and input buffering). Following are some more
thoughts about the three components:

1. Take both N-ary input operator and windowing/triggers mechanism into
consideration, I thi

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-16 Thread Gábor Hermann

Regarding the CoFlatMap workaround,
- For keyed streams, do you suggest that having a per-key buffer stored 
as keyed state would have a large memory overhead? That must be true, 
although a workaround could be partitioning the data and using a 
non-keyed stream. Of course that seems hacky, as we have a keyed stream 
abstraction, so I agree with you.
- I agree that keeping a broadcast side-input in the operator state is 
not optimal. That's a good point I have not thought about. First we have 
a separate abstraction for broadcast state, then we can optimize e.g. 
checkpointing it (avoiding checkpointing it at every operator).



Regarding blocking/backpressuring inputs, it should not only be useful 
for static side-inputs, but also for periodically updated (i.e. slowly 
changing) ones. E.g. when a machine learning model is updated and loaded 
every hour, it makes sense to prioritize loading the model on the side 
input. But I see the limitations of the underlying runtime.
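
Until something like that exists, the hourly-model use case is typically expressed with a connected stream today; a rough sketch (Event, Model and Prediction are just placeholder types, and the model would ideally live in operator state):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// flatMap1 receives the main events, flatMap2 the periodically published models.
public class ModelServingCoFlatMap
        implements CoFlatMapFunction<Event, Model, Prediction> {

    private Model currentModel; // latest model seen on the "side" input

    @Override
    public void flatMap1(Event event, Collector<Prediction> out) {
        if (currentModel != null) {
            out.collect(currentModel.predict(event));
        }
        // else: no model loaded yet; a real job would buffer or emit a default
    }

    @Override
    public void flatMap2(Model model, Collector<Prediction> out) {
        currentModel = model; // an hourly model update simply replaces the old one
    }
}

The prioritization mentioned above is exactly what this cannot do: the runtime gives no way to prefer reading the model input over the event input.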


Exposing a buffer could be useful for now. Although, the *API* for 
blocking could even be implemented by simply buffering. So the buffering 
could be hidden from the user, and later maybe optimized to not only 
buffer, but also apply backpressure. What do you think? Again, for the 
prototype, exposing the buffer should be fine IMHO. API and 
implementation for blocking inputs could be a separate issue, but let's 
not forget about it.


Cheers,
Gabor


On 2017-03-15 16:14, Aljoscha Krettek wrote:

Hi,
thanks for your input! :-)

Regarding 1)
I don't see the benefit of integrating windowing into the side-input
logic. Windowing can happen upstream and whenever that emits new data
then the operator will notice because there is new input. Having windowing
inside the side-input of an operator as well would just make the
implementation more complex without adding benefit, IMHO.

Regarding 2)
That's a very good observation! I think we are fine, though, because
checkpoint barriers never "overtake" elements. It's only elements that
can overtake checkpoint barriers. If the broadcast state on different
parallel instances differs in a checkpoint then it only differs because
some parallel instances have reflected changes in their state from
elements that they shouldn't have "seen" yet in the exactly-once mode.
If we pick the state of an arbitrary instance as the de-facto state we
don't break guarantees any more than turning on at-least-once mode does.

Regarding 3)
We need the special buffer support for keyed operations because there we
need to make sure that data is restored on the correct operator that is
responsible for the key of the data while also allowing us to iterate
over all the buffered data (for when we are ready to process the data).
This iteration over elements is not possible when simply storing data in
keyed state.

What do you think?

On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:

Hi Aljoscha, I just went through your prototype. I like the design of the
SideInputReader, which makes it flexible to determine when we can get the
side input.

I agree that side inputs are API sugar on top of the three
components (n-ary inputs, broadcast state and input buffering). Following are some more
thoughts about the three components:

1. Taking both the N-ary input operator and the windowing/triggers mechanism into
consideration, I think the N-ary input operator may need to support some
inputs (side inputs) being windowed while the others (the main input) are normal
streams. For static/slowly evolving data we need to use global windows, and
for window-based join data we need to use time windows or custom windows.
The window function on the side input can be used to collect or merge the
data to generate the value of the side input (a single value or a
list/map).
Once a side-input window is triggered, the SideInputReader will report the
value as available, and if a window is triggered more than once, the value
of the side input will be updated; maybe the SideInputReader needs an
interface to notify the user that something changed. Besides, I prefer the
option of making every input of the N-ary input operator equal, because a user
may need one side input to depend on another side input.

2. Regarding broadcast state, my concern is how we can merge the value
of the state from different subtasks. If the job is running in at-least-once
mode, the value of the broadcast state returned from different subtasks will be
different. Is there already any design for broadcast state?

3. Regarding input buffering, I think that if we use the window/trigger mechanism,
state can be stored in the window state, which is mostly what we
currently do in KeyedWindow and AllWindow. We may need to allow a
custom merge strategy on all-window state data, since for side inputs we
may need to choose data according to the broadcast-state strategy, while in normal
windows we can just redistribute the window state data.

What do you think?

Best Regards!

Wenlong

On 14 March 2017 at 01:41, Aljoscha Krettek 

Re: Machine Learning on Flink - Next steps

2017-03-16 Thread Gábor Hermann
we could definitely help with that.

Regards,
Stavros



On Fri, Mar 10, 2017 at 4:08 PM, Till Rohrmann <trohrm...@apache.org>
wrote:


Thanks Theo for steering Flink's ML effort here :-)

I'd vote to concentrate on

- Online learning
- Low-latency prediction serving

because of the following reasons:

Online learning:

I agree that this topic is highly researchy and it's not even clear whether
it will ever be of any interest outside of academia. However, it was the
same for other things as well. Adoption in industry is usually slow and
sometimes one has to dare to explore something new.

Low-latency prediction serving:

Flink with its streaming engine seems to be the natural fit for such a task
and it is a rather low-hanging fruit. Furthermore, I think that users would
directly benefit from such a feature.

Offline learning with Streaming API:

I'm not fully convinced yet that the streaming API is powerful enough
(mainly due to the lack of proper iteration support and spilling capabilities)
to support a wide range of offline ML algorithms. And even then it will only
support rather small problem sizes, because streaming cannot gracefully
spill the data to disk. There are still too many open issues with the
streaming API to be applicable for this use case imo.

Offline learning with the batch API:

For offline learning the batch API is imo still better suited than the
streaming API. I think it will only make sense to port the algorithms to
the streaming API once batch and streaming are properly unified. Alone the
highly efficient implementations for joining and sorting of data, which can
go out of memory, are important to support big-sized ML problems. In
general, I think it might make sense to offer a basic set of ML primitives.
However, already offering this basic set is a considerable amount of work.

Concerning the independent organization for the development: I think it
would be great if the development could still happen under the umbrella of
Flink's ML library because otherwise we might risk some kind of
fragmentation. In order for people to collaborate, one can also open PRs
against a branch of a forked repo.

I'm currently working on wrapping the project re-organization discussion
up. The general position was that it would be best to have an incremental
build and keep everything in the same repo. If this is not possible then we
want to look into creating a sub-repository for the libraries (maybe other
components will follow later). I hope to make some progress on this front
in the next couple of days/weeks. I'll keep you updated.

As a general remark for the discussions on the google doc: I think it would
be great if we could at least mirror the discussions happening in the
google doc back on the mailing list, or ideally conduct the discussions
directly on the mailing list. That's at least what the ASF encourages.

Cheers,
Till

On Fri, Mar 10, 2017 at 10:52 AM, Gábor Hermann <m...@gaborhermann.com> wrote:

Hey all,

Sorry for the bit late response.

I'd like to work on
- Offline learning with Streaming API
- Low-latency prediction serving

I would drop the batch API ML because of past experience with lack of
support, and online learning because of the lack of use-cases.

I completely agree with Kate that offline learning should be supported,
but given Flink's resources I prefer using the streaming API as Roberto
suggested. Also, full model lifecycle (or end-to-end ML) could be more
easily supported in one system (one API). Connecting Flink Batch with Flink
Streaming is currently cumbersome (although side inputs [1] might help). In
my opinion, a crucial part of end-to-end ML is low-latency predictions.

As another direction, we could integrate Flink Streaming API with other
projects (such as Prediction IO). However, I believe it's better to first
evaluate the capabilities and drawbacks of the streaming API with some
prototype of using Flink Streaming for some ML task. Otherwise we could run
into critical issues, just as the System ML integration did with e.g.
caching. These issues make the integration of the Batch API with other ML
projects practically infeasible.

I've already been experimenting with offline learning with the Streaming
API. Hopefully, I can share some initial performance results next week on
matrix factorization. Naturally, I've run into issues. E.g. I could only
mark the end of input with some hacks, because this is not needed in a
streaming job consuming input forever. AFAIK, this would be resolved by
side inputs [1].

@Theodore:
+1 for doing the prototype project(s) separately from the main Flink
repository. Although, I would strongly suggest to follow the Flink
development guidelines as closely as possible. As another note, there is
already a GitHub organization for Flink related projects [2], but it seems
like it has not been used much.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStre

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-10 Thread Gábor Hermann

Hi all,

Thanks Aljoscha for going forward with the side inputs and for the nice 
proposal!


I'm also in favor of the implementation with N-ary input (3.) for the 
reasons Ventura explained. I'm strongly against managing side inputs at 
StreamTask level (2.), as it would create another abstraction for almost 
the same purposes as a TwoInputOperator. Making use of the second input 
of a 2-input operator (1.) could be useful for prototyping. I assume it 
would be easier to implement a minimal solution with that, but I'm not 
sure. If the N-ary input prototype is almost ready, then it's best to go 
with that.


For side input readiness, it would be better to wait for the side input 
to be completely ready. As Gyula has suggested, waiting only for the 
first record does not differ much from not waiting at all. I would also 
prefer user-defined readiness, but for the minimal solution we could fix 
this for completely read side input and maybe go only for static side 
inputs first.


I understand that we should push a minimal viable solution forward. The 
current API and implementation proposal seems like a good start. 
However, long term goals are also important, to avoid going in a wrong 
direction. As I have not participated in the discussion let me share 
also some longer term considerations in reply to the others. (Sorry for 
the length.)



How would side inputs help the users? For the simple, non-windowed cases 
with static input a CoFlatMap might be sufficient. The main input can be 
buffered while the side input is consumed and stored in the operator 
state. Thus, the user can decide inside the CoFlatMap UDF when to start 
consuming the stream input (e.g. when the side input is ready). Of 
course, this might be problematic to implement, so the side inputs API 
could help the user with this pattern.
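
Roughly, that waiting pattern looks like this (just a sketch; the end-of-side-input marker and the element types are placeholders, and the buffers here are plain in-memory lists that are not checkpointed):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// flatMap1: main input, flatMap2: side input.
// Main-input elements are held back until the side input is declared ready.
public abstract class BufferingCoFlatMap<MAIN, SIDE, OUT>
        implements CoFlatMapFunction<MAIN, SIDE, OUT> {

    private final List<SIDE> sideInput = new ArrayList<>();
    private final List<MAIN> buffered = new ArrayList<>();
    private boolean sideInputReady = false;

    @Override
    public void flatMap1(MAIN value, Collector<OUT> out) {
        if (sideInputReady) {
            out.collect(process(value, sideInput));
        } else {
            buffered.add(value); // unbounded in-memory buffer: the weak point
        }
    }

    @Override
    public void flatMap2(SIDE value, Collector<OUT> out) {
        if (isEndMarker(value)) {            // user-defined "side input is ready" signal
            sideInputReady = true;
            for (MAIN m : buffered) {        // drain everything that was held back
                out.collect(process(m, sideInput));
            }
            buffered.clear();
        } else {
            sideInput.add(value);
        }
    }

    protected abstract OUT process(MAIN value, List<SIDE> sideInput);

    protected abstract boolean isEndMarker(SIDE value);
}

Both the buffering and the end-marker handling are exactly the boilerplate the side inputs API could take off the user's hands.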


1)
First, marking the end of side input is not easy. Every side input 
should broadcast some kind of EOF to the consuming operator. If we 
generalize to non-static (slowly changing) inputs, then progress 
tracking messages should be broadcast periodically. This is reminiscent 
of the watermark time tracking for windows.


I agree with Gyula that we should have user defined side input 
readiness. Although, couldn't we use windowing for this? It's not worth 
having two separate time tracking mechanisms (one for windows, one for 
side inputs). If the windowing is not flexible enough to handle such 
cases, then what about exposing watermark tracking to the user? E.g. we 
could have an extra user defined event handler in RichFunctions when 
time progress is made. This generalizes the two progress tracking. Of 
course, this approach requires more work so it's not for the minimal 
viable solution.


2)
Second, exposing a buffer to the user helps a bit, but the users could 
buffer the data simply in an operator state. How would a buffer help 
more? Of course, the interface could have multiple implementations, such 
as a spilling buffer, and the user could choose. That helps the "waiting 
pattern".


I agree with Wenlong's suggestion that blocking (or backpressure) must 
be an option. It seems crucial to avoid consuming a large part of the 
main input, which would take a lot of space. I suggest not to expose a 
buffer, but to allow the users to control whether to read from the 
different inputs. E.g. in the N-ary input operator UDF the user could 
control this per input: startConsuming(), stopConsuming(). Then it's the 
user's responsibility not to get into deadlocks, but the runtime handles 
the buffering. For reading a static side input, the user could stop 
consuming the main input until she considers the side input ready.
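
Just to make that concrete (none of this exists, it is only a sketch of the kind of hook I mean):

// Hypothetical control handle passed to an N-ary operator's UDF.
public interface InputSwitch {
    void startConsuming(int inputIndex); // resume reading, release backpressure
    void stopConsuming(int inputIndex);  // stop reading, let backpressure build up
}

// Reading a static side input (input 1) before touching the main input (input 0).
public class StaticSideInputFunction {

    public void open(InputSwitch inputs) {
        inputs.stopConsuming(0); // hold back the main input until the side input is ready
    }

    public void onElement(int inputIndex, Object element, InputSwitch inputs) {
        if (inputIndex == 1 && isEndOfSideInput(element)) {
            inputs.startConsuming(0); // side input is ready, let the main input flow
        }
        // ... regular processing ...
    }

    private boolean isEndOfSideInput(Object element) {
        return false; // user-defined readiness check, placeholder here
    }
}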


User-controlled backpressure would also help to avoid deadlocks in 
stream loops.


3)
I also agree with Wenlong's second point, that checkpointing should be 
considered, but I don't think it's really important for the prototype. 
If we maintain the side input in the state of the consuming operator, 
then checkpointing it would not stop once the static side input is 
finished, because the main input goes on and the operator stays running. 
Incremental checkpointing could prevent checkpointing static data at 
every checkpoint.



Cheers,
Gabor

On 2017-03-09 16:59, Aljoscha Krettek wrote:


Hi Jamie,
actually the approach where the .withSideInput() comes before the user
function is only required for implementation proposal #1, which I like
the least. For the other two it can be after the user function, which is
also what I prefer.

Regarding semantics: yes, we simply wait for anything to be available.
For GlobalWindows, i.e. side inputs on a normal function where we simply
don't have windows, this means that we wait for anything. For the
windowed case, which I'm proposing as a second step we will wait for
side input in a window to be available that matches the main-input
window. For the keyed case we wait for something on the same key to be
available, for the broadcast case we wait for anything.


Re: Machine Learning on Flink - Next steps

2017-03-10 Thread Gábor Hermann

Hey all,

Sorry for the bit late response.

I'd like to work on
- Offline learning with Streaming API
- Low-latency prediction serving

I would drop the batch API ML because of past experience with lack of 
support, and online learning because of the lack of use-cases.


I completely agree with Kate that offline learning should be supported, 
but given Flink's resources I prefer using the streaming API as Roberto 
suggested. Also, full model lifecycle (or end-to-end ML) could be more 
easily supported in one system (one API). Connecting Flink Batch with 
Flink Streaming is currently cumbersome (although side inputs [1] might 
help). In my opinion, a crucial part of end-to-end ML is low-latency 
predictions.


As another direction, we could integrate Flink Streaming API with other 
projects (such as Prediction IO). However, I believe it's better to 
first evaluate the capabilities and drawbacks of the streaming API with 
some prototype of using Flink Streaming for some ML task. Otherwise we 
could run into critical issues, just as the System ML integration did with 
e.g. caching. These issues make the integration of the Batch API with other 
ML projects practically infeasible.


I've already been experimenting with offline learning with the Streaming 
API. Hopefully, I can share some initial performance results next week 
on matrix factorization. Naturally, I've run into issues. E.g. I could 
only mark the end of input with some hacks, because this is not needed 
in a streaming job consuming input forever. AFAIK, this would be 
resolved by side inputs [1].


@Theodore:
+1 for doing the prototype project(s) separately from the main Flink 
repository. Although, I would strongly suggest to follow the Flink 
development guidelines as closely as possible. As another note, there is 
already a GitHub organization for Flink related projects [2], but it 
seems like it has not been used much.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

[2] https://github.com/project-flink

On 2017-03-04 08:44, Roberto Bentivoglio wrote:


Hi All,

I'd like to start working on:
  - Offline learning with Streaming API
  - Online learning

I also think that using a new organisation on GitHub, as Theodore proposed,
to keep an initial independence and speed up the prototyping and development
phases, is really interesting.

I totally agree with Katherin, we need offline learning, but my opinion is
that it will be more straightforward to fix the streaming issues than the batch
issues, because we will have more support on that from the Flink community.

Thanks and have a nice weekend,
Roberto

On 3 March 2017 at 20:20, amir bahmanyari wrote:


Great points to start:
  - Online learning
  - Offline learning with the streaming API

Thanks + have a great weekend.

   From: Katherin Eri 
  To: dev@flink.apache.org
  Sent: Friday, March 3, 2017 7:41 AM
  Subject: Re: Machine Learning on Flink - Next steps

Thank you, Theodore.

Shortly speaking I vote for:
1) Online learning
2) Low-latency prediction serving -> Offline learning with the batch API

In details:
1) If streaming is the strong side of Flink, let's use it and try to support
some online learning or lightweight in-memory learning algorithms, and try to
build a pipeline for them.

2) I think that Flink should be part of the production ecosystem, and if
productions now require ML support, multi-model deployment and so on, we
should serve this. But in my opinion we shouldn't compete with
projects like PredictionIO, but rather serve them and be an execution core. But
that means a lot:

a. Offline training should be supported, because typically most ML algorithms
are for offline training.
b. The model lifecycle should be supported:
ETL + transformation + training + scoring + production quality monitoring.

I understand that the batch world is full of competitors, but for me that
doesn't mean that batch should be ignored. I think that separate
streaming/batch applications cause additional deployment and
operational overhead, which one typically tries to avoid. That means that
we should attract the community to this problem, in my opinion.


Fri, 3 Mar 2017 at 15:34, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com>:

Hello all,

 From our previous discussion started by Stavros, we decided to start a
planning document [1]
to figure out possible next steps for ML on Flink.

Our concerns were mainly ensuring active development while satisfying the
needs of
the community.

We have listed a number of proposals for future work in the document. In
short they are:

   - Offline learning with the batch API
   - Online learning
   - Offline learning with the streaming API
   - Low-latency prediction serving

I saw there are a number of people willing to work on ML for Flink, but the
truth is that we cannot
cover all of these suggestions without fragmenting the development too
much.

So my recommendation is to pick out 2 of these options, 

Re: [DISCUSS] Flink ML roadmap

2017-02-28 Thread Gábor Hermann

Hi Philipp,

It's great to hear you are interested in Flink ML!

Based on your description, your prototype seems like an interesting 
approach for combining online+offline learning. If you're interested, we 
might find a way to integrate your work, or at least your ideas, into 
Flink ML if we decide on a direction that fits your approach. I think 
your work could be relevant for almost all the directions listed there 
(if I understand correctly you'd even like to serve predictions on 
unlabeled data).


Feel free to join the discussion in the docs you've mentioned :)

Cheers,
Gabor

On 2017-02-27 18:39, Philipp Zehnder wrote:


Hello all,

I'm new to this mailing list and I wanted to introduce myself. My name is 
Philipp Zehnder and I'm a Master's student in Computer Science at the Karlsruhe 
Institute of Technology in Germany, currently writing my master's thesis with 
the main goal of integrating reusable machine learning components into a stream 
processing network. One part of my thesis is to create an API for distributed 
online machine learning.

I saw that there are some recent discussions about how to continue the development of 
Flink ML [1] and I want to share some of my experiences and maybe get some 
feedback from the community on my ideas.

As I am new to open source projects I hope this is the right place for this.

In the beginning, I had a look at different already existing frameworks like 
Apache SAMOA for example, which is great and has a lot of useful resources. 
However, as Flink is currently focusing on streaming, from my point of view it 
makes sense to also have a streaming machine learning API as part of the Flink 
ecosystem.

I’m currently working on building a prototype for a distributed streaming 
machine learning library based on Flink that can be used for online and 
“classical” offline learning.

The machine learning algorithm takes labeled and non-labeled data. On a labeled 
data point, first a prediction is performed and then the label is used to train 
the model. On a non-labeled data point, just a prediction is performed. The main 
difference between the online and offline algorithms is that in the offline 
case the labeled data must be handed to the model before the unlabeled data. In 
the online case, it is still possible to process labeled data at a later point 
to update the model. The advantage of this approach is that batch algorithms 
can be applied to streaming data and online algorithms can be supported as well.
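
For illustration (this is just a sketch, not my actual prototype; Instance is a placeholder type carrying features and an optional label), the predict-then-train contract can be captured in a single streaming operator:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Every instance is first scored; only labeled instances also update the model.
public abstract class PredictThenTrain<MODEL> extends RichFlatMapFunction<Instance, Double> {

    private MODEL model;

    @Override
    public void open(Configuration parameters) {
        model = initModel();
    }

    @Override
    public void flatMap(Instance instance, Collector<Double> out) {
        out.collect(predict(model, instance));   // prediction always comes first
        if (instance.hasLabel()) {
            model = update(model, instance);     // train on the label afterwards
        }
    }

    protected abstract MODEL initModel();

    protected abstract double predict(MODEL model, Instance instance);

    protected abstract MODEL update(MODEL model, Instance instance);
}

The only difference between the offline and the online variant is then the order in which labeled and unlabeled instances are fed into this operator.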

One difference from batch learning is the transformers that are used to 
preprocess the data. For example, a simple mean subtraction must be implemented 
with a rolling mean, because we can't calculate the mean over all the data, but 
the Flink Streaming API is perfect for that. It would be useful for users to 
have an extensible toolbox of transformers.
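
A minimal sketch of such a rolling-mean transformer for a single numeric feature (again just illustrative; the running state would have to be checkpointed in a real job):

import org.apache.flink.api.common.functions.MapFunction;

// Subtracts the running mean of everything seen so far from each value.
public class RunningMeanSubtraction implements MapFunction<Double, Double> {

    private long count = 0;
    private double mean = 0.0;

    @Override
    public Double map(Double value) {
        count++;
        mean += (value - mean) / count; // incremental mean update
        return value - mean;
    }
}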

Another difference is the evaluation of the models. We don't have a single 
value that determines the model quality; in streaming scenarios this value evolves 
over time as the model sees more labeled data.

However, transformation and evaluation again work similarly in both online 
learning and offline learning.

I also liked the discussion in [2] and I think that the competition in the 
batch learning field is hard and there are already a lot of great projects. I 
think it is true that in most real world problems it is not necessary to update 
the model immediately, but there are a lot of use cases for machine learning on 
streams. For them it would be nice to have a native approach.

A stream machine learning API for Flink would fit very well and I would also be 
willing to contribute to the future development of the Flink ML library.



Best regards,

Philipp

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-ML-roadmap-td16040.html
[2] 
https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/edit#heading=h.v9v1aj3xosv2



On 23.02.2017 at 15:48, Gábor Hermann <m...@gaborhermann.com> wrote:

Okay, I've created a skeleton of the design doc for choosing a direction:
https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/edit?usp=sharing

Much of the pros/cons have already been discussed here, so I'll try to put 
there all the arguments mentioned in this thread. Feel free to put there more :)

@Stavros: I agree we should take action fast. What about collecting our 
thoughts in the doc by around Tuesday next week (28. February)? Then decide on 
the direction and design a roadmap by around Friday (3. March)? Is that 
feasible, or should it take more time?

I think it will be necessary to have a shepherd, or even better a committer, to 
be involved in at least reviewing and accepting the roadmap

Re: [DISCUSS] Flink ML roadmap

2017-02-23 Thread Gábor Hermann

@Theodore, thanks for taking the lead in the coordination :)

Let's see what we can do, and then decide what should start out as an 
independent project, or strictly inside Flink.
I agree that something experimental like batch ML on streaming would 
probably benefit more from an independent repo first.


On 2017-02-23 16:56, Theodore Vasiloudis wrote:


Sure having a deadline for March 3rd is fine. I can act as coordinator,
trying to guide the discussion to concrete results.

For committers it's up to their discretion and time if one wants to
participate. I don't think it's necessary to have one, but it would be most
welcome.

@Katherin I would suggest you start a topic on the list about FLINK-1730,
if it takes a lot of development effort from your side it's best to at
least try to gauge the community's interest, and whether there will be
motivation to merge the changes.

Maybe at the end of this we have a FLIP we can submit, that's probably the
way forward if we want to keep this effort within the project. For a new,
highly experimental project like batch ML on streaming I would actually
favor developing on an independent repo, which can later be merged into
main if there is interest.

Regards.
Theodore

On Thu, Feb 23, 2017 at 4:41 PM, Gábor Hermann <m...@gaborhermann.com>
wrote:


Okay, let's just aim for around the end of next week, but we can take more
time to discuss if there's still a lot of ongoing activity. Keep the topic
hot!

Thanks all for the enthusiasm :)



On 2017-02-23 16:17, Stavros Kontopoulos wrote:


@Gabor 3rd March is ok for me. But maybe giving a bit more time to it like
a week may suit more people.
What do you think all?
I will contribute to the doc.

+100 for having a co-ordinator + committer.

Thank you all for joining the discussion.

Cheers,
Stavros

On Thu, Feb 23, 2017 at 4:48 PM, Gábor Hermann <m...@gaborhermann.com>
wrote:

Okay, I've created a skeleton of the design doc for choosing a direction:

https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/edit?usp=sharing

Much of the pros/cons have already been discussed here, so I'll try to
put
there all the arguments mentioned in this thread. Feel free to put there
more :)

@Stavros: I agree we should take action fast. What about collecting our
thoughts in the doc by around Tuesday next week (28. February)? Then
decide
on the direction and design a roadmap by around Friday (3. March)? Is
that
feasible, or should it take more time?

I think it will be necessary to have a shepherd, or even better a
committer, to be involved in at least reviewing and accepting the
roadmap.
It would be best, if a committer coordinated all this.
@Theodore: Would you like to do the coordination?

Regarding the use-cases: I've seen some abstracts of talks at SF Flink
Forward [1] that seem promising. There are companies already using Flink
for ML [2,3,4,5].

[1] http://sf.flink-forward.org/program/sessions/
[2] http://sf.flink-forward.org/kb_sessions/experiences-with-streaming-vs-micro-batch-for-online-learning/
[3] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/
[4] http://sf.flink-forward.org/kb_sessions/non-flink-machine-learning-on-flink/
[5] http://sf.flink-forward.org/kb_sessions/streaming-deep-learning-scenarios-with-flink/

Cheers,
Gabor



On 2017-02-23 15:19, Katherin Eri wrote:

I have asked already some teams for useful cases, but all of them need time
to think.
During analysis something will finally arise.
Maybe we can ask partners of Flink for cases? Data Artisans got results
of a customer survey: [1], better ML support is wanted, so we could ask what
exactly is necessary.

[1] http://data-artisans.com/flink-user-survey-2016-part-2/

On 23 Feb 2017 at 4:32 PM, "Stavros Kontopoulos" <st.kontopou...@gmail.com> wrote:

+100 for a design doc.

Could we also set a roadmap after some time-boxed investigation captured in
that document? We need action.

Looking forward to work on this (whatever that might be) ;) Also are there
any data supporting one direction or the other from a customer perspective?
It would help to make more informed decisions.

On Thu, Feb 23, 2017 at 2:23 PM, Katherin Eri <katherinm...@gmail.com>
wrote:

Yes, ok.

Let's start some design document, and write down there the already mentioned
ideas about: parameter server, clipper and others. It would be nice if
we also map these approaches to cases.
We will work on it collaboratively on each topic; maybe finally we will form
some picture that could be agreed with committers.

@Gabor, could you please start such a shared doc, as you have already
proposed several ideas?

Thu, 23 Feb 2017, 15:06 Gábor Hermann <m...@gaborhermann.com>:

I agree that it's better to go in one direction first, but I think
online and offline with the streaming API can go somewhat parallel later. We
could set a short-term goal, concentrate initially on one direction, and
showcase that direc

Re: [DISCUSS] Flink ML roadmap

2017-02-23 Thread Gábor Hermann
Okay, let's just aim for around the end of next week, but we can take 
more time to discuss if there's still a lot of ongoing activity. Keep 
the topic hot!


Thanks all for the enthusiasm :)


On 2017-02-23 16:17, Stavros Kontopoulos wrote:

@Gabor 3rd March is ok for me. But maybe giving a bit more time to it like
a week may suit more people.
What do you think all?
I will contribute to the doc.

+100 for having a co-ordinator + committer.

Thank you all for joining the discussion.

Cheers,
Stavros

On Thu, Feb 23, 2017 at 4:48 PM, Gábor Hermann <m...@gaborhermann.com>
wrote:


Okay, I've created a skeleton of the design doc for choosing a direction:
https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/edit?usp=sharing

Much of the pros/cons have already been discussed here, so I'll try to put
there all the arguments mentioned in this thread. Feel free to put there
more :)

@Stavros: I agree we should take action fast. What about collecting our
thoughts in the doc by around Tuesday next week (28. February)? Then decide
on the direction and design a roadmap by around Friday (3. March)? Is that
feasible, or should it take more time?

I think it will be necessary to have a shepherd, or even better a
committer, to be involved in at least reviewing and accepting the roadmap.
It would be best, if a committer coordinated all this.
@Theodore: Would you like to do the coordination?

Regarding the use-cases: I've seen some abstracts of talks at SF Flink
Forward [1] that seem promising. There are companies already using Flink
for ML [2,3,4,5].

[1] http://sf.flink-forward.org/program/sessions/
[2] http://sf.flink-forward.org/kb_sessions/experiences-with-streaming-vs-micro-batch-for-online-learning/
[3] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/
[4] http://sf.flink-forward.org/kb_sessions/non-flink-machine-learning-on-flink/
[5] http://sf.flink-forward.org/kb_sessions/streaming-deep-learning-scenarios-with-flink/

Cheers,
Gabor



On 2017-02-23 15:19, Katherin Eri wrote:


I have asked already some teams for useful cases, but all of them need time
to think.
During analysis something will finally arise.
Maybe we can ask partners of Flink for cases? Data Artisans got results
of a customer survey: [1], better ML support is wanted, so we could ask what
exactly is necessary.

[1] http://data-artisans.com/flink-user-survey-2016-part-2/

On 23 Feb 2017 at 4:32 PM, "Stavros Kontopoulos" <st.kontopou...@gmail.com> wrote:

+100 for a design doc.

Could we also set a roadmap after some time-boxed investigation captured in
that document? We need action.

Looking forward to work on this (whatever that might be) ;) Also are there
any data supporting one direction or the other from a customer perspective?
It would help to make more informed decisions.

On Thu, Feb 23, 2017 at 2:23 PM, Katherin Eri <katherinm...@gmail.com>
wrote:

Yes, ok.

Let's start some design document, and write down there the already mentioned
ideas about: parameter server, clipper and others. It would be nice if
we also map these approaches to cases.
We will work on it collaboratively on each topic; maybe finally we will form
some picture that could be agreed with committers.
@Gabor, could you please start such a shared doc, as you have already
proposed several ideas?

Thu, 23 Feb 2017, 15:06 Gábor Hermann <m...@gaborhermann.com>:

I agree that it's better to go in one direction first, but I think
online and offline with the streaming API can go somewhat parallel later. We
could set a short-term goal, concentrate initially on one direction, and
showcase that direction (e.g. in a blogpost). But first, we should list
the pros/cons in a design doc as a minimum. Then make a decision what
direction to go. Would that be feasible?

On 2017-02-23 12:34, Katherin Eri wrote:

I'm not sure that this is feasible; doing all at the same time could mean
doing nothing.

I'm just afraid that the words "we will work on streaming, not on batching, we
have no committer's time for this" mean that yes, we started work on
FLINK-1730, but nobody will commit this work in the end, as it already was
with this ticket.

On 23 Feb 2017 at 14:26, "Gábor Hermann" <m...@gaborhermann.com> wrote:

@Theodore: Great to hear you think the "batch on streaming" approach is
possible! Of course, we need to pay attention to all the pitfalls there,
if we go that way.

+1 for a design doc!

I would add that it's possible to make efforts in all three directions
(i.e. batch, online, batch on streaming) at the same time. Although, it
might be worth to concentrate on one. E.g. it would not be so useful to
have the same batch algorithms with both the batch API and streaming API.
We can decide later.

The design doc could be partitioned into these 3 directions, and we can
collect there the pros/cons too. What do you think?

Ch
Ch

Re: [DISCUSS] Flink ML roadmap

2017-02-23 Thread Gábor Hermann

Okay, I've created a skeleton of the design doc for choosing a direction:
https://docs.google.com/document/d/1afQbvZBTV15qF3vobVWUjxQc49h3Ud06MIRhahtJ6dw/edit?usp=sharing

Much of the pros/cons have already been discussed here, so I'll try to 
put there all the arguments mentioned in this thread. Feel free to put 
there more :)


@Stavros: I agree we should take action fast. What about collecting our 
thoughts in the doc by around Tuesday next week (28. February)? Then 
decide on the direction and design a roadmap by around Friday (3. 
March)? Is that feasible, or should it take more time?


I think it will be necessary to have a shepherd, or even better a 
committer, to be involved in at least reviewing and accepting the 
roadmap. It would be best, if a committer coordinated all this.

@Theodore: Would you like to do the coordination?

Regarding the use-cases: I've seen some abstracts of talks at SF Flink 
Forward [1] that seem promising. There are companies already using Flink 
for ML [2,3,4,5].


[1] http://sf.flink-forward.org/program/sessions/
[2] 
http://sf.flink-forward.org/kb_sessions/experiences-with-streaming-vs-micro-batch-for-online-learning/

[3] http://sf.flink-forward.org/kb_sessions/introducing-flink-tensorflow/
[4] 
http://sf.flink-forward.org/kb_sessions/non-flink-machine-learning-on-flink/
[5] 
http://sf.flink-forward.org/kb_sessions/streaming-deep-learning-scenarios-with-flink/


Cheers,
Gabor


On 2017-02-23 15:19, Katherin Eri wrote:

I have asked already some teams for useful cases, but all of them need time
to think.
During analysis something will finally arise.
May be we can ask partners of Flink  for cases? Data Artisans got results
of customers survey: [1], ML better support is wanted, so we could ask what
exactly is necessary.

[1] http://data-artisans.com/flink-user-survey-2016-part-2/

On 23 Feb 2017 at 4:32 PM, "Stavros Kontopoulos" <st.kontopou...@gmail.com> wrote:


+100 for a design doc.

Could we also set a roadmap after some time-boxed investigation captured in
that document? We need action.

Looking forward to work on this (whatever that might be) ;) Also are there
any data supporting one direction or the other from a customer perspective?
It would help to make more informed decisions.

On Thu, Feb 23, 2017 at 2:23 PM, Katherin Eri <katherinm...@gmail.com>
wrote:


Yes, ok.
Let's start some design document, and write down there the already mentioned
ideas about: parameter server, clipper and others. It would be nice if
we also map these approaches to cases.
We will work on it collaboratively on each topic; maybe finally we will form
some picture that could be agreed with committers.
@Gabor, could you please start such a shared doc, as you have already
proposed several ideas?

Thu, 23 Feb 2017, 15:06 Gábor Hermann <m...@gaborhermann.com>:


I agree that it's better to go in one direction first, but I think
online and offline with the streaming API can go somewhat parallel later. We
could set a short-term goal, concentrate initially on one direction, and
showcase that direction (e.g. in a blogpost). But first, we should list
the pros/cons in a design doc as a minimum. Then make a decision what
direction to go. Would that be feasible?

On 2017-02-23 12:34, Katherin Eri wrote:


I'm not sure that this is feasible; doing all at the same time could mean
doing nothing.
I'm just afraid that the words "we will work on streaming, not on batching, we
have no committer's time for this" mean that yes, we started work on
FLINK-1730, but nobody will commit this work in the end, as it already was
with this ticket.

On 23 Feb 2017 at 14:26, "Gábor Hermann" <m...@gaborhermann.com> wrote:

@Theodore: Great to hear you think the "batch on streaming" approach is
possible! Of course, we need to pay attention to all the pitfalls there,
if we go that way.

+1 for a design doc!

I would add that it's possible to make efforts in all three directions
(i.e. batch, online, batch on streaming) at the same time. Although, it
might be worth to concentrate on one. E.g. it would not be so useful to
have the same batch algorithms with both the batch API and streaming API.
We can decide later.

The design doc could be partitioned into these 3 directions, and we can
collect there the pros/cons too. What do you think?

Cheers,
Gabor


On 2017-02-23 12:13, Theodore Vasiloudis wrote:


Hello all,


@Gabor, we have discussed the idea of using the streaming API to write all
of our ML algorithms with a couple of people offline,
and I think it might be possible and is generally worth a shot. The
approach we would take would be close to Vowpal Wabbit, not exactly
"online", but rather "fast-batch".

There will be problems popping up again, even for very simple algos like
online linear regression with SGD [1], but hopefully fixing those will be
more aligned with the prioritie

Re: [DISCUSS] Flink ML roadmap

2017-02-23 Thread Gábor Hermann
I agree, that it's better to go in one direction first, but I think 
online and offline with streaming API can go somewhat parallel later. We 
could set a short-term goal, concentrate initially on one direction, and 
showcase that direction (e.g. in a blogpost). But first, we should list 
the pros/cons in a design doc as a minimum. Then make a decision what 
direction to go. Would that be feasible?


On 2017-02-23 12:34, Katherin Eri wrote:


I'm not sure that this is feasible; doing all at the same time could mean
doing nothing.
I'm just afraid that the words "we will work on streaming, not on batching, we
have no committer's time for this" mean that yes, we started work on
FLINK-1730, but nobody will commit this work in the end, as it already was
with this ticket.

On 23 Feb 2017 at 14:26, "Gábor Hermann" <m...@gaborhermann.com> wrote:


@Theodore: Great to hear you think the "batch on streaming" approach is
possible! Of course, we need to pay attention to all the pitfalls there, if we
go that way.

+1 for a design doc!

I would add that it's possible to make efforts in all the three directions
(i.e. batch, online, batch on streaming) at the same time. Although, it
might be worth to concentrate on one. E.g. it would not be so useful to
have the same batch algorithms with both the batch API and streaming API.
We can decide later.

The design doc could be partitioned to these 3 directions, and we can
collect there the pros/cons too. What do you think?

Cheers,
Gabor


On 2017-02-23 12:13, Theodore Vasiloudis wrote:


Hello all,


@Gabor, we have discussed the idea of using the streaming API to write all
of our ML algorithms with a couple of people offline,
and I think it might be possible and is generally worth a shot. The
approach we would take would be close to Vowpal Wabbit, not exactly
"online", but rather "fast-batch".

There will be problems popping up again, even for very simple algos like
online linear regression with SGD [1], but hopefully fixing those will be
more aligned with the priorities of the community.

@Katherin, my understanding is that given the limited resources, there is
no development effort focused on batch processing right now.

So to summarize, it seems like there are people willing to work on ML on
Flink, but nobody is sure how to do it.
There are many directions we could take (batch, online, batch on
streaming), each with its own merits and downsides.

If you want, we can start a design doc and move the conversation there,
come up with a roadmap and start implementing.

Regards,
Theodore

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Understanding-connected-streams-use-without-timestamps-td10241.html

On Tue, Feb 21, 2017 at 11:17 PM, Gábor Hermann <m...@gaborhermann.com>
wrote:

It's great to see so much activity in this discussion :)

I'll try to add my thoughts.

I think building a developer community (Till's 2. point) can be slightly
separated from what features we should aim for (1. point) and showcasing
(3. point). Thanks Till for bringing up the ideas for restructuring, I'm
sure we'll find a way to make the development process more dynamic. I'll
try to address the rest here.

It's hard to choose directions between streaming and batch ML. As Theo
has
indicated, not much online ML is used in production, but Flink
concentrates
on streaming, so online ML would be a better fit for Flink. However, as
most of you argued, there's a definite need for batch ML. But batch ML
seems
hard to achieve because there are blocking issues with persisting,
iteration paths etc. So it's no good either way.

I propose a seemingly crazy solution: what if we developed batch
algorithms also with the streaming API? The batch API would clearly seem
more suitable for ML algorithms, but there are a lot of benefits to this
approach too, so it's clearly worth considering. Flink also has the high
level vision of "streaming for everything" that would clearly fit this
case. What do you all think about this? Do you think this solution would
be feasible? I would be happy to make a more elaborate proposal, but let
me put my main ideas here:

1) Simplifying by using one system
It could simplify the work of both the users and the developers. One
could
execute training once, or could execute it periodically e.g. by using
windows. Low-latency serving and training could be done in the same
system.
We could implement incremental algorithms, without any side inputs for
combining online learning (or predictions) with batch learning. Of
course,
all the logic describing these must be somehow implemented (e.g.
synchronizing predictions with training), but it should be easier to do
so
in one system, than by combining e.g. the batch and streaming API.
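
To make this concrete, here is a rough sketch of what periodic re-training and
low-latency serving in one streaming job could look like. All names are made
up, and a per-key running mean stands in for a real learning algorithm; it is
only meant to show the shape of such a job, not a real implementation:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable

// Serving side: remember the latest "model" per key and score incoming events.
// (A real job would keep this in Flink keyed state instead of a plain map.)
class ServingFn extends CoFlatMapFunction[(Int, Double), (Int, Double), String] {
  private val modelPerKey = mutable.Map.empty[Int, Double]
  override def flatMap1(e: (Int, Double), out: Collector[String]): Unit =
    out.collect(s"key ${e._1}: value ${e._2}, deviation ${e._2 - modelPerKey.getOrElse(e._1, 0.0)}")
  override def flatMap2(m: (Int, Double), out: Collector[String]): Unit =
    modelPerKey(m._1) = m._2
}

object TrainAndServeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // (key, value) events; a real job would read these from Kafka etc.
    val events: DataStream[(Int, Double)] =
      env.fromElements((1, 2.0), (1, 3.0), (2, 1.0), (1, 4.0), (2, 5.0))

    // "Training": periodically recompute a per-key mean over a tumbling window,
    // standing in for an actual batch-on-streaming learning step.
    val models: DataStream[(Int, Double)] = events
      .map(e => (e._1, e._2, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))
      .map(s => (s._1, s._2 / s._3))

    // Serving and training live in the same job: connect the two streams by key.
    events.connect(models).keyBy(0, 0).flatMap(new ServingFn).print()

    env.execute("train-and-serve sketch")
  }
}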

2) Batch ML with the streaming API is not harder
Despite these benefits, it could seem harder to implement batch ML with
the streaming API, but in my opinion it's not. There are more fl

Re: [DISCUSS] Per-key event time

2017-02-23 Thread Gábor Hermann

Hey all,

Let me share some ideas about this.

@Paris: The local-only progress tracking indeed seems easier, we do not 
need to broadcast anything. Implementation-wise it is easier, but 
performance-wise probably not. If one key can come from multiple 
sources, there could be a lot more network overhead with per-key 
tracking than with broadcasting, somewhat paradoxically. Say source instance 
S1 sends messages and watermarks to operator instances O1, O2. In the 
broadcasting case, S1 would send one message to O1 and one to O2 per 
watermark (of course it depends on how fast the watermarks arrive), 
a total of 2. However, if we keep track of per-key watermarks, S1 would 
need to send watermarks for every key directed to O1, and also for O2. So if 
10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks 
arrive at the same rate per key as per source in the previous case) 
S1 would send a total of 20 watermarks.


Another question is how large the state per key is. If it's 
really small (an integer maybe, or the state of a small state machine), then 
the overhead of keeping track of a (Long) watermark is large 
memory-wise. E.g. an Int state plus a Long watermark results in 3x as large a 
state. Also, the checkpointing would be ~3x as slow. Of course, for 
large states a Long watermark would not mean much overhead.


We could resolve the memory issue by using some kind of sketch data 
structure. Right now the granularity of watermark handling is 
per-operator-instance. On the other hand, per-key granularity might be 
costly. What if we increased the granularity of watermarks inside an 
operator by keeping more than one watermark tracker in one operator? 
This could be quite simply done with a hash table. With a hash table of 
size 1, we would yield the current semantics (per-operator-instance 
granularity). With a hash table large enough to have at most one key per 
bucket, we would yield per-key watermark tracking. In between lies the 
trade-off between handling time-skew and a lot of memory overhead. This 
does not seem hard to implement.


Of course, at some point we would still need to take care of watermarks 
per-key. Imagine that keys A and B would go to the same bucket of the 
hash table, and watermarks are coming in like this: (B,20), (A,10), 
(A,15), (A,40). Then the watermark of the bucket should be the minimum 
as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of 
the watermarks of A and B separately. But after we have a correct 
watermark for the bucket, all we need to care about is the bucket 
watermarks. So somewhere (most probably at the source) we would have to 
pay memory overhead of tracking every key, but nowhere else in the topology.


Regarding the potentially large network overhead, the same compression 
could be useful. I.e. we would not send watermarks from one operator 
per-key, but rather per-hash. Again, the trade-off between time skew and 
memory consumption is configurable by the size of the hash table used.
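
As a small illustration (a made-up helper class, not Flink code), the tracker
could look something like this; with numBuckets = 1 it degenerates to the
current per-operator-instance behaviour, and with enough buckets it approaches
per-key tracking:

import scala.collection.mutable

// Per-key watermarks are kept only here (e.g. at the source); only the
// per-bucket minimum would need to travel downstream.
class BucketedWatermarkTracker[K](numBuckets: Int) {
  private val perKey = Array.fill(numBuckets)(mutable.Map.empty[K, Long])
  private val bucketWatermark = Array.fill(numBuckets)(Long.MinValue)

  private def bucketOf(key: K): Int = java.lang.Math.floorMod(key.hashCode, numBuckets)

  /** Record a watermark for `key` and return the bucket id together with its
    * watermark, i.e. the minimum over all keys seen in that bucket so far. */
  def onWatermark(key: K, watermark: Long): (Int, Long) = {
    val b = bucketOf(key)
    val keys = perKey(b)
    keys.update(key, math.max(keys.getOrElse(key, Long.MinValue), watermark))
    bucketWatermark(b) = keys.values.min
    (b, bucketWatermark(b))
  }
}

(Note that in this sketch a key that shows up late would pull the bucket
watermark backwards, so keys would have to be registered with an initial low
watermark before the bucket's watermark is emitted, as in the A/B example above.)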


Cheers,
Gabor

On 2017-02-23 08:57, Paris Carbone wrote:


Hey Jamie!

Key-based progress tracking sounds like local-only progress tracking to me:
there is no need to use a low watermarking mechanism at all, since all streams 
of a key are handled by a single partition at a time (per operator).
Thus, this could be much easier to implement and support (i.e., no need to 
broadcast the progress state of each partition all the time).
State-wise it should be fine too if it is backed by RocksDB, especially if we 
have MapState in the future.

Just my quick thoughts on this, to get the discussion going :)

cheers
Paris


On 23 Feb 2017, at 01:01, Jamie Grier  wrote:

Hi Flink Devs,

Use cases that I see quite frequently in the real world would benefit from
a different watermarking / event time model than the one currently
implemented in Flink.

I would call Flink's current approach partition-based watermarking or maybe
subtask-based watermarking.  In this model the current "event time" is a
property local to each subtask instance in a dataflow graph.  The event
time at any subtask is the minimum of the watermarks it has received on
each of its input streams.

There are a couple of issues with this model that are not optimal for some
(maybe many) use cases.

1) A single slow subtask (or say source partition) anywhere in the dataflow
can mean no progress can be made on the computation at all.

2) In many real world scenarios the time skew across keys can be *many*
times greater than the time skew within the data with the same key.

In this discussion I'll use "time skew" to refer to the out-of-orderness
with respect to timestamp of the data.  Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two.

In IoT applications the source of events is a particular device out in the
world, let's say a device in a connected car application.  The data for
some particular device may be very bursty and we will certainly get 

Re: [DISCUSS] Project build time and possible restructuring

2017-02-22 Thread Gábor Hermann

@Stephan:

Although I tried to raise some issues about splitting committers, I'm 
still strongly in favor of some kind of restructuring. We just have to 
be conscious about the disadvantages.


Not splitting the committers could leave the libraries in the same 
stalling status described by Till. Of course, dedicating current 
committers as shepherds of the libraries could easily resolve the issue. 
But that requires time from current committers. It seems like a trade-off 
between code quality, speed of development, and committer effort.


From what I see in the discussion about ML, there are many people 
willing to contribute as well as production use-cases. This means we 
could and should move forward. However, the development speed is 
significantly slowed down by stalling PRs. The proposal for contributors 
to help with the review process has not really worked out so far. In my 
opinion, either code quality (by more easily accepting new committers) 
or some committer time (reviewing/merging) should be sacrificed to move 
forward. As Till has indicated, it would be a shame if we let this 
contribution effort die.


Cheers,
Gabor



Re: [DISCUSS] Project build time and possible restructuring

2017-02-22 Thread Gábor Hermann

Hi all,

I'm also in favor of splitting, but only in terms of committers. I agree 
with Theodore that async releases would cause confusion. With time-based 
releases [1] it should be easy to sync releases.


Even if it's possible to add committers to different components, should 
we do a more fine-grained split? E.g. should we split the committers of 
Gelly and ML? If not, committers should be trusted not to fiddle with 
something that's not their territory. That might not be a problem, as it 
seems to be the case right now.


What should we do if ASF does not allow splitting? Should we add more 
committers and trust them not to touch code that's not their 
responsibility? That's the same as no split in terms of committers 
(build time can be lowered of course).


What about ensuring code quality? In my opinion the primary reason of 
being a committer is to ensure code quality. Committers are trusted to 
adhere to a certain code quality, partly determined by developer 
guidelines, and make others adhere too.


By adding more committers with less consideration, we are risking the 
quality of different components. That might not be a problem, because 
that's the price of a more dynamic development in libraries etc., but we 
should ensure that *eventually* the code quality converges to what's 
expected by Flink. So a new committer would learn some of the 
responsibilities as a committer, not as a contributor. But what if the 
new committer fails to adhere? Is there a way to revoke committer status?


[1] https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases

Cheers,
Gabor



Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Gábor Hermann

Hi Stavros,

Thanks for bringing this up.

There have been past [1] and recent [2, 3] discussions about the Flink 
libraries, because there are some stalling PRs and overloaded 
committers. (Actually, Till is the only committer shepherd of both 
the CEP and ML libraries, and AFAIK he has a ton of other responsibilities 
and work to do.) Thus it's hard to get code reviewed and merged, and 
without merged code it's hard to get committer status, so there are 
not many committers who can review e.g. ML algorithm implementations, 
and the cycle goes on. Until this is resolved somehow, we should help 
the committers by reviewing each other's PRs.


I think prioritizing features (b) is a good way to start. We could 
declare the most blocking features and concentrate on reviewing and merging 
them before moving forward. E.g. the evaluation framework is quite 
important for an ML library in my opinion, and has had a PR stalling for a 
long time [4].


Regarding c), there are general style guides for contributing to 
Flink, so we should follow those. Is there something more ML-specific you 
think we could follow? We should definitely declare that we follow 
scikit-learn and make sure contributions comply with that.


In terms of features (a, d), I think we should first see the bigger 
picture. That is, it would be nice to discuss a clearer direction for 
Flink ML. I've seen a lot of interest in contributing to Flink ML 
lately. I believe we should rethink our goals, to put the contribution 
efforts in making a usable and useful library. Are we trying to 
implement as many useful algorithms as possible to create a scalable ML 
library? That would seem ambitious, and of course there are a lot of 
frameworks and libraries that already have something like this as a goal 
(e.g. Spark MLlib, Mahout). Should we rather create connectors to 
existing libraries? Then we cannot really do Flink-specific 
optimizations. Should we go for online machine learning (as Flink is 
concentrating on streaming)? We already have a connector to SAMOA. We 
could go on with questions like this. Maybe I'm missing something, but I 
haven't seen such directions declared.


Cheers,
Gabor

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Opening-a-discussion-on-FlinkML-td10265.html
[2] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-CEP-development-is-stalling-td15237.html#a15341
[3] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/New-Flink-team-member-Kate-Eri-td15349.html

[4] https://github.com/apache/flink/pull/1849

On 2017-02-20 11:43, Stavros Kontopoulos wrote:


(Resending with the appropriate topic)

Hi,

I would like to start a discussion about next steps for Flink ML.
Currently there is a lot of work going on, but it needs a push forward.

Some topics to discuss:

a) How several features should be planned and get aligned with Flink
releases.
b) Priorities of what should be done.
c) Basic guidelines for code: styleguides, scikit-learn compliance etc
d) Missing features important for the success of the library, next steps
etc...

Thoughts?

Best,
Stavros





Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann

Hey Ufuk,

I'm happy to contribute. At least I'll get a bit more understanding of 
the details.


Breaking the assumption that only a single thread updates state would 
bring us from strong isolation guarantees (i.e. serializability at the 
updates and read committed at the external queries) to no isolation 
guarantees. That's not something to be taken lightly. I think that these 
guarantees would be more easily provided for queries from inside the job 
that modify state (setKvState), but that's still not trivial.


Indeed, the iteration approach works better for the use-cases I 
mentioned, at least for now.


Cheers,
Gabor

On 2017-02-14 14:43, Ufuk Celebi wrote:


Hey Gabor,

great ideas here. It's only slightly related, but I'm currently working on a 
proposal to improve the queryable state APIs for lookups (partly along the 
lines of what you suggested with higher level accessors). Maybe you are 
interested in contributing there?

I really like your ideas for the use cases you describe, but I'm unsure about the write 
path (setKvState), because of the discussed implications to the state backends. I think 
that this needs more discussion and coordination with the contributors working on the 
backends. For example, one assumption so far was that only a single thread updates state 
and we don't scope state per checkpoint (to provide "isolation levels" for the 
queries; read committed vs. read uncommitted) and probably more.

Because of this I would actually lean towards the iteration approach in a first 
version. Would that be a feasible starting point for you?

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com) wrote:

Hi Gyula, Jinkui Shi,
  
Thanks for your thoughts!
  
@Gyula: I'll try to explain in a bit more detail.
  
The API could be almost like the QueryableState's. It could be
higher-level though: returning Java objects instead of serialized data
(because there would not be issues with class loading). Also, it could
support setKvState (see my 5th point). This could lead to both
potential performance improvements and easier usage (see my points 2
and 3).
  
A use-case could be anything where we'd use an external KV-store.

For instance, we are updating user states based on another user's state, so
in the map function we do a query (in pseudo-ish Scala code):
  
users.keyBy(0).flatMapWithState { (userEvent, collector) =>
  // this user's own state, kept locally in the operator
  val thisUser: UserState = state.get()
  // query the other user's state, possibly living in another operator instance
  val otherUser: Future[UserState] =
    qsClient.getKvState("users", userEvent.otherUserId)

  otherUser.onSuccess { case otherUserState =>
    state.update(someFunc1(thisUser, otherUserState))
    collector.collect(someFunc2(thisUser, otherUserState))
  }
}
  
Another example could be (online) distributed matrix factorization,

where the two factor matrices are represented by distributed states. One
is updated by querying the other (with getKvState), and computing some
function (i.e. SGD), while the other is updated at the same place (with
setKvState).
  
I see the motivation behind the QueryableState as a way to make further
use of the KV-store we practically have at stateful operators (but
please correct me if I'm wrong). I think we could make even more use of it
if the KV-store is used inside the same job.
  
1) Order and timeliness

As you've mentioned it's hard to guarantee any ordering when working
with two states on possibly distinct machines. This could bring us to
distributed transaction processing, which is a complex topic in itself. I
can imagine using watermarks and keeping different state versions to
only allow querying state from the past, and not from the future. For
now, let's just assume that order does not matter.
  
2) Fault-tolerance

There might be other things we could do, but there are two simple
guarantees that we can surely provide. First, by using the current
QueryableState the task could fail with incomplete futures. If the
records producing those futures are received before the previous
checkpoint barrier, those updates will be completely lost. We could
solve this by waiting for the futures to complete before starting a
checkpoint, thus providing exactly-once guarantees. This would ensure
that, although the UDF has side effects, every record has its effect
exactly once. I don't see a good way to provide this guarantee with the
current QueryableState. Second, we can guarantee that the query will
eventually succeed (or else the whole topology would fail).
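
As a sketch of the first guarantee (purely illustrative, not an existing Flink
API): the operator could register every outstanding query and drain them before
taking a snapshot, so the effect of an already-received record cannot be lost
behind a checkpoint barrier:

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical helper an operator could use: register every in-flight state
// query and drain them all before acknowledging a checkpoint barrier.
class PendingQueries(implicit ec: ExecutionContext) {
  private val pending = mutable.Set.empty[Future[_]]

  def register[T](query: Future[T]): Future[T] = {
    pending.synchronized { pending += query }
    query.onComplete { _ => pending.synchronized { pending -= query } }
    query
  }

  /** Called before snapshotting: block until every registered query has finished. */
  def drainBeforeCheckpoint(timeout: Duration): Unit = {
    val inFlight = pending.synchronized { pending.toList }
    inFlight.foreach(f => Await.ready(f, timeout))
  }
}

A timeout here would fail the checkpoint (or the task), which matches the
second guarantee: the query either eventually succeeds or the whole topology fails.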
  
3) Optimizations

I've also got two examples for optimizations. First, we can make a
best effort to co-locate the two stateful operators to save on network
overhead. The user could try to co-locate the querying machines when
externally querying the state, but could not move the machines with the
state being queried. Second, we could provide a user-interface for
(loose) guarantees on the latency of sending and returning queries, just
like setting the buffer timeout.
  
4) Concurrent reading/writing

Key-value states and collectors might be ac

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann
GPU calculations in 
Flink [1]. Fortunately, that seems orthogonal to our problem: if there's 
a PS, we can later include GPU calculations.


My main question remains: whether there's a need for an integrated PS or 
not. It would not fit into the current project structure (neither in ML 
nor in Streaming), so I guess the only direction is to build on top of 
the streaming API and use an external service just as you've proposed.


[1] https://issues.apache.org/jira/browse/FLINK-5782

Cheers,
Gabor


On 2017-02-13 04:10, Jinkui Shi wrote:

Hi Gábor Hermann,

The online parameter server is a good proposal.
The PS paper [1] has an early implementation [2], and now it is MXNet [3].
I have some thoughts about an online PS in Flink:
1.  Should we support a flexible and configurable update strategy?
For example, within one iteration, compute several times and update once, or 
update on every step of the iteration.
2.  Should the implementation be fully based on the DAG, without modifying 
the runtime and core too much?
-   The parameter server is the Source with distributed parameter 
data, called PS.
-   The worker nodes are the DAG except the Source, i.e. some ML 
computation implemented using the Flink API.
-   The results of multi-layer computation flow to the Sink operator 
naturally.
-   The Sink can feed back to the Source for the next iteration.
-   Automatically tune busy operators, increasing/decreasing the compute 
resources (the max parallelism) of the runtime operators.
3.  Automatically detect GPU support provided by Mesos, and use it if GPU 
usage is enabled in the configuration.

[1] https://www.cs.cmu.edu/~muli/file/ps.pdf
[2] https://github.com/dmlc/parameter_server
[3] https://github.com/dmlc/mxnet


On Feb 12, 2017, at 00:54, Gyula Fóra <gyf...@apache.org> wrote:

Hi Gábor,

I think the general idea is very nice, but it would be nice to see more clearly
what benefit this brings from the developer's perspective. Maybe a rough
API sketch and 1-2 examples.

I am wondering what sort of consistency guarantees you imagine for such
operations, or why fault tolerance is even relevant. Are you thinking
about an asynchronous API, such that querying the state for another key might
give you a Future that is guaranteed to complete eventually?

It seems to be hard to guarantee the timeliness (order) of these operations
with respect to the updates made to the states, so I wonder if there is a
benefit to doing this compared to using the QueryableState interface. Is
this only a potential performance improvement, or is it also easier to work
with?

Cheers,
Gyula

Gábor Hermann <m...@gaborhermann.com> wrote (on Fri, 10 Feb 2017, at 16:01):


Hi all,

TL;DR: Is it worth implementing a special QueryableState for querying
state from another part of a Flink streaming job and aligning it with
fault tolerance?

I've been thinking about implementing a Parameter Server with/within
Flink. A Parameter Server is basically a specialized key-value store
optimized for training distributed machine learning models. So not only
the training data, but also the model is distributed. Range queries are
typical, and usually vectors and matrices are stored as values.

More generally, an integrated key-value store might also be useful in
the Streaming API. Although external key-value stores can be used inside
UDFs for the same purpose, aligning them with the fault tolerance
mechanism of Flink could be hard. What if state distributed by a key (in
the current Streaming API) could be queried from another operator? Much
like QueryableState, but querying *inside* the Flink job. We could make
use of the fact that state has been queried from inside to optimize
communication and integrate fault tolerance.

The question is whether the Flink community would like such a feature, and
if so, how to do it.

I could elaborate my ideas if needed, and I'm happy to create a design
doc, but before that, I'd like to know what you all think about this.
Also, I don't know if I'm missing something, so please correct me. Here
are some quick notes regarding the integrated KV-store:

Pros
- It could allow easier implementation of more complicated use-cases.
E.g. updating users' preferences simultaneously based on each other's
preferences when events happen between them, such as making a connection,
liking each other's posts, or going to the same concert. User preferences
are distributed as state; an event about user A liking user B gets
sent to A's state, queries the state of B, then updates the state of
B. There have been questions on the user mailing list about similar
problems [1].
- Integration with fault tolerance. User does not have to maintain two
systems consistently.
- Optimization potential. In the social network example, maybe other
users on the same partition as user A need the state of user B

Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-10 Thread Gábor Hermann

Hi all,

TL;DR: Is it worth implementing a special QueryableState for querying 
state from another part of a Flink streaming job and aligning it with 
fault tolerance?


I've been thinking about implementing a Parameter Server with/within 
Flink. A Parameter Server is basically a specialized key-value store 
optimized for training distributed machine learning models. So not only 
the training data, but also the model is distributed. Range queries are 
typical, and usually vectors and matrices are stored as values.


More generally, an integrated key-value store might also be useful in 
the Streaming API. Although external key-value stores can be used inside 
UDFs for the same purpose, aligning them with the fault tolerance 
mechanism of Flink could be hard. What if state distributed by a key (in 
the current Streaming API) could be queried from another operator? Much 
like QueryableState, but querying *inside* the Flink job. We could make 
use of the fact that state has been queried from inside to optimize 
communication and integrate fault tolerance.


The question is whether the Flink community would like such a feature, and 
if so, how to do it.


I could elaborate my ideas if needed, and I'm happy to create a design 
doc, but before that, I'd like to know what you all think about this. 
Also, I don't know if I'm missing something, so please correct me. Here 
are some quick notes regarding the integrated KV-store:


Pros
- It could allow easier implementation of more complicated use-cases.
E.g. updating users' preferences simultaneously based on each other's 
preferences when events happen between them, such as making a connection, 
liking each other's posts, or going to the same concert. User preferences 
are distributed as state; an event about user A liking user B gets 
sent to A's state, queries the state of B, then updates the state of 
B. There have been questions on the user mailing list about similar 
problems [1].
- Integration with fault tolerance. User does not have to maintain two 
systems consistently.
- Optimization potential. In the social network example, maybe other 
users on the same partition as user A need the state of user B, so we 
don't have to send user B's state around twice.
- Could specialize to a Parameter Server for simple (and efficient) 
implementation of (possibly online) machine learning. E.g. sparse 
logistic regression, LDA, matrix factorization for recommendation systems.


Cons
- Lot of development effort.
- "Client-server" architecture goes against the DAG dataflow model.

Two approaches for the implementation in the streaming API:

1) An easy way to implement this is to use iterations (or the proposed 
loops API). We can achieve two-way communication with two operators in a 
loop: a worker (W) and a Parameter Server (PS), see the diagram [2]. (An 
additional nested loop in the PS could add replication opportunities.) 
Then we would get fault tolerance "for free" from the work of Paris [3]. 
It would also be on top of the Streaming API, with no effect on the runtime.


2) A problem with the loop approach is that coordination between PS 
nodes and worker nodes can only be done on the data stream. We could not 
really use e.g. Akka for async coordination. A harder but more flexible 
way is to use lower-level interfaces of Flink and touch the runtime. 
Then we would have to take care of fault tolerance too.


(As a side note: in the batch API generalizing delta iterations could be 
a solution for Parameter Server [4].)


Thanks for any feedback :)

Cheers,
Gabor

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sharded-state-2-step-operation-td8631.html

[2] https://i.imgur.com/GsliUIh.png
[3] https://github.com/apache/flink/pull/1668
[4] 
https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran




Re: Flink ML recommender system API

2016-11-10 Thread Gábor Hermann

Hello Theodore,

Thanks for your reply.

Of course. I would have done that in the first place but I had seen the 
contribution guideline advising to avoid WIP PRs:


"No WIP pull requests. We consider pull requests as requests to merge 
the referenced code as is into the current stable master branch. 
Therefore, a pull request should not be “work in progress”. Open a pull 
request if you are confident that it can be merged into the current 
master branch without problems. If you rather want comments on your 
code, post a link to your working branch."


I don't know the rationale behind this, as a WIP PR seems most of the 
time a convenient way to share half-finished code. Maybe I'll open a 
discussion about this.


I'll open a WIP PR for this after cleaning our code a bit.

Cheers,
Gabor

On 2016-11-10 16:56, Theodore Vasiloudis wrote:


Hello Gabor,

for this type of issue (design decisions) what we've done in the past with
FlinkML is to open a PR marked with the WIP tag and take the discussion
there, making it easier
for people to check out the code and get a feel of advantages/disadvantages
of different approaches.

Could you do that for this issue?

Regards,
Theodore

On Thu, Nov 10, 2016 at 12:46 PM, Gábor Hermann <m...@gaborhermann.com>
wrote:


Hi all,

We have managed to fit the ranking recommendation evaluation into the
evaluation framework proposed by Theodore (FLINK-2157). There's one main
problem that remains: we have two different predictor traits (Predictor,
RankingPredictor) without a common superclass, and that might be
problematic.

Please see the details at the issue:
https://issues.apache.org/jira/browse/FLINK-4713

Could you give feedback on whether we are moving in the right direction or
not? Thanks!

Cheers,
Gabor






Re: Flink ML recommender system API

2016-11-10 Thread Gábor Hermann

Hi all,

We have managed to fit the ranking recommendation evaluation into the 
evaluation framework proposed by Theodore (FLINK-2157). There's one main 
problem that remains: we have two different predictor traits (Predictor, 
RankingPredictor) without a common superclass, and that might be 
problematic.
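
To make the problem a bit more tangible, here is a toy illustration with
heavily simplified, made-up traits (not the actual FlinkML types): without a
common supertype, the evaluation framework needs two near-duplicate entry
points instead of one generic one.

object EvaluationSketch {
  trait Predictor        { def predictScore(user: Int, item: Int): Double }
  trait RankingPredictor { def recommend(user: Int, topK: Int): Seq[Int] }

  // Works only for score predictors: RMSE over (user, item, rating) triples.
  def evaluateScores(p: Predictor, test: Seq[(Int, Int, Double)]): Double =
    math.sqrt(test.map { case (u, i, r) => math.pow(p.predictScore(u, i) - r, 2) }.sum / test.size)

  // A ranking metric needs a second, structurally similar entry point (precision@k).
  def evaluateRanking(p: RankingPredictor, relevant: Map[Int, Set[Int]], k: Int): Double =
    relevant.map { case (u, rel) => p.recommend(u, k).count(rel).toDouble / k }.sum / relevant.size
}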


Please see the details at the issue:
https://issues.apache.org/jira/browse/FLINK-4713

Could you give feedback on whether we are moving in the right direction 
or not? Thanks!


Cheers,
Gabor



Re: SVMITSuite Testing

2016-10-26 Thread Gábor Hermann

Hi Jesse,

Have you tried running the test in an IDE (e.g. Intellij IDEA)? AFAIK 
they have support for ScalaTest.


When running a Maven build, it seems to skip integration tests (ones 
marked with "IT") intentionally. I assume it would take much time to run 
those tests. You can run them by explicitly turning on integration tests:

mvn integration-test

I haven't found any documentation for this either, but you can inspect 
the Maven Surefire plugin configuration in the pom.xml file [1], which 
sets which tests to run or exclude.


Cheers,
Gabor

[1] https://github.com/apache/flink/blob/master/pom.xml#L978

On 2016-10-25 20:10, Jesse Bannon wrote:


Hello,

I am trying to run the SVMITSuite in the Flink-ML library. When I build the
package it seems to skip all tests in SVMITSuite - I'm assuming it's
because there's no ExecutionEnvironment set up to use the DataSet API.

I can't seem to find any documentation on how to run this either. Any help
would be appreciated.

Thanks in advance,
~ Jesse





Re: Flink ML recommender system API

2016-10-04 Thread Gábor Hermann

Thank you both for your detailed replies.

I think we all agree on extending the evaluation framework to handle 
recommendation models, and choosing the scalable form of ranking, so 
we'll do it that way. For now we will work upon Theodore's PR.


Thanks for giving me the reasons behind the design decision about not 
having a separate object for the trained model. I haven't thought about 
the implications of pipelines, so I think we should keep the current 
design and align our new algorithms to it. Of course, we can bring up a 
discussion later and reconsider this design, but I see that it's a 
separate issue.



I think it would be good to implement a ScoreMatrixFactorizationRecommender
and a RankingMatrixFactorizationRecommender which both work on a
MatrixFactorizationModel. This model can then either be computed by ALS or
DSGD. This could be controlled by a configuration parameter of the
recommenders.
Do you mean having two different predictors, i.e. 
Predictor[ScoreMatrixFactorizationRecommender] and 
Predictor[RankingMatrixFactorizationRecommender]?
If I understand right, there should be one common class 
MatrixFactorizationModel instead of distinct ALS and DSGD classes, and 
it should be a configuration parameter which one to use for training?


I like this idea, as both trainers would require almost the same 
configuration. AFAIK there would be an additional 'LearningRate' 
parameter for DSGD, but apart from that the two configs are the same.



What do you mean with more "typesafe"? I don't see how returning the
trained model from the fit method gives you more type safety.
I probably used the wrong word here. I simply meant that by using a 
separate type for the trained model, the type system ensures that a trained 
model cannot be trained again, while an untrained model cannot be used 
for prediction.
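
As a tiny sketch of what I mean (made-up names, nothing to do with the actual
FlinkML pipeline types): with a distinct trained-model type, the compiler
itself rules out predicting with an unfitted estimator or re-fitting a
finished model.

// Trivial stand-in "training": fit y = slope * x through the origin by least squares.
class Estimator {
  def fit(data: Seq[(Double, Double)]): TrainedModel = {
    val slope = data.map { case (x, y) => x * y }.sum / data.map { case (x, _) => x * x }.sum
    new TrainedModel(slope)
  }
  // no predict method here: an unfitted Estimator cannot be used for prediction
}

class TrainedModel(slope: Double) {
  def predict(x: Double): Double = slope * x
  // no fit method here: a trained model cannot be trained again
}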


Regarding the DSGD algorithm, I think it uses another sampling 
mechanism, and we cannot reuse the simple SGD solver. However, we will 
make sure not to write duplicate code for the same problem. We've also 
noticed, independently from DSGD, that the SGD solver is a GD solver in 
reality, but I have not found the related issues and discussion, so 
pointing me to them was really useful, thanks!


Cheers,
Gabor


ML contributions

2016-09-12 Thread Gábor Hermann

Hey all,

We are planning to contribute some algorithms and improvements to Flink 
ML at SZTAKI.
I have already opened a JIRA for an implicit 
feedback ALS, but probably more will come soon.

We are implementing algorithms anyway, and it would be nice if we could 
incorporate them into Flink, so I just wanted to let you all know about 
our efforts.

Cheers,
Gabor