[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-10-07 Thread sachingoel0101
Github user sachingoel0101 closed the pull request at:

https://github.com/apache/flink/pull/1003


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-10-07 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-146178033
  
Sure. However, any further work will depend on finalizing the interface in 
#967 which has been idle for quite some time. 




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-10-06 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-145854816
  
@sachingoel0101 Sounds good! Can you open a new PR once you are done with 
the Parameter Server interface and close this PR? Thanks




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-09-22 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-142190932
  
Yes. I agree. I'm currently working on finalizing the interface. Just 
waiting for another PR to get in.

And involving the Job Manager is not strictly necessary. I can create 
another standalone actor to manage the servers.




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-09-12 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-139742195
  
Having interfaces for a Parameter Server service in Flink is a very good 
idea, IMO. This interface can be implemented for different backends, such as 
Ignite or a lightweight implementation of our own.

However, I doubt that it is really necessary to bake the Parameter Server 
master into the JobManager. Can't this be a completely stand-alone service 
that Flink programs write to and read from via the provided interfaces?
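
A minimal sketch of what such a backend-neutral interface could look like, in Java. Everything here is hypothetical: `ParameterClient`, `InMemoryParameterClient`, and the `combine` function are illustrative names, not the interface proposed in #967 and not an Ignite API. The in-memory class only stands in for the "lightweight" backend.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

// Hypothetical interface that a Flink program would code against.
// An Ignite-backed or stand-alone service could implement it as well.
interface ParameterClient<K, V> {
    void update(K key, V delta);   // fold a delta into the stored value
    V fetch(K key);                // returns null if the key is unknown
}

// Assumed lightweight backend: a single in-process map.
final class InMemoryParameterClient<K, V> implements ParameterClient<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final BinaryOperator<V> combine;  // how a delta is merged into a value

    InMemoryParameterClient(BinaryOperator<V> combine) {
        this.combine = combine;
    }

    @Override
    public void update(K key, V delta) {
        // First update stores the delta as-is; later ones are combined.
        store.merge(key, delta, combine);
    }

    @Override
    public V fetch(K key) {
        return store.get(key);
    }
}
```

User code would depend only on `ParameterClient`, so swapping the backend would not touch the algorithm.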




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-09-07 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-138419004
  
@StephanEwen , I had a look at #967 and I quite agree with the interface 
designed by @nltran. I believe I can separate out my parameter server 
implementation from the core runtime itself. 
The only component involved from core will be the Job Manager, which will 
act as master for all Parameter Servers started.
And as you suggested, we can let the user choose either the Ignite-based 
implementation or the native one.




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-131477326
  
A stand-alone parameter server service will require setting up and tearing 
down the client every time the user, say, opens and closes a Rich function 
while using it. Further, it means we have to add another dependency when the 
same could be accomplished using Akka.
In this implementation, the parameter *server* is at every task manager 
[effectively it acts as a client to serve all running tasks at one node. In 
fact, in this sense, there are no servers, just clients at every worker, which 
are managed by the Job Manager]. This in itself means less data transfer over 
the network, since every *server* will usually be the owner of a key and can 
serve its *clients* faster instead of every request going over the network.
Further, it is completely distributed, and every task manager maintains its 
own *server* and sets it up or tears it down along with itself.
As far as including it in the core itself is concerned, there isn't much of 
it: only the 3-4 functions added directly to the Runtime context, which 
effectively serve as an interface.
@tillrohrmann, could you weigh in here on whether this is the intended use 
of a PS in ML algorithms? I can easily see this working with, for example, the 
regression algorithm.
The reason I included it in the runtime is that the server cannot fail 
independently of its worker now. If the TaskManager is alive, the Parameter 
Server on it will be alive. Further, the Job Manager manages the servers and 
determines where each key will go [which will be crucial to recovery], 
something which can be very hard to determine in a completely decentralized 
manner (I couldn't think of a foolproof way). This ensures that the server is 
running only on the workers where it's needed, and only when it is needed. 
Keeping the Job Manager in the loop also ensures that recovery is easy. If a 
Task Manager fails, the Job Manager knows which server failed by matching the 
`InstanceID`s and can kick off the recovery process from the duplicate server. 
[This is not implemented yet.]
A stand-alone PS will add another master-node system in parallel to the 
existing JobManager-TaskManager system, which can itself be used efficiently 
for this purpose. Of course, this doesn't matter if we use an external 
key-value store.
I will have a look at #967 and see how the two can be integrated.

I had a look at an open implementation done for Spark. 
https://github.com/apache/spark/compare/branch-1.3...chouqin:ps-on-spark-1.3
This adds a separate context and a function on RDD to access the PS and 
does require running a service inside the core environment.




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-131403660
  
This is again baked into the Flink runtime. Is there a way to keep this 
separate?

I am still a bit puzzled whether this is the pattern with which people use 
parameter servers: embedded in the workers that operate on the training data. 
My impression so far was that the parameter servers are a separate component, 
usually running on different machines.

Also, could you comment on how this could work with the changes proposed in 
#967?




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-08-10 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1003

Parameter Server: Distributed Key-Value store, using Akka messaging

This PR adds a basic parameter server implementation built using Akka 
messaging.
*Interface:*
Access: Inside `RuntimeContext`
Operations:
1. `registerBatch(Key, Value)` and `registerAsync(Key, Value)` where key 
and value signify the Key-Value pair. 
2. `updateParameter(key, value)`
3. `fetchParameter(key)`

`Batch` strategy implies that all updates will be done after all registered 
clients have sent their updates.
`Async` strategy implies any updates will be immediately applied.
[I intend to support the Stale synchronous strategy too. But I want to get 
feedback on the overall design of the work so far.]
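
To illustrate the difference between the two strategies, here is a minimal single-JVM sketch in Java. It is not the PR's code: `ParameterSlot`, its constructor arguments, and the `combine` function are hypothetical names, and it assumes each registered client sends exactly one update per round.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical holder for the value stored under one key.
final class ParameterSlot<V> {
    enum Strategy { BATCH, ASYNC }

    private final Strategy strategy;
    private final int registeredClients;       // clients that registered this key
    private final BinaryOperator<V> combine;   // assumed way to fold an update into the value
    private final List<V> pending = new ArrayList<>();
    private V value;

    ParameterSlot(Strategy strategy, int registeredClients, V initial,
                  BinaryOperator<V> combine) {
        this.strategy = strategy;
        this.registeredClients = registeredClients;
        this.value = initial;
        this.combine = combine;
    }

    synchronized void update(V delta) {
        if (strategy == Strategy.ASYNC) {
            // Async: apply immediately.
            value = combine.apply(value, delta);
            return;
        }
        // Batch: buffer until every registered client has sent its update.
        pending.add(delta);
        if (pending.size() == registeredClients) {
            for (V d : pending) {
                value = combine.apply(value, d);
            }
            pending.clear();
        }
    }

    synchronized V fetch() {
        return value;
    }
}
```

With two registered clients and the `BATCH` strategy, a fetch after the first update still sees the old value; the combined result becomes visible only once the second update arrives.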

Messaging and Actor systems:
1. A server is started within the actor system of every task manager.
2. All servers register with the Job Manager.
3. Job Manager determines on which server a particular key-value pair will 
be stored and lets every registered server know as part of a regular heartbeat 
message.
4. `RuntimeContext` employs a `Patterns.ask` and `Await` with infinite 
timeout to query the server on the same task manager.
5. Servers forward their clients' requests to the appropriate servers based 
on the key.
6. After performing the desired operations, results are sent back to the 
originating server which forwards them back to the client.
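
Steps 3, 5 and 6 above can be sketched without Akka as plain method calls. This is an illustration under assumptions, not the PR's implementation: `KeyDirectory` and `RoutingServer` are hypothetical names, the round-robin placement is invented (the PR does not say how the Job Manager picks a server), and a direct method call stands in for an actor message and its reply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the Job Manager's role in step 3: decides which server owns each key.
final class KeyDirectory {
    private final List<String> servers = new ArrayList<>();
    private final Map<String, String> ownerByKey = new HashMap<>();

    void registerServer(String serverId) {
        servers.add(serverId);
    }

    // Round-robin placement is an assumption made for this sketch.
    String assign(String key) {
        String owner = ownerByKey.get(key);
        if (owner == null) {
            owner = servers.get(ownerByKey.size() % servers.size());
            ownerByKey.put(key, owner);
        }
        return owner;
    }

    // What a heartbeat would carry to every registered server.
    Map<String, String> snapshot() {
        return new HashMap<>(ownerByKey);
    }
}

// Stand-in for the server running inside one task manager.
final class RoutingServer {
    private final Map<String, String> ownerByKey = new HashMap<>();
    private final Map<String, RoutingServer> peers;   // shared lookup of all servers
    private final Map<String, Double> store = new HashMap<>();
    int forwarded = 0;                                 // requests sent to another server

    RoutingServer(String id, Map<String, RoutingServer> peers) {
        this.peers = peers;
        peers.put(id, this);
    }

    void onHeartbeat(Map<String, String> assignment) {
        ownerByKey.putAll(assignment);
    }

    // Assumes the key has already been assigned by the directory.
    void update(String key, double delta) {
        RoutingServer owner = peers.get(ownerByKey.get(key));
        if (owner == this) {
            store.merge(key, delta, Double::sum);
        } else {
            forwarded++;
            owner.update(key, delta);   // step 5: forward to the owning server
        }
    }

    Double fetch(String key) {
        RoutingServer owner = peers.get(ownerByKey.get(key));
        if (owner == this) {
            return store.get(key);
        }
        forwarded++;
        return owner.fetch(key);        // step 6: result travels back through this server
    }
}
```

A client only ever talks to the server on its own task manager; whether the request is served locally or forwarded is invisible to it.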

* There is scope for implementing fault tolerance here by maintaining two 
servers for every key-value pair. 
* In case a task has to restart, it can start from a different iteration 
number (by accessing a clock value) without having to resend all earlier 
updates, because all updates are guaranteed to have been applied to the Value.
* There are two examples demonstrating the usage of the Batch and Async 
strategies in the Java examples package.
* The Travis build doesn't pass yet since there are some new messages from 
`ParameterServer` which need to be handled in the unit tests.
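
The first bullet's idea of keeping two servers per key is not part of this PR. Purely as an illustration of how it might work, the Java sketch below keeps a primary and a backup owner per key and promotes the backup when the primary is lost. `ReplicatedDirectory`, the hash-based placement, and the promotion rule are all assumptions, and the sketch needs at least two servers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical manager-side table: key -> { primary server, backup server }.
final class ReplicatedDirectory {
    private final Map<String, String[]> placement = new HashMap<>();

    // Assumed placement: hash picks the primary, the next server is the backup.
    void place(String key, List<String> servers) {
        int primary = Math.floorMod(key.hashCode(), servers.size());
        int backup = (primary + 1) % servers.size();
        placement.put(key, new String[] { servers.get(primary), servers.get(backup) });
    }

    String primaryOf(String key) {
        return placement.get(key)[0];
    }

    // Called when a server is known to be gone; returns how many keys were moved.
    // Choosing a fresh backup afterwards is left out of this sketch.
    int onServerLost(String lostServerId) {
        int promoted = 0;
        for (String[] owners : placement.values()) {
            if (owners[0].equals(lostServerId)) {
                owners[0] = owners[1];   // the backup becomes the primary
                promoted++;
            }
        }
        return promoted;
    }
}
```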

I would love feedback about the overall design. 
This is mostly inspired by a machine learning Parameter Server 
perspective, and doesn't touch the actual internals of Iterative tasks. 
Instead, all operations on the Parameter Server are blocking, and have an 
infinite timeout. However, inside the Server itself, an operation is retried 
up to a maximum number of times and fails if it still does not succeed.
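
The combination of an unbounded wait on the caller's side and bounded retries on the server's side can be sketched as follows. This is a plain-Java stand-in, not the PR's Akka code: `RetryingServer` and `BlockingClient` are hypothetical names, and `CompletableFuture.join()` plays the role of `Await` with an infinite timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical server side: tries an operation a bounded number of times.
final class RetryingServer {
    private final int maxRetries;

    RetryingServer(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    <T> CompletableFuture<T> handle(Supplier<T> operation) {
        CompletableFuture<T> reply = new CompletableFuture<>();
        RuntimeException last = null;
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                reply.complete(operation.get());
                return reply;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        // Give up: the caller's wait ends with a failure instead of hanging.
        reply.completeExceptionally(last);
        return reply;
    }
}

// Hypothetical caller side: blocks with no timeout of its own.
final class BlockingClient {
    static <T> T call(RetryingServer server, Supplier<T> operation) {
        return server.handle(operation).join();
    }
}
```

Because the server always answers, either with a result or with a failure after the last retry, the caller's infinite timeout does not turn into an infinite hang for operations the server gives up on.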

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink_server

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1003.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1003


commit 539319a30e8d2d841bf1deb1b78de4c4f404dfb1
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-05T04:04:38Z

Reorganized the server framework to runtime itself. Added asynchronous
blocking calls to the server.

commit 06daf1630afba005401826a68a646859383f4f9c
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-09T00:29:30Z

Integrating into the Flink Runtime

commit 22933416e57cd63331a93134f56c44bea0395b24
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-10T01:25:54Z

Added a server for each task manager

commit f3f524b9ecfbeba876c8a5946ac3b491df1b0b6b
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-10T02:32:32Z

Added JobManager as a manager of servers

commit 988309ac4bbfb928b32b08fcecfab2e03d3a3feb
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-10T04:15:08Z

Added a parameter store

commit 51c31078482e7c77de8beb90030094ea81a95384
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-10T10:32:10Z

Added examples to demonstrate the usage



