I've tried to put the current design into code. Any feedback appreciated on
these changes to enable caching of user state:
Proto: https://github.com/apache/beam/pull/9440
Runner: https://github.com/apache/beam/pull/9374
Python SDK: https://github.com/apache/beam/pull/9418
Thanks,
Max
On 28.08
Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?
We could call them version identifiers, though cache tokens were always a
cachetools sounds like a fine choice to me.
For the first version I've implemented a simple LRU cache. If you want
to have a look:
https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
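For reference, a minimal sketch of what such a cachetools-based LRU layer could
look like. The class name, the shape of the underlying handler, and the
(cache_token, state_key) keying are illustrative assumptions, not the code in
the PR:

    from cachetools import LRUCache

    class CachingStateHandler(object):
      """Sketch only: LRU cache in front of a hypothetical state handler."""

      def __init__(self, underlying, max_entries=1024):
        self._underlying = underlying            # does the actual network round trips
        self._cache = LRUCache(maxsize=max_entries)

      def get(self, cache_token, state_key):
        key = (cache_token, state_key)           # state_key assumed hashable, e.g. bytes
        if key not in self._cache:
          self._cache[key] = list(self._underlying.get(state_key))
        return self._cache[key]

      def append(self, cache_token, state_key, elements):
        key = (cache_token, state_key)
        if key in self._cache:
          self._cache[key] = self._cache[key] + list(elements)
        self._underlying.append(state_key, elements)

      def clear(self, cache_token, state_key):
        self._cache[(cache_token, state_key)] = []
        self._underlying.clear(state_key)

Keying by (cache_token, state_key) means entries cached under an old token
simply age out of the LRU instead of needing explicit invalidation.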
Open up a PR for the proto changes and we can work through any minor com
Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?
On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels wrote:
>
> Thanks. Upda
On Sun, Aug 18, 2019 at 7:30 PM Rakesh Kumar wrote:
>
> not to completely hijack Max's question but a tangential question regarding
> LRU cache.
>
> What is the preferred python library for LRU cache?
> I noticed that cachetools [1] is used as one of the dependencies for GCP [2].
> Cachetools[1]
Open up a PR for the proto changes and we can work through any minor
comments there.
On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels wrote:
> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated an
Thanks. Updated:
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  // A cache token which can be used by an SDK to check for the validity
  //
SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.
On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels wrote:
> We would have to differentiate cache tokens for user state and side
> inputs. How about something like this?
>
> message
We would have to differentiate cache tokens for user state and side
inputs. How about something like this?
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_r
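To illustrate the distinction, a rough sketch of how an SDK could keep the two
kinds of tokens apart once they arrive with the bundle request. The class and
field names below are illustrative stand-ins, not the actual proto bindings:

    class BundleCacheTokens(object):
      """Sketch only: tokens received with a ProcessBundleRequest."""

      def __init__(self, user_state_token, side_input_tokens):
        self.user_state_token = user_state_token          # one token covering all user state
        self.side_input_tokens = dict(side_input_tokens)  # (transform_id, side_input_id) -> token

      def token_for_user_state(self):
        return self.user_state_token

      def token_for_side_input(self, transform_id, side_input_id):
        # Returning None would mean: don't cache this side input.
        return self.side_input_tokens.get((transform_id, side_input_id))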
The bundles view of side inputs should never change during processing and
should have a point in time snapshot.
I was just trying to say that the cache token for side inputs being
deferred till side input request time simplified the runner's implementation
since that is conclusively when the runner
Thanks for the quick response.
Just to clarify, the issue with versioning side inputs is also present
when supplying the cache tokens on a request basis instead of per
bundle. The SDK never knows when the Runner receives a new version of
the side input. Like you pointed out, it needs to mark si
Your summary below makes sense to me. I can see that recovery from rolling
back doesn't need to be a priority, which simplifies the solution for user
state caching down to one token.
Providing cache tokens upfront does require the Runner to know what
"version" of everything it may supply to the SDK u
Thank you for the summary Luke. I really appreciate the effort you put
into this!
> Based upon your discussion you seem to want option #1
I'm actually for option #2. The option to cache/invalidate side inputs
is important, and we should incorporate this in the design. That's why
option #1 is not
There were originally a couple of ideas around how caching could work:
1) One cache token for the entire bundle that is supplied up front. The SDK
caches everything using the given token. All reads/clear/append for all
types of state happen under this token. Anytime a side input changes, key
proces
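A tiny sketch of what option #1 above could look like from the SDK side,
assuming a single token arrives with each bundle (names and structure are
made up for illustration):

    class SingleTokenCache(object):
      """Sketch only: everything cached under one bundle-scoped token."""

      def __init__(self):
        self._token = None
        self._data = {}                 # state_key -> cached value

      def start_bundle(self, bundle_token):
        if bundle_token != self._token:
          self._data.clear()            # a new token invalidates everything cached so far
          self._token = bundle_token

      def get(self, state_key):
        return self._data.get(state_key)

      def put(self, state_key, value):
        self._data[state_key] = value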
Just to give a quick update here. Rakesh, Thomas, and I had a discussion
about async writes from the Python SDK to the Runner. Robert was also
present for some parts of the discussion.
We concluded that blocking writes with the need to refresh the cache
token each time are not going to provide eno
> There is probably a misunderstanding here: I'm suggesting to use a worker ID
> instead of cache tokens, not additionally.
Ah! Misread that. We need a changing token to indicate that the cache is
stale, e.g. checkpoint has failed / restoring from an old checkpoint. If
the _Runner_ generates a ne
> There is probably a misunderstanding here: I'm suggesting to use a
worker ID instead of cache tokens, not additionally.
Ah! Misread that. We need a changing token to indicate that the cache is
stale, e.g. checkpoint has failed / restoring from an old checkpoint. If
the _Runner_ generates a new u
On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels wrote:
> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I wa
On Wed, Aug 21, 2019, 2:16 AM Maximilian Michels wrote:
> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I
Appreciate all your comments! Replying below.
@Luke:
> Having cache tokens per key would be very expensive indeed and I believe we
> should go with a single cache token "per" bundle.
Thanks for your comments on the PR. I was thinking of proposing something
along these lines of having cache tokens
Dataflow does something like this; however, since work is
load balanced across workers, a per-worker id doesn't work very well.
Dataflow divides the keyspace up into lexicographic ranges, and creates a
cache token per range.
On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise wrote:
> Commenting here vs.
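For illustration, a per-range token lookup along the lines Luke describes
could be as simple as the sketch below; the range bounds and how tokens are
produced are assumptions, not a description of Dataflow internals:

    import bisect

    class RangeTokens(object):
      """Sketch only: one cache token per lexicographic key range."""

      def __init__(self, range_ends, tokens):
        assert len(range_ends) == len(tokens)
        self._ends = list(range_ends)     # sorted inclusive upper bounds, e.g. [b'h', b'p', b'z']
        self._tokens = list(tokens)

      def token_for_key(self, key):
        # First range whose upper bound is >= key; clamp so the last range
        # catches anything beyond the listed bounds.
        idx = bisect.bisect_left(self._ends, key)
        return self._tokens[min(idx, len(self._tokens) - 1)]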
Commenting here vs. on the PR since related to the overall approach.
Wouldn't it be simpler to have the runner just track a unique ID for each
worker and use that to communicate if the cache is valid or not?
* When the bundle is started, the runner tells the worker if the cache has
become invalid
Thanks Luke!
On the note of cache tokens, do we have an idea how cache tokens are
generated and managed by the Runner?
In my mind we will maintain a list of cache tokens scoped by state id
and SDK worker. Cache tokens will not be checkpointed, which means
long-running SDK workers will have to requ
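A sketch of what that bookkeeping might look like on the runner side. The
uuid-based tokens and the method names are assumptions for illustration, not
part of any concrete proposal here:

    import uuid

    class CacheTokenRegistry(object):
      """Sketch only: tokens scoped by SDK worker and state id, never checkpointed."""

      def __init__(self):
        self._tokens = {}   # (worker_id, state_id) -> token

      def current_token(self, worker_id, state_id):
        return self._tokens.setdefault((worker_id, state_id), uuid.uuid4().bytes)

      def invalidate_worker(self, worker_id):
        # E.g. after restoring from a checkpoint: dropping the entries forces new
        # tokens, so anything the SDK worker has cached becomes stale.
        for key in [k for k in self._tokens if k[0] == worker_id]:
          del self._tokens[key]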
For the purpose of my own understanding of the matter, I've created a
document:
https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/
It could make sense to clarify and specify things in there for now. I'm
more than willing to consolidate this document with the caching
Yes, that makes sense. What do you think about creating a document to
summarize the ideas presented here? Also, it would be good to capture
the status quo regarding caching in the Python SDK.
-Max
On 13.08.19 22:44, Thomas Weise wrote:
> The token would be needed in general to invalidate the cach
The token would be needed in general to invalidate the cache when bundles
are processed by different workers.
In the case of the Flink runner we don't have a scenario of an SDK worker
surviving the runner in the case of a failure, so there is no possibility
of inconsistent state as a result of a checkp
Thanks for clarifying. Cache-invalidation for side inputs makes sense.
In case the Runner fails to checkpoint, could it not re-attempt the
checkpoint? At least in the case of Flink, the cache would still be
valid until another checkpoint is attempted. For other Runners that may
not be the case. Al
On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels wrote:
> Agree that we have to be able to flush before a checkpoint to avoid
> caching too many elements. Also good point about checkpoint costs
> increasing with flushing the cache on checkpoints. An LRU cache policy in
> the SDK seems desirable.
Agree that we have to be able to flush before a checkpoint to avoid
caching too many elements. Also good point about checkpoint costs
increasing with flushing the cache on checkpoints. An LRU cache policy in
the SDK seems desirable.
What is the role of the cache token in the design document[1]? It
On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise wrote:
>
> On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels wrote:
>
>> Thanks for starting this discussion Rakesh. An efficient cache layer is
>> one of the missing pieces for good performance in stateful pipelines.
>> The good news is that there
On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels wrote:
> Thanks for starting this discussion Rakesh. An efficient cache layer is
> one of the missing pieces for good performance in stateful pipelines.
> The good news is that there is a level of caching already present in
> Python which batche
Thanks for starting this discussion Rakesh. An efficient cache layer is
one of the missing pieces for good performance in stateful pipelines.
The good news is that there is a level of caching already present in
Python which batches append requests until the bundle is finished.
Thomas, in your exa
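Roughly, the existing per-bundle batching of appends amounts to something like
the sketch below. The class and method names are illustrative, not the actual
harness code:

    class BatchingBagState(object):
      """Sketch only: appends buffered locally until the bundle finishes."""

      def __init__(self, underlying, state_key):
        self._underlying = underlying   # hypothetical handler doing the network I/O
        self._state_key = state_key
        self._pending = []

      def add(self, element):
        self._pending.append(element)   # no network call per element

      def finish_bundle(self):
        # One append request for the whole bundle instead of one per add().
        if self._pending:
          self._underlying.append(self._state_key, self._pending)
          self._pending = []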
On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw wrote:
> The question is whether the SDK needs to wait for the StateResponse to
> come back before declaring the bundle done. The proposal was to not
> send the cache token back as part of an append StateResponse [1], but
> pre-provide it as part of
The question is whether the SDK needs to wait for the StateResponse to
come back before declaring the bundle done. The proposal was to not
send the cache token back as part of an append StateResponse [1], but
pre-provide it as part of the bundle request.
Thinking about this some more, if we assume
The purpose of the new state API call in BEAM-7000 is to tell the runner
that the SDK is now blocked waiting for the result of a specific state
request and it should be used for fetches (not updates) and is there to
allow for SDKs to differentiate readLater (I will need this data at some
point in t
On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise wrote:
>
> That would add a synchronization point that forces extra latency especially
> in streaming mode.
>
> Wouldn't it be possible for the runner to assign the token when starting the
> bundle and for the SDK to pass it along the state requests?
That would add a synchronization point that forces extra latency especially
in streaming mode.
Wouldn't it be possible for the runner to assign the token when starting
the bundle and for the SDK to pass it along the state requests? That way,
there would be no need to batch and wait for a flush.
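In other words, something along these lines: the runner hands the token to the
SDK at bundle start and the SDK simply attaches it to every state request. The
request shapes below are made up for illustration, not the Fn API messages:

    class StateRequestSender(object):
      """Sketch only: SDK attaches the runner-assigned bundle token to each request."""

      def __init__(self, channel, bundle_cache_token):
        self._channel = channel              # hypothetical transport
        self._token = bundle_cache_token     # handed over by the runner at bundle start

      def append(self, state_key, data):
        self._channel.send({'state_key': state_key, 'append': data,
                            'cache_token': self._token})

      def clear(self, state_key):
        self._channel.send({'state_key': state_key, 'clear': True,
                            'cache_token': self._token})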
I believe the intent is to add a new state API call telling the runner that
it is blocked waiting for a response (BEAM-7000).
This should allow the runner to wait till it sees one of these "I'm blocked"
requests and then merge + batch any state calls it may have at that point
in time allowing it to
Now I see what you mean.
On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise wrote:
> Hi Luke,
>
> I guess the answer is that it depends on the state backend. If a set
> operation in the state backend is available that is more efficient than
> clear+append, then it would be beneficial to have a dedicate
Hi Luke,
I guess the answer is that it depends on the state backend. If a set
operation in the state backend is available that is more efficient than
clear+append, then it would be beneficial to have a dedicated fn api
operation to allow for such optimization. That's something that needs to be
det
Thomas, why do you think a single round trip is needed?
clear + append can be done blindly from the SDK side, and it has total
knowledge of the state at that point in time till the end of the bundle, at
which point you want to wait to get the cache token back from the runner
for the append call so t
Hi Rakesh,
Glad to see you pointed this problem out!
+1 for adding this implementation. Managing state with a write-through cache is
pretty important for streaming jobs!
Best, Jincheng
Thomas Weise 于2019年7月29日周一 下午8:54写道:
> FYI a basic test appears to confirm the importance of the cross-bundle
> caching
FYI a basic test appears to confirm the importance of the cross-bundle
caching: I found that the throughput can be increased by playing with the
bundle size in the Flink runner. The default caps at 1000 elements (or 1
second). So on a high-throughput stream the bundles would be capped by the
count limi
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar wrote:
>
> Thanks Robert,
>
> I stumbled upon the Jira that you created some time ago
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked code where code changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c
Thanks Robert,
I stumbled upon the Jira that you created some time ago
https://jira.apache.org/jira/browse/BEAM-5428
You also marked code where code changes are required:
https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_
This is documented at
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
. Note that it requires participation of both the runner and the SDK
(though there are no correctness issues if one or the other side does
not understand the protocol, c
I checked the Python SDK [1] and it has a similar implementation to the Java SDK.
I would agree with Thomas. In the case of a high-volume event stream and a bigger
cluster size, network calls can potentially cause a bottleneck.
@Robert
I am interested to see the proposal. Can you provide me the link of the
propo
Thanks for the pointer. For streaming, it will be important to support
caching across bundles. It appears that even the Java SDK doesn't support
that yet?
https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDo
User state is built on top of read, append and clear, and not on a read and
write paradigm, to allow for blind appends.
The optimization you speak of can be done completely inside the SDK without
any additional protocol being required as long as you clear the state first
and then append all your ne
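That SDK-side optimization could look roughly like this, reusing the caching
layer sketched earlier in the thread (the helper name and handler interface
are made up for illustration):

    def set_state(caching_handler, cache_token, state_key, new_values):
      # Sketch only: a "set" as a blind clear followed by a blind append. The
      # local cache already holds the new value afterwards, so no read is needed.
      caching_handler.clear(cache_token, state_key)
      caching_handler.append(cache_token, state_key, new_values)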
Python workers also have a per-bundle SDK-side cache. A protocol has
been proposed, but hasn't yet been implemented in any SDKs or runners.
On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax wrote:
>
> It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a
> cache, though I think
It's runner dependent. Some runners (e.g. the Dataflow runner) do have such
a cache, though I think it currently has a cap for large bags.
Reuven
On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar wrote:
> Hi,
>
> I have been using python sdk for the application and also using BagState
> in product
Hi,
I have been using the Python SDK for the application and also using BagState in
production. I was wondering whether the state logic has any write-through cache
implemented or not. If we are sending every read and write request over the
network then it comes with a performance cost. We can avoid network