Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels
I've tried to put the current design into code. Any feedback appreciated for these changes to enable caching of user state:
Proto: https://github.com/apache/beam/pull/9440
Runner: https://github.com/apache/beam/pull/9374
Python SDK: https://github.com/apache/beam/pull/9418
Thanks, Max On 28.08

Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels
> Just to clarify, the repeated list of cache tokens in the process bundle request is used to validate reading *and* stored when writing? In that sense, should they just be called version identifiers or something like that?
We could call them version identifiers, though cache tokens were always a

Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels
cachetools sounds like a fine choice to me. For the first version I've implemented a simple LRU cache. If you want to have a look: https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
> Open up a PR for the proto changes and we can work through any minor com
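For readers following along, a simple LRU cache of the kind mentioned in this message could look roughly like the sketch below. This is not the code from the linked PR; the class name `SimpleLRUCache` and its methods are hypothetical and chosen only for illustration.

```python
from collections import OrderedDict


class SimpleLRUCache:
    """Hypothetical minimal LRU cache; not the implementation from the PR."""

    def __init__(self, max_entries):
        if max_entries <= 0:
            raise ValueError("max_entries must be positive")
        self._max_entries = max_entries
        self._entries = OrderedDict()

    def get(self, key, default=None):
        if key not in self._entries:
            return default
        # Mark the key as most recently used.
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        # Evict least recently used entries beyond the capacity.
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)

    def __len__(self):
        return len(self._entries)
```

A library such as cachetools offers the same behaviour ready-made; the sketch only shows the eviction idea.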

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
Just to clarify, the repeated list of cache tokens in the process bundle request is used to validate reading *and* stored when writing? In that sense, should they just be called version identifiers or something like that? On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels wrote: > > Thanks. Upda

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
On Sun, Aug 18, 2019 at 7:30 PM Rakesh Kumar wrote: > > not to completely hijack Max's question but a tangential question regarding > LRU cache. > > What is the preferred python library for LRU cache? > I noticed that cachetools [1] is used as one of the dependencies for GCP [2]. > Cachetools[1]

Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
Open up a PR for the proto changes and we can work through any minor comments there. On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels wrote: > Thanks. Updated: > > message ProcessBundleRequest { > // (Required) A reference to the process bundle descriptor that must be // > instantiated an

Re: Write-through-cache in State logic

2019-08-27 Thread Maximilian Michels
Thanks. Updated:
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;
  // A cache token which can be used by an SDK to check for the validity
  //

Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
SideInputState -> SideInput (side_input_state -> side_input) + more comments around the messages and the fields. On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels wrote: > We would have to differentiate cache tokens for user state and side > inputs. How about something like this? > > message

Re: Write-through-cache in State logic

2019-08-27 Thread Maximilian Michels
We would have to differentiate cache tokens for user state and side inputs. How about something like this?
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_r

Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
The bundle's view of side inputs should never change during processing and should have a point-in-time snapshot. I was just trying to say that deferring the cache token for side inputs until side input request time simplified the runner's implementation since that is conclusively when the runner

Re: Write-through-cache in State logic

2019-08-27 Thread Maximilian Michels
Thanks for the quick response. Just to clarify, the issue with versioning side input is also present when supplying the cache tokens on a request basis instead of per bundle. The SDK never knows when the Runner receives a new version of the side input. Like you pointed out, it needs to mark si

Re: Write-through-cache in State logic

2019-08-26 Thread Lukasz Cwik
Your summary below makes sense to me. I can see that recovery from rolling back doesn't need to be a priority and simplifies the solution for user state caching down to one token. Providing cache tokens upfront does require the Runner to know what "version" of everything it may supply to the SDK u

Re: Write-through-cache in State logic

2019-08-26 Thread Maximilian Michels
Thank you for the summary Luke. I really appreciate the effort you put into this! > Based upon your discussion you seem to want option #1 I'm actually for option #2. The option to cache/invalidate side inputs is important, and we should incorporate this in the design. That's why option #1 is not

Re: Write-through-cache in State logic

2019-08-26 Thread Lukasz Cwik
There were originally a couple of ideas around how caching could work: 1) One cache token for the entire bundle that is supplied up front. The SDK caches everything using the given token. All reads/clear/append for all types of state happen under this token. Anytime a side input changes, key proces
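Option 1 above (one cache token per bundle, supplied up front) can be sketched as follows. This is only an illustration under assumptions: `TokenScopedStateCache`, `start_bundle`, and the `fetch_from_runner` callback are hypothetical names, not part of the Beam Fn API.

```python
class TokenScopedStateCache:
    """Hypothetical SDK-side cache that is valid only under one cache token."""

    def __init__(self):
        self._token = None
        self._values = {}

    def start_bundle(self, cache_token):
        # A different token means the runner no longer vouches for what
        # the SDK cached earlier, so everything is dropped.
        if cache_token != self._token:
            self._values.clear()
            self._token = cache_token

    def read(self, state_key, fetch_from_runner):
        # Only go to the runner on a cache miss.
        if state_key not in self._values:
            self._values[state_key] = list(fetch_from_runner(state_key))
        return list(self._values[state_key])

    def append(self, state_key, items):
        # Write-through: the caller still sends the append to the runner.
        # The cached copy is updated only if the full value is already held.
        if state_key in self._values:
            self._values[state_key].extend(items)

    def clear(self, state_key):
        # After a clear the full value is known to be empty.
        self._values[state_key] = []
```

With the same token on the next bundle the cached value is reused; with a new token the next read goes back to the runner.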

Re: Write-through-cache in State logic

2019-08-22 Thread Maximilian Michels
Just to give a quick update here. Rakesh, Thomas, and I had a discussion about async writes from the Python SDK to the Runner. Robert was also present for some parts of the discussion. We concluded that blocking writes with the need to refresh the cache token each time are not going to provide eno

Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
> There is probably a misunderstanding here: I'm suggesting to use a worker ID > instead of cache tokens, not additionally. Ah! Misread that. We need a changing token to indicate that the cache is stale, e.g. checkpoint has failed / restoring from an old checkpoint. If the _Runner_ generates a ne

Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels wrote: > Appreciate all your comments! Replying below. > > > @Luke: > > > Having cache tokens per key would be very expensive indeed and I believe > we should go with a single cache token "per" bundle. > > Thanks for your comments on the PR. I wa

Re: Write-through-cache in State logic

2019-08-21 Thread Thomas Weise
--> On Wed, Aug 21, 2019, 2:16 AM Maximilian Michels wrote: > Appreciate all your comments! Replying below. > > > @Luke: > > > Having cache tokens per key would be very expensive indeed and I believe > we should go with a single cache token "per" bundle. > > Thanks for your comments on the PR. I

Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
Appreciate all your comments! Replying below. @Luke: > Having cache tokens per key would be very expensive indeed and I believe we > should go with a single cache token "per" bundle. Thanks for your comments on the PR. I was thinking to propose something along these lines of having cache tokens

Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
Dataflow does something like this, however since work is load balanced across workers a per-worker id doesn't work very well. Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range. On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise wrote: > Commenting here vs.
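The idea of one cache token per lexicographic key range can be illustrated with the sketch below. It is not Dataflow's implementation; `RangeCacheTokens`, the split points, and the use of random UUIDs as tokens are all assumptions made for the example.

```python
import bisect
import uuid


class RangeCacheTokens:
    """Hypothetical runner-side table with one cache token per key range."""

    def __init__(self, split_points):
        # Split points ["g", "n"] yield three ranges: [.."g"), ["g".."n"), ["n"..).
        self._split_points = sorted(split_points)
        self._tokens = [
            uuid.uuid4().hex for _ in range(len(self._split_points) + 1)
        ]

    def _range_index(self, key):
        # Index of the lexicographic range that contains the key.
        return bisect.bisect_right(self._split_points, key)

    def token_for(self, key):
        return self._tokens[self._range_index(key)]

    def invalidate_range_of(self, key):
        # Issue a fresh token for this range only, e.g. when the range
        # moves to another worker; other ranges keep their tokens.
        self._tokens[self._range_index(key)] = uuid.uuid4().hex
```

Invalidating one range leaves caches for all other ranges usable, which is the point of scoping tokens by range rather than by worker.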

Re: Write-through-cache in State logic

2019-08-20 Thread Thomas Weise
Commenting here vs. on the PR since related to the overall approach. Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not? * When the bundle is started, the runner tells the worker if the cache has become invalid

Re: Write-through-cache in State logic

2019-08-16 Thread Maximilian Michels
Thanks Luke! On the note of cache tokens, do we have an idea how cache tokens are generated and managed by the Runner? In my mind we will maintain a list of cache tokens scoped by state id and SDK worker. Cache tokens will not be checkpointed which means long-running SDK workers will have to requ

Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
For the purpose of my own understanding of the matter, I've created a document: https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/ It could make sense to clarify and specify things in there for now. I'm more than willing to consolidate this document with the caching

Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
Yes, that makes sense. What do you think about creating a document to summarize the ideas presented here? Also, it would be good to capture the status quo regarding caching in the Python SDK. -Max On 13.08.19 22:44, Thomas Weise wrote: > The token would be needed in general to invalidate the cach

Re: Write-through-cache in State logic

2019-08-13 Thread Thomas Weise
The token would be needed in general to invalidate the cache when bundles are processed by different workers. In the case of the Flink runner we don't have a scenario of SDK worker surviving the runner in the case of a failure, so there is no possibility of inconsistent state as a result of a checkp

Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
Thanks for clarifying. Cache-invalidation for side inputs makes sense. In case the Runner fails to checkpoint, could it not re-attempt the checkpoint? At least in the case of Flink, the cache would still be valid until another checkpoint is attempted. For other Runners that may not be the case. Al

Re: Write-through-cache in State logic

2019-08-13 Thread Lukasz Cwik
On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels wrote: > Agree that we have to be able to flush before a checkpoint to avoid > caching too many elements. Also good point about checkpoint costs > increasing with flushing the cache on checkpoints. An LRU cache policy in > the SDK seems desirable.

Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
Agree that we have to be able to flush before a checkpoint to avoid caching too many elements. Also good point about checkpoint costs increasing with flushing the cache on checkpoints. An LRU cache policy in the SDK seems desirable. What is the role of the cache token in the design document[1]? It

Re: Write-through-cache in State logic

2019-08-12 Thread Lukasz Cwik
On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise wrote: > > On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels wrote: > >> Thanks for starting this discussion Rakesh. An efficient cache layer is >> one of the missing pieces for good performance in stateful pipelines. >> The good news is that there

Re: Write-through-cache in State logic

2019-08-12 Thread Thomas Weise
On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels wrote: > Thanks for starting this discussion Rakesh. An efficient cache layer is > one of the missing pieces for good performance in stateful pipelines. > The good news is that there is a level of caching already present in > Python which batche

Re: Write-through-cache in State logic

2019-08-12 Thread Maximilian Michels
Thanks for starting this discussion Rakesh. An efficient cache layer is one of the missing pieces for good performance in stateful pipelines. The good news is that there is a level of caching already present in Python which batches append requests until the bundle is finished. Thomas, in your exa
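Batching append requests until the bundle is finished, as described in this message, amounts to something like the sketch below. It is not the Python SDK's actual code; `BatchingAppender` and the `send_append` callback are hypothetical.

```python
class BatchingAppender:
    """Hypothetical buffer that sends appends only when the bundle finishes."""

    def __init__(self, send_append):
        # send_append(state_key, items) stands in for the real state request.
        self._send_append = send_append
        self._pending = {}

    def append(self, state_key, item):
        # No network call here; the item is only buffered.
        self._pending.setdefault(state_key, []).append(item)

    def finish_bundle(self):
        # One request per state key instead of one per appended element.
        for state_key, items in self._pending.items():
            self._send_append(state_key, items)
        self._pending = {}
```

This saves round trips within a bundle, but nothing is retained across bundles, which is the gap the thread is about.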

Re: Write-through-cache in State logic

2019-08-09 Thread Lukasz Cwik
On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw wrote: > The question is whether the SDK needs to wait for the StateResponse to > come back before declaring the bundle done. The proposal was to not > send the cache token back as part of an append StateResponse [1], but > pre-provide it as part of

Re: Write-through-cache in State logic

2019-08-09 Thread Robert Bradshaw
The question is whether the SDK needs to wait for the StateResponse to come back before declaring the bundle done. The proposal was to not send the cache token back as part of an append StateResponse [1], but pre-provide it as part of the bundle request. Thinking about this some more, if we assume

Re: Write-through-cache in State logic

2019-08-08 Thread Lukasz Cwik
The purpose of the new state API call in BEAM-7000 is to tell the runner that the SDK is now blocked waiting for the result of a specific state request and it should be used for fetches (not updates) and is there to allow for SDKs to differentiate readLater (I will need this data at some point in t

Re: Write-through-cache in State logic

2019-08-08 Thread Robert Bradshaw
On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise wrote: > > That would add a synchronization point that forces extra latency especially > in streaming mode. > > Wouldn't it be possible for the runner to assign the token when starting the > bundle and for the SDK to pass it along the state requests?

Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
That would add a synchronization point that forces extra latency especially in streaming mode. Wouldn't it be possible for the runner to assign the token when starting the bundle and for the SDK to pass it along the state requests? That way, there would be no need to batch and wait for a flush.

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
I believe the intent is to add a new state API call telling the runner that it is blocked waiting for a response (BEAM-7000). This should allow the runner to wait till it sees one of these "I'm blocked" requests and then merge + batch any state calls it may have at that point in time allowing it to

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
Now I see what you mean. On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise wrote: > Hi Luke, > > I guess the answer is that it depends on the state backend. If a set > operation in the state backend is available that is more efficient than > clear+append, then it would be beneficial to have a dedicate

Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
Hi Luke, I guess the answer is that it depends on the state backend. If a set operation in the state backend is available that is more efficient than clear+append, then it would be beneficial to have a dedicated fn api operation to allow for such optimization. That's something that needs to be det

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
Thomas, why do you think a single round trip is needed? clear + append can be done blindly from the SDK side and it has total knowledge of the state at that point in time till the end of the bundle at which point you want to wait to get the cache token back from the runner for the append call so t

Re: Write-through-cache in State logic

2019-07-29 Thread jincheng sun
Hi Rakesh, Glad to see you point this problem out! +1 for adding this implementation. Managing state with a write-through cache is pretty important for streaming jobs! Best, Jincheng On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise wrote: > FYI a basic test appears to confirm the importance of the cross-bundle > caching

Re: Write-through-cache in State logic

2019-07-29 Thread Thomas Weise
FYI a basic test appears to confirm the importance of the cross-bundle caching: I found that the throughput can be increased by playing with the bundle size in the Flink runner. Default caps at 1000 elements (or 1 second). So on a high throughput stream the bundles would be capped by the count limi

Re: Write-through-cache in State logic

2019-07-25 Thread Robert Bradshaw
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar wrote: > > Thanks Robert, > > I stumbled on the Jira issue that you created some time ago > https://jira.apache.org/jira/browse/BEAM-5428 > > You also marked where code changes are required: > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c

Re: Write-through-cache in State logic

2019-07-23 Thread Rakesh Kumar
Thanks Robert, I stumbled on the Jira issue that you created some time ago https://jira.apache.org/jira/browse/BEAM-5428 You also marked where code changes are required: https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_

Re: Write-through-cache in State logic

2019-07-23 Thread Robert Bradshaw
This is documented at https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m . Note that it requires participation of both the runner and the SDK (though there are no correctness issues if one or the other side does not understand the protocol, c

Re: Write-through-cache in State logic

2019-07-17 Thread Rakesh Kumar
I checked the Python SDK [1] and it has a similar implementation to the Java SDK. I agree with Thomas. With a high-volume event stream and a bigger cluster, network calls can potentially become a bottleneck. @Robert I am interested in seeing the proposal. Can you provide me the link of the propo

Re: Write-through-cache in State logic

2019-07-16 Thread Thomas Weise
Thanks for the pointer. For streaming, it will be important to support caching across bundles. It appears that even the Java SDK doesn't support that yet? https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDo

Re: Write-through-cache in State logic

2019-07-16 Thread Lukasz Cwik
User state is built on top of read, append and clear and not off a read and write paradigm to allow for blind appends. The optimization you speak of can be done completely inside the SDK without any additional protocol being required as long as you clear the state first and then append all your ne
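The clear-then-append pattern described in this message can be sketched as below. `InMemoryBagState` is a hypothetical stand-in for a runner-backed bag that offers only read, append, and clear; it is not Beam's BagState API.

```python
class InMemoryBagState:
    """Hypothetical stand-in for a bag exposing only read/append/clear."""

    def __init__(self):
        self._items = []
        self.requests = []  # Records which operations were issued.

    def read(self):
        self.requests.append("read")
        return list(self._items)

    def append(self, item):
        self.requests.append("append")
        self._items.append(item)

    def clear(self):
        self.requests.append("clear")
        self._items = []


def overwrite(bag, new_items):
    """Emulate a 'write' with clear + append, without reading first."""
    bag.clear()
    for item in new_items:
        bag.append(item)
```

Because `overwrite` never reads, the SDK knows the full contents of the bag afterwards without a round trip to fetch them.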

Re: Write-through-cache in State logic

2019-07-16 Thread Robert Bradshaw
Python workers also have a per-bundle SDK-side cache. A protocol has been proposed, but hasn't yet been implemented in any SDKs or runners. On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax wrote: > > It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a > cache, though I think

Re: Write-through-cache in State logic

2019-07-15 Thread Reuven Lax
It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a cache, though I think it currently has a cap for large bags. Reuven On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar wrote: > Hi, > > I have been using python sdk for the application and also using BagState > in product

Write-through-cache in State logic

2019-07-15 Thread Rakesh Kumar
Hi, I have been using the Python SDK for my application, with BagState in production. I was wondering whether the state logic implements any write-through cache. If we send every read and write request over the network, it comes with a performance cost. We can avoid network