I am happy to chat about it over hangout or slack too. Let's talk offline
to set it up if needed.
Thanks,
Xinyu
On Tue, May 15, 2018 at 10:51 AM, Xinyu Liu wrote:
> The Samza runner always processes key+window pairs serially. To
> answer Luke's question:
>
> - Why
The Samza runner always processes key+window pairs serially. To
answer Luke's question:
- Why does Samza need to use a snapshot in the first place, instead of
reading data from RocksDb directly?
I believe that the point of adding the readIterator() is to allow us to
read from RocksDb
On Tue, May 15, 2018 at 9:36 AM Lukasz Cwik wrote:
> If it always processes key+window pairs serially, then I'm not sure why
> Samza needs to use a snapshot in the first place and should be able to read
> data from RocksDb directly.
>
> The other point of confusion is around
In that case my confusion lies with how or why adding support for an
iterator helps Samza resolve its memory issues, and the consistency
questions are about how a Runner is choosing to parallelize execution of a
straight-line program. For example a runner may choose to use optimistic
locking and
OK, got it. But what consistency are you referring to? I was trying to
point out that there's nothing but straight-line program order consistency.
There's only one actor doing all the reads and all the writes.
Kenn
On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik wrote:
> I
I misspoke when I said portability semantics and should have said
portability design/implementation. This is why I had a follow-up e-mail and
clarified that I'm confused on:
* I don't understand how you would want close to change the semantics of a
user state specification and how it affects the
I feel like this discussion is kind of far from the primary intention. The
point of ParDo(stateful DoFn) is to enable naive single-threaded code in a
style intuitive to a beginning imperative programmer. So:
- the return value of read() should act like an immutable value
- if there is a read
I don't follow why allowing freeing resources would be counter to the spec.
I don't really know what you mean by consistent for a bundle. State, in the
sense of the user-facing per-key-and-window state API, is single threaded
and scoped to a single DoFn. There's no one else who can write the
Hmm, some of the problem I'm dealing with is:
* I don't understand how you would want close to change the semantics of a
user state specification and how it affects the lifetime of user state?
** Does it represent committing information within a bundle?
** Does it mean that user state can ignore
I will take a look at the docs to understand the problem better. A minor
comment to 2) is that I don't intend to change the existing iterable API. I
plan to implement it similar to Flink, loading the data into memory and
closing the underlying snapshot after that. So the changes should be
backward
I believe adding support for a state spec to be 'closed' or 'freed' is
counter to the requirement of a state spec being consistent for the
lifetime of a bundle. Are we willing to change this requirement for the
lifetime of a bundle, or say that runners can arbitrarily say that a StateSpec
can't be
Before you go on and update the user-facing API, we should discuss the last
point I made, because the change you're making will have limited usability since
the portability effort won't realistically allow you to see such low level
things like when processElement finished and supporting user state will
We discussed the proposed approaches internally. It seems that if the State
API can also expose another method to return a ReadableState, it
will cover our cases of iterating over a bigger-than-memory state and
closing the underlying rocksDb snapshot immediately after the iterator is
fully consumed
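
To make the close-on-exhaustion idea concrete, here is a minimal sketch in
Java. AutoClosingIterator and its releaseResource callback are hypothetical
names used only for illustration; they are not part of the Beam or Samza
API. The callback stands in for whatever closes the underlying rocksDb
snapshot.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical wrapper: frees a resource as soon as the delegate is exhausted. */
final class AutoClosingIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<T> delegate;
  private final Runnable releaseResource; // assumption: closes the store snapshot
  private boolean closed = false;

  AutoClosingIterator(Iterator<T> delegate, Runnable releaseResource) {
    this.delegate = delegate;
    this.releaseResource = releaseResource;
  }

  @Override
  public boolean hasNext() {
    if (closed) {
      return false;
    }
    boolean more = delegate.hasNext();
    if (!more) {
      close(); // fully consumed: release the resource right away
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  /** Idempotent, so an explicit close after exhaustion is harmless. */
  @Override
  public void close() {
    if (!closed) {
      closed = true;
      releaseResource.run();
    }
  }

  boolean isClosed() {
    return closed;
  }
}
```

A for-each loop that runs to completion triggers the release on its final
hasNext() call. An iterator that is only partially consumed still needs
another cleanup path, which is the case the rest of the thread debates.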
On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik wrote:
> Users typically want to do that async operation and then produce output
> with it. Adding asynchronous execution is difficult within the framework
> because a lot of code currently does not need to be thread safe and writing
4 separate points about discussions above:
Note that the state API requires that the underlying state is snapshotted
during bundle processing. In BEAM-2975 the discussion is whether the
current in-memory representation/modifications are directly visible or not
during processElement. It should be
At least one API that has been discussed in the past is to use Java 8
CompletionStage, e.g.
new DoFn() {
  @ProcessElement
  public void process(@Element CompletionStage element, ...) {
    element.thenApply(...)
  }
}
The framework will automatically create the
Thanks for all the pointers. I looked through the discussions in BEAM-2975
and BEAM-2980 about having snapshot or live views of iterable, and the
current semantics make a lot of sense to me. For your question: it does
not require an explicit snapshot when we create a RocksDb iterator directly.
The
I don't have any further suggestions, but want to call out how this hits a
lot of interesting points.
The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980
[2] where we debated things a bit. I think the strongest case is for what
you describe - it should be a snapshot.
Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
here, we discussed internally releasing the underlying resources
after consuming the whole iterator. This probably covers quite a lot of use
cases. For some special cases where the user only consumes part of the
The iterator going out of scope is the idiomatic way that resources are
freed for Java developers (hence the weak/phantom reference suggestion).
Explicitly requiring users to deal with 'handles' (like file streams) leads
to leaked resources.
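
For illustration, the weak/phantom reference suggestion could look roughly
like the sketch below. ResourceTracker, track and drain are hypothetical
names, not Beam or Samza APIs, and the cleanup Runnable stands in for
closing a native iterator or snapshot. This is only one possible shape,
written against the standard java.lang.ref classes.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker: runs a cleanup action once a tracked object is unreachable. */
final class ResourceTracker {
  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // Keeping the references in this map keeps them alive until they are drained.
  private final Map<Reference<?>, Runnable> cleanups = new ConcurrentHashMap<>();

  /**
   * Registers a cleanup action for the tracked object. The action must not
   * hold a reference to the tracked object, or it never becomes unreachable.
   */
  Reference<?> track(Object tracked, Runnable cleanup) {
    Reference<?> ref = new PhantomReference<>(tracked, queue);
    cleanups.put(ref, cleanup);
    return ref;
  }

  /** Runs cleanup for every reference enqueued so far; returns how many ran. */
  int drain() {
    int ran = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      Runnable cleanup = cleanups.remove(ref);
      if (cleanup != null) {
        cleanup.run();
        ran++;
      }
    }
    return ran;
  }
}
```

A runner would call drain() at convenient points, for example between
elements. Cleanup only happens after the garbage collector has noticed that
the iterator is unreachable, so the timing is not under the runner's control.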
On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles
Thanks Xinyu,
I actually had first sketched out just what you wrote. But then I realized
a few things:
- usually an Iterable does not allocate resources, only its Iterators
- if you consume the whole iterator, I hope the user would not have to do
any extra work
- you can also automatically
I'm not sure if this has been proposed in this thread, but if the common
case is that users consume the whole iterator, then you can close resources
at !hasNext(). And for cleanup of incompletely consumed iterators, rely on
what Kenn suggested. Since you're making your own runner, you can add
Alternatives to using weak/phantom references:
* Can you configure RocksDb's memory usage/limits?
* Inside the iterator, periodically close and re-open the RocksDb
connection, seeking back to where the user was?
* Use the ParDo/DoFn lifecycle and clean up after each
processElement/finishBundle
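
The last option above, cleaning up along the ParDo/DoFn lifecycle, might be
sketched as follows. BundleResources is a hypothetical helper, not an
existing Beam or Samza class; the assumption is that the runner creates one
per processElement or bundle, registers every handle it opens on the
user's behalf, and closes the registry when that scope ends.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical registry of handles that must be released when a scope ends. */
final class BundleResources implements AutoCloseable {
  private final Deque<AutoCloseable> open = new ArrayDeque<>();

  /** Registers a handle and returns it so call sites stay compact. */
  <T extends AutoCloseable> T track(T handle) {
    open.push(handle);
    return handle;
  }

  int openCount() {
    return open.size();
  }

  /** Closes everything still registered, most recently opened first. */
  @Override
  public void close() {
    RuntimeException failure = null;
    while (!open.isEmpty()) {
      try {
        open.pop().close();
      } catch (Exception e) {
        // Keep closing the remaining handles; report all failures at the end.
        if (failure == null) {
          failure = new RuntimeException("failed to release state resources", e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}
```

With this shape users never see a handle: the runner wraps each
processElement or finishBundle call in try-with-resources, so partially
consumed iterators are released without relying on GC timing.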
Thanks for drafting the details about the two approaches, Kenn. Now I
understand Luke's proposal better. The approach looks neat, but the
uncertainty of *when* GC is going to kick in will make users' lives hard. If
the user happens to configure a large JVM heap size, and since rocksDb uses
off-heap
It is too soon to argue whether an API is complex or not. There has been no
specific API proposed.
I think the problem statement is real - you need to be able to read and
write bigger-than-memory state. It seems we have multiple runners that
don't support it, perhaps because of our API. You might
If I understand correctly, using weak references will help clean up the
Java objects once GC kicks in. In the case of kv-stores like rocksDb, the Java
iterator is just a JNI interface to the underlying C iterator, so we need
to explicitly invoke close to release the in-memory snapshot data, which
can
I don't agree. I believe you can track the iterators/iterables that are
created and freed by using weak references and reference queues (or other
methods). Having a few people work 10x as hard to provide a good
implementation is much better than having 100s or 1000s of users suffering
through a
Loading/evicting blocks will help reduce the cache memory footprint, but we still
won't be able to release the underlying resources. We can definitely add
heuristics to help release the resources as you mentioned, but there is no
accurate way to track all the iterators/iterables created and free them up
Users won't reliably close/release the resources, and forcing them to will
make the user experience worse.
It will make a lot more sense to use a file format which allows random
access and use a cache to load/evict blocks of the state from memory.
If that is not possible, use an iterable which
Hi, folks,
I'm in the middle of implementing the MapState and SetState in our Samza
runner. We noticed that the state returns the Java Iterable for reading
entries, keys, etc. For state backed by a file-based kv store like rocksDb,
we need to be able to let users explicitly close the iterator/iterable