Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Xinyu Liu
I am happy to chat about it over Hangouts or Slack too. Let's talk offline to set it up if needed.

Thanks,
Xinyu

On Tue, May 15, 2018 at 10:51 AM, Xinyu Liu wrote:
> The Samza runner always processes key+window pairs serially. To
> answer Luke's question:
>
> - Why

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Xinyu Liu
The Samza runner always processes key+window pairs serially. To answer Luke's question:
- Why does Samza need to use a snapshot in the first place, when it should be able to read data from RocksDb directly? I believe that the point of adding readIterator() is to allow us to read from RocksDb

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Kenneth Knowles
On Tue, May 15, 2018 at 9:36 AM Lukasz Cwik wrote:
> If it always processes key+window pairs serially, then I'm not sure why
> Samza needs to use a snapshot in the first place and should be able to read
> data from RocksDb directly.
>
> The other point of confusion is around

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Lukasz Cwik
In that case, my confusion lies in how or why adding support for an iterator helps Samza resolve its memory issues, and the consistency questions are about how a runner chooses to parallelize execution of a straight-line program. For example, a runner may choose to use optimistic locking and
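Luke's optimistic-locking example is cut off above, so the following is only a generic illustration of the technique, not a description of any runner: each attempt reads a versioned value and commits only if no other attempt committed in between; otherwise it recomputes and retries. `VersionedCell`, `tryCommit` and `update` are hypothetical names, not Beam or Samza APIs.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical per-key state cell guarded by optimistic concurrency control.
final class VersionedCell<T> {
  // Immutable (version, value) pair so a single compare-and-set swaps both together.
  static final class Versioned<V> {
    final long version;
    final V value;

    Versioned(long version, V value) {
      this.version = version;
      this.value = value;
    }
  }

  private final AtomicReference<Versioned<T>> ref;

  VersionedCell(T initial) {
    ref = new AtomicReference<>(new Versioned<>(0L, initial));
  }

  Versioned<T> read() {
    return ref.get();
  }

  // Succeeds only if nobody committed since 'seen' was read.
  boolean tryCommit(Versioned<T> seen, T newValue) {
    return ref.compareAndSet(seen, new Versioned<>(seen.version + 1, newValue));
  }

  // Read-modify-write loop: on conflict, re-read and recompute.
  T update(UnaryOperator<T> fn) {
    while (true) {
      Versioned<T> seen = read();
      T next = fn.apply(seen.value);
      if (tryCommit(seen, next)) {
        return next;
      }
    }
  }
}
```

Under the single-actor model Kenn describes, `tryCommit` would never fail, which is one way to see why the consistency question only arises if a runner chooses to parallelize.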

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Kenneth Knowles
OK, got it. But what consistency are you referring to? I was trying to point out that there's nothing but straight-line program order consistency. There's only one actor doing all the reads and all the writes.

Kenn

On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik wrote:
> I

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Lukasz Cwik
I misspoke when I said portability semantics; I should have said portability design/implementation. This is why I sent a follow-up e-mail clarifying that I'm confused about:
* I don't understand how you would want close to change the semantics of a user state specification and how it affects the

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Kenneth Knowles
I feel like this discussion is kind of far from the primary intention. The point of ParDo(stateful DoFn) is to enable naive single-threaded code in a style intuitive to a beginning imperative programmer. So:
- the return value of read() should act like an immutable value
- if there is a read
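A minimal sketch of the semantics Kenn lists, assuming a simple in-memory bag: `read()` returns an immutable copy, so later writes do not change a value that was already read, while a fresh `read()` observes every earlier write in program order. `InMemoryBag` is a hypothetical stand-in, not Beam's `BagState` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical single-threaded bag state with snapshot-on-read semantics.
final class InMemoryBag<T> {
  private final List<T> contents = new ArrayList<>();

  void add(T value) {
    contents.add(value);
  }

  // Returns an immutable copy: it behaves like a value and is unaffected by later add() calls.
  List<T> read() {
    return Collections.unmodifiableList(new ArrayList<>(contents));
  }
}
```

Copying on every `read()` is the simplest way to get value semantics; for large state a runner would more plausibly rely on copy-on-write or a storage-level snapshot instead.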

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Kenneth Knowles
I don't follow why allowing resources to be freed would be counter to the spec. I don't really know what you mean by "consistent for a bundle". State, in the sense of the user-facing per-key-and-window state API, is single-threaded and scoped to a single DoFn. There's no one else who can write the

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Lukasz Cwik
Hmm, some of the problems I'm dealing with are:
* I don't understand how you would want close to change the semantics of a user state specification, and how it affects the lifetime of user state.
** Does it represent committing information within a bundle?
** Does it mean that user state can ignore

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Xinyu Liu
I will take a look at the docs to understand the problem better. A minor comment on 2): I don't intend to change the existing iterable API. I plan to implement it similarly to Flink, loading the data into memory and closing the underlying snapshot after that. So the changes should be backward
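Xinyu's plan keeps the existing `Iterable` API and, like the Flink runner, loads the data into memory and then closes the underlying snapshot. A minimal sketch of that pattern, assuming a hypothetical `StoreSnapshot` type that is `AutoCloseable` (standing in for a RocksDb snapshot or iterator handle): try-with-resources releases the snapshot even if copying fails, at the cost of requiring the whole result to fit in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical handle to a store snapshot; close() releases the native resources.
interface StoreSnapshot<T> extends AutoCloseable {
  Iterator<T> iterator();

  @Override
  void close(); // narrowed: throws no checked exception
}

final class EagerReader {
  // Copies every entry onto the heap, then releases the snapshot before returning.
  static <T> Iterable<T> readAll(StoreSnapshot<T> snapshot) {
    try (StoreSnapshot<T> s = snapshot) {
      List<T> copy = new ArrayList<>();
      Iterator<T> it = s.iterator();
      while (it.hasNext()) {
        copy.add(it.next());
      }
      return Collections.unmodifiableList(copy);
    }
  }
}
```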

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Lukasz Cwik
I believe adding support for a state spec to be 'closed' or 'freed' runs counter to the requirement that a state spec be consistent for the lifetime of a bundle. Are we willing to change this requirement for the lifetime of a bundle, or to say that runners can arbitrarily say that a StateSpec can't be

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Lukasz Cwik
Before you go on and update the user-facing API, we should discuss the last point I made, since the change you're making will have limited usability: the portability effort won't realistically allow you to see such low-level things as when processElement has finished, and supporting user state will

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Xinyu Liu
We discussed the proposed approaches internally. It seems that if the State API can also expose another method that returns a ReadableState, it will cover our cases of iterating over a bigger-than-memory state and closing the underlying rocksDb snapshot immediately after the iterator is fully consumed
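One reading of this proposal, sketched under assumptions: the state cell exposes an extra method returning a `ReadableState` whose value is a lazy `Iterator`, and that iterator closes the underlying snapshot as soon as `hasNext()` first returns `false`. The `ReadableState` interface below is a local stand-in modeled loosely on Beam's (which also has `readLater()`); `readIterator()`, `Cursor` and `IterableStateCell` are hypothetical names.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Local stand-in for a readable state cell (hypothetical, loosely modeled on Beam's ReadableState).
interface ReadableState<T> {
  T read();
}

// Hypothetical handle over a native store iterator, e.g. one opened on a RocksDb snapshot.
interface Cursor<T> extends AutoCloseable {
  boolean valid();

  T current();

  void advance();

  @Override
  void close();
}

// Iterator that releases its cursor as soon as hasNext() finds no more entries.
final class SelfClosingIterator<T> implements Iterator<T> {
  private Cursor<T> cursor; // set to null once released

  SelfClosingIterator(Cursor<T> cursor) {
    this.cursor = cursor;
  }

  @Override
  public boolean hasNext() {
    if (cursor == null) {
      return false;
    }
    if (!cursor.valid()) {
      cursor.close(); // fully consumed: free the snapshot immediately
      cursor = null;
      return false;
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T value = cursor.current();
    cursor.advance();
    return value;
  }
}

// Hypothetical state cell exposing an extra readIterator() method.
final class IterableStateCell<T> {
  private final Supplier<Cursor<T>> openCursor;

  IterableStateCell(Supplier<Cursor<T>> openCursor) {
    this.openCursor = openCursor;
  }

  ReadableState<Iterator<T>> readIterator() {
    // Nothing is opened until read() is called; each read() opens a fresh cursor.
    return () -> new SelfClosingIterator<>(openCursor.get());
  }
}
```

A partially consumed iterator is never closed by this scheme, so it still needs a fallback such as the reference-based cleanup Luke suggests elsewhere in the thread.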

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Kenneth Knowles
On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik wrote:
> Users typically want to do that async operation and then produce output
> with it. Adding asynchronous execution is difficult within the framework
> because a lot of code currently doesn't need to be thread safe, and writing

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Lukasz Cwik
Four separate points about the discussions above:
Note that the state API requires that the underlying state be snapshotted during bundle processing; in BEAM-2975 the discussion is whether the current in-memory representation/modifications are directly visible or not during processElement. It should be

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-13 Thread Reuven Lax
At least one API that has been discussed in the past is to use Java 8 CompletionStage, e.g.

    new DoFn() {
      @ProcessElement
      public void process(@Element CompletionStage element, ...) {
        element.thenApply(...)
      }
    }

The framework will automatically create the
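The DoFn snippet in Reuven's message is a past proposal, not an existing Beam signature. The JDK mechanics it relies on can be shown with plain `CompletableFuture`, which implements `CompletionStage`: `thenApply` registers a transformation that runs once the value is available, so the processing method does not block. `enrich` is a hypothetical stand-in for an asynchronous lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class AsyncSketch {
  // Hypothetical async lookup, completed on the common ForkJoinPool.
  static CompletionStage<String> enrich(String key) {
    return CompletableFuture.supplyAsync(() -> key + ":enriched");
  }

  // Chains work onto the stage instead of blocking on it.
  static CompletionStage<Integer> process(CompletionStage<String> element) {
    return element.thenApply(String::length);
  }
}
```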

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-13 Thread Xinyu Liu
Thanks for all the pointers. I looked through the discussions in BEAM-2975 and BEAM-2980 about having snapshot or live views of iterables, and the current semantics make a lot of sense to me. For your question: it does not require an explicit snapshot when we create a RocksDb iterator directly. The

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-12 Thread Kenneth Knowles
I don't have any further suggestions, but I want to call out how this hits a lot of interesting points. The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2], where we debated things a bit. I think the strongest case is for what you describe: it should be a snapshot.

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-12 Thread Xinyu Liu
Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question here, we discussed internally releasing the underlying resources after the whole iterator is consumed. This probably covers quite a lot of use cases. For some special cases where the user consumes only part of the

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Lukasz Cwik
The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion). Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.

On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles
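Luke's suggestion can be sketched with `java.lang.ref.Cleaner` (Java 9+), which is built on phantom references and a reference queue: the cleanup action runs after the iterator becomes unreachable, or earlier if code calls `clean()`. `TrackedIterator` and the `freeNative` callback are hypothetical; in a RocksDb-backed runner that callback is where the native iterator or snapshot would be closed. As Xinyu points out elsewhere in the thread, the timing of the GC-driven path is unpredictable, so this acts as a safety net rather than a guarantee of prompt release.

```java
import java.lang.ref.Cleaner;
import java.util.Iterator;

// Iterator whose native resources are released either explicitly via close() or,
// as a fallback, by the Cleaner once the iterator becomes phantom-reachable.
final class TrackedIterator<T> implements Iterator<T>, AutoCloseable {
  // One shared Cleaner (and its background thread) for all tracked iterators.
  private static final Cleaner CLEANER = Cleaner.create();

  private final Iterator<T> delegate;
  private final Cleaner.Cleanable cleanable;

  // 'freeNative' must not capture this iterator, or it would never become unreachable.
  TrackedIterator(Iterator<T> delegate, Runnable freeNative) {
    this.delegate = delegate;
    this.cleanable = CLEANER.register(this, freeNative);
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    return delegate.next();
  }

  // Cleanable.clean() runs the action at most once, whether invoked here or by the Cleaner.
  @Override
  public void close() {
    cleanable.clean();
  }
}
```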

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Kenneth Knowles
Thanks, Xinyu. I had actually first sketched out just what you wrote. But then I realized a few things:
- usually an Iterable does not allocate resources, only its Iterators
- if you consume the whole iterator, I hope the user would not have to do any extra work
- you can also automatically

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Eugene Kirpichov
I'm not sure if this has been proposed in this thread, but if the common case is that users consume the whole iterator, then you can close resources at !hasNext(). And for cleanup of incompletely consumed iterators, rely on what Kenn suggested. Since you're making your own runner, you can add

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Lukasz Cwik
As alternatives to using weak/phantom references:
* Can you configure RocksDb's memory usage/limits?
* Inside the iterator, periodically close and re-open the RocksDb connection, seeking back to where the user was?
* Use the ParDo/DoFn lifecycle and clean up after each processElement/finishBundle
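Luke's second bullet (periodically close and re-open, seeking back to where the user was) can be sketched with a `NavigableMap` standing in for the RocksDb store: the iterator remembers the last key it returned and, every `segmentSize` entries, opens a fresh cursor with `tailMap(lastKey, false)`. In a real runner the reopen step is where the old native iterator/snapshot would be closed; `ReopeningIterator` and `segmentSize` are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;

// Iterates a sorted store in bounded segments, reopening the underlying cursor
// every 'segmentSize' entries and seeking just past the last key returned.
final class ReopeningIterator<K, V> implements Iterator<Map.Entry<K, V>> {
  private final NavigableMap<K, V> store; // stand-in for the key-value store
  private final int segmentSize;
  private Iterator<Map.Entry<K, V>> cursor;
  private K lastKey; // null until the first entry is returned
  private int servedInSegment;
  int reopenCount; // exposed only to illustrate how often a cursor is reopened

  ReopeningIterator(NavigableMap<K, V> store, int segmentSize) {
    this.store = store;
    this.segmentSize = segmentSize;
    this.cursor = store.entrySet().iterator();
  }

  private void reopen() {
    // A real implementation would close the old native cursor/snapshot here.
    NavigableMap<K, V> rest = lastKey == null ? store : store.tailMap(lastKey, false);
    cursor = rest.entrySet().iterator();
    servedInSegment = 0;
    reopenCount++;
  }

  @Override
  public boolean hasNext() {
    if (servedInSegment >= segmentSize) {
      reopen();
    }
    return cursor.hasNext();
  }

  @Override
  public Map.Entry<K, V> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    Map.Entry<K, V> entry = cursor.next();
    lastKey = entry.getKey();
    servedInSegment++;
    return entry;
  }
}
```

Each reopen observes the store as it is at that moment, so this bounds how long any one cursor is held at the price of the single-snapshot view discussed around BEAM-2975 and BEAM-2980.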

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Xinyu Liu
Thanks for drafting the details about the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' lives hard. If the user happens to configure a large JVM heap size, and since rocksDb uses off-heap

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Kenneth Knowles
It is too soon to argue whether an API is complex or not. No specific API has been proposed. I think the problem statement is real: you need to be able to read and write bigger-than-memory state. It seems we have multiple runners that don't support it, perhaps because of our API. You might

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
If I understand correctly, using weak references will help clean up the Java objects once GC kicks in. In the case of a kv-store like rocksDb, the Java iterator is just a JNI interface to the underlying native (C++) iterator, so we need to explicitly invoke close to release the in-memory snapshot data, which can

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Lukasz Cwik
I don't agree. I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods). Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
Loading/evicting blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources. We can definitely add heuristics to help release the resources as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them up

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Lukasz Cwik
Users won't reliably close/release the resources, and forcing them to will make the user experience worse. It would make a lot more sense to use a file format that allows random access and use a cache to load/evict blocks of the state from memory. If that is not possible, use an iterable which
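The random-access-plus-cache alternative Luke describes can be illustrated with an LRU cache built on `LinkedHashMap` in access order, where `removeEldestEntry` evicts the least-recently-used block once `capacity` is exceeded. The `loader` function is a hypothetical stand-in for reading one block from a random-access file; block size, file format and eviction policy are all assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// LRU cache of state blocks; 'loader' stands in for a random-access read of one block.
final class BlockCache<B> {
  private final IntFunction<B> loader;
  private final LinkedHashMap<Integer, B> blocks;
  int loads; // how many times a block had to be (re)loaded

  BlockCache(int capacity, IntFunction<B> loader) {
    this.loader = loader;
    // accessOrder = true: iteration order runs from least- to most-recently used.
    this.blocks = new LinkedHashMap<Integer, B>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, B> eldest) {
        return size() > capacity;
      }
    };
  }

  B get(int blockIndex) {
    B block = blocks.get(blockIndex); // a hit also refreshes recency
    if (block == null) {
      block = loader.apply(blockIndex);
      loads++;
      blocks.put(blockIndex, block); // may evict the least-recently-used block
    }
    return block;
  }

  int cachedBlocks() {
    return blocks.size();
  }
}
```

As Xinyu's reply notes, this bounds heap usage for cached blocks but does not by itself release the store's own iterator or snapshot resources.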

Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
Hi folks, I'm in the middle of implementing MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like rocksDb, we need to be able to let users explicitly close the iterator/iterable
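What explicit closing could look like from the user's side, sketched with a hypothetical `ClosableIterator` that extends both `Iterator` and `AutoCloseable`: try-with-resources closes it even when the loop stops early. Neither the interface nor the list-backed implementation exists in Beam; they only illustrate the request.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical user-facing iterator that callers are expected to close.
interface ClosableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // would release the underlying store iterator/snapshot
}

// Illustrative implementation over an in-memory list.
final class ListBackedClosableIterator<T> implements ClosableIterator<T> {
  private final Iterator<T> delegate;
  boolean closed;

  ListBackedClosableIterator(List<T> data) {
    this.delegate = data.iterator();
  }

  @Override
  public boolean hasNext() {
    return !closed && delegate.hasNext();
  }

  @Override
  public T next() {
    if (closed) {
      throw new IllegalStateException("iterator already closed");
    }
    return delegate.next();
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class UserCode {
  // Reads at most 'limit' entries; try-with-resources closes the iterator even on early exit.
  static <T> int countUpTo(ClosableIterator<T> entries, int limit) {
    int n = 0;
    try (ClosableIterator<T> it = entries) {
      while (n < limit && it.hasNext()) {
        it.next();
        n++;
      }
    }
    return n;
  }
}
```

The drawback Luke raises in his reply applies directly: every caller has to remember the try-with-resources block, and a forgotten one leaks the native resources.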