Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Theodore Vasiloudis
Hello all,

I would also be really interested in how a PS-like architecture would work
in Flink. Note that we are not necessarily talking about a PS specifically, but
generally about how QueryableState can be used for ML tasks, with, I guess, a
focus on model-parallel training.

One suggestion I would make is to take a look at TensorFlow, which is also a
dataflow system that supports distributed computation, both data- and
model-parallel.

I don't know too much about the internal workings of the system, but I
would point out this passage from the TF whitepaper [1], Section 11 (Related Work):

> It also permits a significant simplification by
> allowing the expression of stateful parameter nodes as
> variables, and variable update operations that are just
> additional nodes in the graph; in contrast, DistBelief,
> Project Adam and the Parameter Server systems all have
> whole separate parameter server subsystems devoted to
> communicating and updating parameter values.
>

I think the Related Work section is the most relevant to this discussion, as
it discusses the differences between the programming models of Spark, Naiad,
etc. and the TF model.

Also re. fault tolerance:

> When a failure is detected, the entire graph execution
> is aborted and restarted from scratch. Recall however
> that Variable nodes refer to tensors that persist across
> executions of the graph. We support consistent checkpointing
> and recovery of this state on a restart. In particular,
> each Variable node is connected to a Save node. These
> Save nodes are executed periodically, say once every N
> iterations, or once every N seconds. When they execute,
> the contents of the variables are written to persistent
> storage, e.g., a distributed file system. Similarly each
> Variable is connected to a Restore node that is only enabled
> in the first iteration after a restart.
>

[1] http://download.tensorflow.org/paper/whitepaper2015.pdf

On Tue, Feb 14, 2017 at 3:18 PM, Gábor Hermann wrote:

> Hey Ufuk,
>
> I'm happy to contribute. At least I'll get a bit more understanding of the
> details.
>
> Breaking the assumption that only a single thread updates state would
> bring us from strong isolation guarantees (i.e. serializability at the
> updates and read committed at the external queries) to no isolation
> guarantees. That's not something to be taken lightly. I think that these
> guarantees would be more easily provided for inside queries that modify
> (setKvState), but that's still not trivial.
>
> Indeed, the iteration approach works better for the use-cases I mentioned,
> at least for now.
>
> Cheers,
> Gabor
>
>
> On 2017-02-14 14:43, Ufuk Celebi wrote:
>
> Hey Gabor,
>>
>> great ideas here. It's only slightly related, but I'm currently working
>> on a proposal to improve the queryable state APIs for lookups (partly along
>> the lines of what you suggested with higher level accessors). Maybe you are
>> interested in contributing there?
>>
>> I really like your ideas for the use cases you describe, but I'm unsure
>> about the write path (setKvState), because of the discussed implications to
>> the state backends. I think that this needs more discussion and
>> coordination with the contributors working on the backends. For example,
>> one assumption so far was that only a single thread updates state and we
>> don't scope state per checkpoint (to provide "isolation levels" for the
>> queries; read committed vs. read uncommitted) and probably more.
>>
>> Because of this I would actually lean towards the iteration approach in a
>> first version. Would that be a feasible starting point for you?
>>
>> – Ufuk
>>
>> On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com)
>> wrote:
>>
>>> Hi Gyula, Jinkui Shi,
>>>   Thanks for your thoughts!
>>>   @Gyula: I'll try to explain in a bit more detail.
>>>   The API could be almost like the QueryableState's. It could be
>>> higher-level though: returning Java objects instead of serialized data
>>> (because there would not be issues with class loading). Also, it could
>>> support setKvState (see my point 5). This could lead both to potential
>>> performance improvements and to easier usage (see my points 2 and 3).
>>>   A use-case could be anything where we'd use an external KV-store.
>>> For instance we are updating user states based on another user state, so
>>> in the map function we do a query (in pseudo-ish Scala code):
>>>
>>> users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>>>   val thisUser: UserState = state.get()
>>>   val otherUser: Future[UserState] =
>>>     qsClient.getKvState("users", userEvent.otherUserId)
>>>
>>>   otherUser.onSuccess { case otherUserState =>
>>>     state.update(someFunc1(thisUser, otherUserState))
>>>     collector.collect(someFunc2(thisUser, otherUserState))
>>>   }
>>> }
>>>   Another example could be (online) distributed matrix factorization,
>>> where the two factor matrices are represented by distributed states. One
>>> is 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann

Hey Ufuk,

I'm happy to contribute. At least I'll get a bit more understanding of 
the details.


Breaking the assumption that only a single thread updates state would 
bring us from strong isolation guarantees (i.e. serializability at the 
updates and read committed at the external queries) to no isolation 
guarantees. That's not something to be taken lightly. I think that these 
guarantees would be more easily provided for inside queries that modify 
(setKvState), but that's still not trivial.


Indeed, the iteration approach works better for the use-cases I 
mentioned, at least for now.


Cheers,
Gabor

On 2017-02-14 14:43, Ufuk Celebi wrote:


Hey Gabor,

great ideas here. It's only slightly related, but I'm currently working on a 
proposal to improve the queryable state APIs for lookups (partly along the 
lines of what you suggested with higher level accessors). Maybe you are 
interested in contributing there?

I really like your ideas for the use cases you describe, but I'm unsure about the write 
path (setKvState), because of the discussed implications to the state backends. I think 
that this needs more discussion and coordination with the contributors working on the 
backends. For example, one assumption so far was that only a single thread updates state 
and we don't scope state per checkpoint (to provide "isolation levels" for the 
queries; read committed vs. read uncommitted) and probably more.

Because of this I would actually lean towards the iteration approach in a first 
version. Would that be a feasible starting point for you?

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com) wrote:

Hi Gyula, Jinkui Shi,
  
Thanks for your thoughts!
  
@Gyula: I'll try to explain in a bit more detail.
  
The API could be almost like the QueryableState's. It could be

higher-level though: returning Java objects instead of serialized data
(because there would not be issues with class loading). Also, it could
support setKvState (see my point 5). This could lead both to potential
performance improvements and to easier usage (see my points 2 and 3).
  
A use-case could be anything where we'd use an external KV-store.

For instance we are updating user states based on another user state, so
in the map function we do a query (in pseudo-ish Scala code):
  
users.keyBy(0).flatMapWithState { (userEvent, collector) =>
  val thisUser: UserState = state.get()
  val otherUser: Future[UserState] =
    qsClient.getKvState("users", userEvent.otherUserId)

  otherUser.onSuccess { case otherUserState =>
    state.update(someFunc1(thisUser, otherUserState))
    collector.collect(someFunc2(thisUser, otherUserState))
  }
}
  
Another example could be (online) distributed matrix factorization,

where the two factor matrices are represented by distributed states. One
is updated by querying the other (with getKvState), and computing some
function (e.g. SGD), while the other is updated in the same place (with
setKvState).
  
I see the motivation behind the QueryableState as a way to make further

use of the KV-store we practically have at stateful operators (but
please correct me if I'm wrong). I think we could make even more use of
it if the KV-store is used inside the same job.
  
1) Order and timeliness

As you've mentioned it's hard to guarantee any ordering when working
with two states on possibly distinct machines. This could bring us to
distributed transaction processing, which is a complex topic in itself. I
can imagine using watermarks and keeping different state versions to
only allow querying state from the past, and not from the future. For
now, let's just assume that order does not matter.
  
2) Fault-tolerance

There might be other things we could do, but there are two simple
guarantees that we can surely provide. First, by using the current
QueryableState the task could fail with incomplete futures. If the
records producing those futures are received before the previous
checkpoint barrier, those updates will be completely lost. We could
solve this by wait for the futures to complete before starting a
checkpoint, thus providing exactly-once guarantees. This would ensure
that, although the UDF has side-effects, every record has its effect
exactly-once. I don't see a good way to provide this guarantee with the
current QueryableState. Second, we can guarantee that the query will
eventually succeed (or else the whole topology would fail).
  
3) Optimizations

I've also got two examples for optimizations. First, we can do a
best-effort to co-locate the two stateful operators to save on network
overhead. The user could try to co-locate the querying machines when
externally querying the state, but could not move the machines with the
state being queried. Second, we could provide a user-interface for
(loose) guarantees on the latency of sending and returning queries, just
like setting the buffer timeout.
  
4) Concurrent reading/writing

Key-value states and collectors might be accessed concurrently. While
the 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Ufuk Celebi
Hey Gabor,

great ideas here. It's only slightly related, but I'm currently working on a 
proposal to improve the queryable state APIs for lookups (partly along the 
lines of what you suggested with higher level accessors). Maybe you are 
interested in contributing there?

I really like your ideas for the use cases you describe, but I'm unsure about 
the write path (setKvState), because of the discussed implications to the state 
backends. I think that this needs more discussion and coordination with the 
contributors working on the backends. For example, one assumption so far was 
that only a single thread updates state and we don't scope state per checkpoint 
(to provide "isolation levels" for the 
queries; read committed vs. read uncommitted) and probably more. 
uncommitted) and probably more. 

Because of this I would actually lean towards the iteration approach in a first 
version. Would that be a feasible starting point for you?

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com) wrote:
> Hi Gyula, Jinkui Shi,
>  
> Thanks for your thoughts!
>  
> @Gyula: I'll try to explain in a bit more detail.
>  
> The API could be almost like the QueryableState's. It could be
> higher-level though: returning Java objects instead of serialized data
> (because there would not be issues with class loading). Also, it could
> support setKvState (see my point 5). This could lead both to potential
> performance improvements and to easier usage (see my points 2 and 3).
>  
> A use-case could be anything where we'd use an external KV-store.
> For instance we are updating user states based on another user state, so
> in the map function we do a query (in pseudo-ish Scala code):
>  
> users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>   val thisUser: UserState = state.get()
>   val otherUser: Future[UserState] =
>     qsClient.getKvState("users", userEvent.otherUserId)
>
>   otherUser.onSuccess { case otherUserState =>
>     state.update(someFunc1(thisUser, otherUserState))
>     collector.collect(someFunc2(thisUser, otherUserState))
>   }
> }
>  
> Another example could be (online) distributed matrix factorization,
> where the two factor matrices are represented by distributed states. One
> is updated by querying the other (with getKvState), and computing some
> function (e.g. SGD), while the other is updated in the same place (with
> setKvState).
>  
> I see the motivation behind the QueryableState as a way to make further
> use of the KV-store we practically have at stateful operators (but
> please correct me if I'm wrong). I think we could make even more use of
> it if the KV-store is used inside the same job.
>  
> 1) Order and timeliness
> As you've mentioned it's hard to guarantee any ordering when working
> with two states on possibly distinct machines. This could bring us to
> distributed transaction processing, which is a complex topic in itself. I
> can imagine using watermarks and keeping different state versions to
> only allow querying state from the past, and not from the future. For
> now, let's just assume that order does not matter.
>  
> 2) Fault-tolerance
> There might be other things we could do, but there are two simple
> guarantees that we can surely provide. First, by using the current
> QueryableState the task could fail with incomplete futures. If the
> records producing those futures are received before the previous
> checkpoint barrier, those updates will be completely lost. We could
> solve this by waiting for the futures to complete before starting a
> checkpoint, thus providing exactly-once guarantees. This would ensure
> that, although the UDF has side-effects, every record has its effect
> exactly-once. I don't see a good way to provide this guarantee with the
> current QueryableState. Second, we can guarantee that the query will
> eventually succeed (or else the whole topology would fail).
>  
> 3) Optimizations
> I've also got two examples for optimizations. First, we can do a
> best-effort to co-locate the two stateful operators to save on network
> overhead. The user could try to co-locate the querying machines when
> externally querying the state, but could not move the machines with the
> state being queried. Second, we could provide a user-interface for
> (loose) guarantees on the latency of sending and returning queries, just
> like setting the buffer timeout.
>  
> 4) Concurrent reading/writing
> Key-value states and collectors might be accessed concurrently. While
> the user could use locks, we could let the system handle this instead.
> E.g. using a thread-safe collector whenever we see an internal KV-state
> query registered at the UDF.
>  
> 5) setKvState
> We could not give exactly-once guarantees if we allowed external queries
> to modify the state. When a Flink topology fails and restarts, the
> modifications coming from outside would not be replayed. However, we
> can simply give exactly-once guarantees if the modifications are done
> inside (setting aside ordering), as the records would be 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann

Hi Gyula, Jinkui Shi,

Thanks for your thoughts!

@Gyula: I'll try to explain in a bit more detail.

The API could be almost like the QueryableState's. It could be 
higher-level though: returning Java objects instead of serialized data 
(because there would not be issues with class loading). Also, it could 
support setKvState (see my point 5). This could lead both to potential 
performance improvements and to easier usage (see my points 2 and 3).


A use-case could be anything where we'd use an external KV-store.
For instance we are updating user states based on another user state, so 
in the map function we do a query (in pseudo-ish Scala code):


users.keyBy(0).flatMapWithState { (userEvent, collector) =>
  val thisUser: UserState = state.get()
  val otherUser: Future[UserState] =
qsClient.getKvState("users", userEvent.otherUserId)

  otherUser.onSuccess { case otherUserState =>
state.update(someFunc1(thisUser, otherUserState))
collector.collect(someFunc2(thisUser, otherUserState))
  }
}

Another example could be (online) distributed matrix factorization, 
where the two factor matrices are represented by distributed states. One 
is updated by querying the other (with getKvState), and computing some 
function (e.g. SGD), while the other is updated in the same place (with 
setKvState).
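
In the same pseudo-ish Scala as above, the factorization update could look
roughly like this (qsClient, getKvState/setKvState and the vector arithmetic
are all part of the proposed API, i.e. assumptions, not something that exists
today):

```scala
// Ratings are keyed by user; user factors are local keyed state, item
// factors live in the distributed state "items" of another operator.
ratings.keyBy(_.userId).flatMapWithState { (r, collector) =>
  val userVec: FactorVector = state.get()
  val itemVecF: Future[FactorVector] =
    qsClient.getKvState("items", r.itemId)

  itemVecF.onSuccess { case itemVec =>
    val err = r.rating - dot(userVec, itemVec)
    // SGD step on both factors: the local one via state.update,
    // the remote one via the proposed setKvState
    state.update(userVec + (itemVec * err - userVec * lambda) * eta)
    qsClient.setKvState("items", r.itemId,
      itemVec + (userVec * err - itemVec * lambda) * eta)
    collector.collect((r.userId, r.itemId, err))
  }
}
```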


I see the motivation behind the QueryableState as a way to make further 
use of the KV-store we practically have at stateful operators (but 
please correct me if I'm wrong). I think we could make even more use of 
it if the KV-store is used inside the same job.


1) Order and timeliness
As you've mentioned it's hard to guarantee any ordering when working 
with two states on possibly distinct machines. This could bring us to 
distributed transaction processing, which is a complex topic in itself. I 
can imagine using watermarks and keeping different state versions to 
only allow querying state from the past, and not from the future. For 
now, let's just assume that order does not matter.


2) Fault-tolerance
There might be other things we could do, but there are two simple 
guarantees that we can surely provide. First, by using the current 
QueryableState the task could fail with incomplete futures. If the 
records producing those futures are received before the previous 
checkpoint barrier, those updates will be completely lost. We could 
solve this by waiting for the futures to complete before starting a 
checkpoint, thus providing exactly-once guarantees. This would ensure 
that, although the UDF has side-effects, every record has its effect 
exactly-once. I don't see a good way to provide this guarantee with the 
current QueryableState. Second, we can guarantee that the query will 
eventually succeed (or else the whole topology would fail).
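
Sketched (pending, qsClient, someFunc1 and queryTimeout are made up for the
sketch, and I'm assuming the runtime gives the operator a hook that runs when
the checkpoint barrier arrives):

```scala
// Track the in-flight internal queries of this operator instance.
val pending = mutable.Set.empty[Future[Unit]]

def onRecord(userEvent: UserEvent): Unit = {
  val f = qsClient.getKvState("users", userEvent.otherUserId)
    .map { other => state.update(someFunc1(state.get(), other)) }
  pending += f
  f.onComplete(_ => pending -= f)
}

// Hook: runs when the barrier arrives, before the state snapshot is
// taken, so every record received before the barrier has taken effect
// exactly once (or the query times out and the job fails over).
def beforeSnapshot(): Unit =
  Await.ready(Future.sequence(pending.toList), queryTimeout)
```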


3) Optimizations
I've also got two examples for optimizations. First, we can do a 
best-effort to co-locate the two stateful operators to save on network 
overhead. The user could try to co-locate the querying machines when 
externally querying the state, but could not move the machines with the 
state being queried. Second, we could provide a user-interface for 
(loose) guarantees on the latency of sending and returning queries, just 
like setting the buffer timeout.
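
Similar to how latency is already traded against throughput on the network
stack today; the second knob below is hypothetical:

```scala
// Existing: flush network buffers at least every 10 ms
env.setBufferTimeout(10)
// Proposed analogue: bound how long internal queries/answers may be batched
qsConfig.setQueryBatchTimeout(10) // hypothetical
```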


4) Concurrent reading/writing
Key-value states and collectors might be accessed concurrently. While 
the user could use locks, we could let the system handle this instead. 
E.g. using a thread-safe collector whenever we see an internal KV-state 
query registered at the UDF.
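
A minimal sketch against the existing org.apache.flink.util.Collector
interface (the wrapping would be done by the system, not the user):

```scala
import org.apache.flink.util.Collector

// Serialize access to the wrapped collector so the main task thread and
// the future callbacks can both emit records safely.
class SynchronizedCollector[T](underlying: Collector[T]) extends Collector[T] {
  override def collect(record: T): Unit = synchronized {
    underlying.collect(record)
  }
  override def close(): Unit = synchronized {
    underlying.close()
  }
}
```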


5) setKvState
We could not give exactly-once guarantees if we allowed external queries 
to modify the state. When a Flink topology fails and restarts, the 
modifications coming from outside would not be replayed. However, we 
can simply give exactly-once guarantees if the modifications are done 
inside (setting aside ordering), as the records would be replayed if the 
modification failed.


I believe it would not take much effort to make these improvements. 
They would, however, affect the runtime (integration with FT), and it 
might not be worth working towards these goals. What do you think about 
this?


It's also worth considering that the same use-cases could be handled 
similarly with the iteration/loops API, though in a somewhat less 
natural way, by imitating two-way communication.


Should we move this discussion to a JIRA issue, to avoid flooding the 
mailing list?



@Jinkui Shi:
1. I think we should definitely support a flexible update strategy, i.e. 
allow choosing between sync, async and bounded-delay updates.
2. I really like your idea of using the PS externally and connecting to it 
with a source and a sink. Fault tolerance could also be achieved by 
versioning at the PS and resending older parameters if the Flink job 
fails (i.e. making the PS a durable source). The question then is how to 
implement the PS. Do you think we could use the implementations 
you've mentioned?
3. Good idea. It has just been proposed to support GPU calculations 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-12 Thread Jinkui Shi
Hi Gábor Hermann,

The online parameter server is a good proposal. 
The PS paper [1] had an early implementation [2], and it has since evolved into MXNet [3].
I have some thoughts about an online PS in Flink:
1.  Should we support a flexible and configurable update strategy?
For example, within one iteration, computing several times and updating once, or 
updating on every iteration.
2.  Can the implementation be fully based on the DAG, without modifying the 
runtime and core too much?
-   The parameter server is the Source with the distributed parameter 
data, called PS.
-   The worker nodes are the DAG except the Source, i.e. some ML 
algorithms implemented using the Flink API.
-   The results of the multi-layer computation flow to the Sink operator 
naturally.
-   The Sink can feed back to the Source for the next iteration.
-   Automatically tune busy operators, increasing/decreasing the compute 
resources (the max parallelism) of the running operators.
3.  Automatically detect the GPU support provided by Mesos, and use GPUs if 
they are enabled in the configuration.

[1] https://www.cs.cmu.edu/~muli/file/ps.pdf 

[2] https://github.com/dmlc/parameter_server 

[3] https://github.com/dmlc/mxnet 

> On Feb 12, 2017, at 00:54, Gyula Fóra  wrote:
> 
> Hi Gábor,
> 
> I think the general idea is very nice, but it would be nice to see more
> clearly what benefit this brings from the developer's perspective. Maybe a
> rough API sketch and 1-2 examples.
> 
> I am wondering what sort of consistency guarantees you imagine for such
> operations, and why the fault tolerance is even relevant. Are you thinking
> about an asynchronous API, such that querying the state for another key
> might give you a Future that is guaranteed to complete eventually?
> 
> It seems hard to guarantee the timeliness (order) of these operations
> with respect to the updates made to the states, so I wonder what the
> benefit of doing this is compared to using the QueryableState interface.
> Is it only a potential performance improvement, or is it also easier to
> work with?
> 
> Cheers,
> Gyula
> 
> Gábor Hermann wrote (on Fri, 10 Feb 2017, at 16:01):
> 
>> Hi all,
>> 
>> TL;DR: Is it worth to implement a special QueryableState for querying
>> state from another part of a Flink streaming job and aligning it with
>> fault tolerance?
>> 
>> I've been thinking about implementing a Parameter Server with/within
>> Flink. A Parameter Server is basically a specialized key-value store
>> optimized for training distributed machine learning models. So not only
>> the training data, but also the model is distributed. Range queries are
>> typical, and usually vectors and matrices are stored as values.
>> 
>> More generally, an integrated key-value store might also be useful in
>> the Streaming API. Although external key-value stores can be used inside
>> UDFs for the same purpose, aligning them with the fault tolerance
>> mechanism of Flink could be hard. What if state distributed by a key (in
>> the current Streaming API) could be queried from another operator? Much
>> like QueryableState, but querying *inside* the Flink job. We could make
>> use of the fact that state has been queried from inside to optimize
>> communication and integrate fault tolerance.
>> 
>> The question is whether the Flink community would like such a feature, and
>> if so, how to do it?
>> 
>> I could elaborate my ideas if needed, and I'm happy to create a design
>> doc, but before that, I'd like to know what you all think about this.
>> Also, I don't know if I'm missing something, so please correct me. Here
>> are some quick notes regarding the integrated KV-store:
>> 
>> Pros
>> - It could allow easier implementation of more complicated use-cases,
>> e.g. updating users' preferences simultaneously based on each other's
>> preferences when events happen between them, such as making a connection,
>> liking each other's posts, or going to the same concert. User preferences
>> are distributed as state; an event about user A liking user B gets
>> sent to A's state and queries the state of B, then updates the state of
>> B. There have been questions on the user mailing list about similar
>> problems [1].
>> - Integration with fault tolerance. User does not have to maintain two
>> systems consistently.
>> - Optimization potential. In the social network example, maybe other
>> users on the same partition as user A need the state of user B, so we
>> don't have to send user B's state around twice.
>> - Could specialize to a Parameter Server for simple (and efficient)
>> implementation of (possibly online) machine learning. E.g. sparse
>> logistic regression, LDA, matrix factorization for recommendation systems.
>> 
>> Cons
>> - A lot of development effort.
>> - A "client-server" architecture goes against the DAG dataflow model.
>> 
>> Two 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-11 Thread Gyula Fóra
Hi Gábor,

I think the general idea is very nice, but it would be nice to see more
clearly what benefit this brings from the developer's perspective. Maybe a
rough API sketch and 1-2 examples.

I am wondering what sort of consistency guarantees you imagine for such
operations, and why the fault tolerance is even relevant. Are you thinking
about an asynchronous API, such that querying the state for another key
might give you a Future that is guaranteed to complete eventually?

It seems hard to guarantee the timeliness (order) of these operations
with respect to the updates made to the states, so I wonder what the
benefit of doing this is compared to using the QueryableState interface.
Is it only a potential performance improvement, or is it also easier to
work with?

Cheers,
Gyula

Gábor Hermann wrote (on Fri, 10 Feb 2017, at 16:01):

> Hi all,
>
> TL;DR: Is it worth to implement a special QueryableState for querying
> state from another part of a Flink streaming job and aligning it with
> fault tolerance?
>
> I've been thinking about implementing a Parameter Server with/within
> Flink. A Parameter Server is basically a specialized key-value store
> optimized for training distributed machine learning models. So not only
> the training data, but also the model is distributed. Range queries are
> typical, and usually vectors and matrices are stored as values.
>
> More generally, an integrated key-value store might also be useful in
> the Streaming API. Although external key-value stores can be used inside
> UDFs for the same purpose, aligning them with the fault tolerance
> mechanism of Flink could be hard. What if state distributed by a key (in
> the current Streaming API) could be queried from another operator? Much
> like QueryableState, but querying *inside* the Flink job. We could make
> use of the fact that state has been queried from inside to optimize
> communication and integrate fault tolerance.
>
> The question is whether the Flink community would like such a feature, and
> if so, how to do it?
>
> I could elaborate my ideas if needed, and I'm happy to create a design
> doc, but before that, I'd like to know what you all think about this.
> Also, I don't know if I'm missing something, so please correct me. Here
> are some quick notes regarding the integrated KV-store:
>
> Pros
> - It could allow easier implementation of more complicated use-cases,
> e.g. updating users' preferences simultaneously based on each other's
> preferences when events happen between them, such as making a connection,
> liking each other's posts, or going to the same concert. User preferences
> are distributed as state; an event about user A liking user B gets
> sent to A's state and queries the state of B, then updates the state of
> B. There have been questions on the user mailing list about similar
> problems [1].
> - Integration with fault tolerance. User does not have to maintain two
> systems consistently.
> - Optimization potential. In the social network example, maybe other
> users on the same partition as user A need the state of user B, so we
> don't have to send user B's state around twice.
> - Could specialize to a Parameter Server for simple (and efficient)
> implementation of (possibly online) machine learning. E.g. sparse
> logistic regression, LDA, matrix factorization for recommendation systems.
>
> Cons
> - A lot of development effort.
> - A "client-server" architecture goes against the DAG dataflow model.
>
> Two approaches for the implementation in the streaming API:
>
> 1) An easy way to implement this is to use iterations (or the proposed
> loops API). We can achieve two-way communication by two operators in a
> loop: a worker (W) and a Parameter Server (PS), see the diagram [2]. (An
> additional nested loop in the PS could add replication opportunities).
> Then we would get fault tolerance "for free" by the work of Paris [3].
> It would also be on top of the Streaming API, with no effect on the
> runtime.
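
Roughly, with the streaming iteration API (Msg, Result, WorkerFn and
ParameterServerFn are made up; a single Msg type is used inside the loop so
the feedback edge type-checks):

```scala
// W <-> PS as a loop: worker requests/updates are fed back to the PS,
// PS answers travel around the loop to the worker in the next round.
val results: DataStream[Result] =
  input.map(toMsg).iterate { loop: DataStream[Msg] =>
    // PS side: parameters as keyed state; answers pulls, applies pushes.
    val psOut = loop.keyBy(_.paramKey).flatMap(new ParameterServerFn)
    // Worker side: consumes PS answers, computes, emits new pulls/pushes
    // and final results.
    val workerOut = psOut.keyBy(_.workerKey).flatMap(new WorkerFn)
    (workerOut.filter(_.isLoopMsg),              // fed back into the loop
     workerOut.filter(_.isResult).map(_.result)) // leaves the loop
  }
```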
>
> 2) A problem with the loop approach is that coordination between PS
> nodes and worker nodes can only be done on the data stream. We could not
> really use e.g. Akka for async coordination. A harder but more flexible
> way is to use lower-level interfaces of Flink and touch the runtime.
> Then we would have to take care of fault tolerance too.
>
> (As a side note: in the batch API generalizing delta iterations could be
> a solution for Parameter Server [4].)
>
> Thanks for any feedback :)
>
> Cheers,
> Gabor
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sharded-state-2-step-operation-td8631.html
> [2] https://i.imgur.com/GsliUIh.png
> [3] https://github.com/apache/flink/pull/1668
> [4]
>
> https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran
>
>