Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-13 Thread Reuven Lax
At least one API that has been discussed in the past is to use Java 8's
CompletionStage, e.g.:

new DoFn() {
  @ProcessElement
  public void process(@Element CompletionStage element, ...) {
    element.thenApply(...);
  }
}

The framework will automatically create the CompletionStage, and the
process method can specify a pipeline of asynchronous operations to perform
on the element. When all of them are done, the element will be marked as
successfully processed.
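
A minimal, framework-free sketch of the idea above, assuming a hypothetical AsyncProcessFn interface and runElement() hook (neither is Beam API). One assumption differs from the snippet: here process() returns its CompletionStage so the hook can tell when the chain finishes, whereas a void process() as written above would require the framework to track the chain some other way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

// Hypothetical stand-in for the proposed async element API; not part of Beam.
public class AsyncElementSketch {

  /** Hypothetical user function: chains asynchronous work onto the element's stage. */
  interface AsyncProcessFn<InputT, OutputT> {
    CompletionStage<OutputT> process(CompletionStage<InputT> element);
  }

  /**
   * Hypothetical framework hook: creates the CompletionStage for the element,
   * hands it to the user function, and marks the element processed only
   * once the user's chain has completed.
   */
  static <InputT, OutputT> CompletionStage<Void> runElement(
      InputT input, AsyncProcessFn<InputT, OutputT> fn, Consumer<? super OutputT> markProcessed) {
    CompletableFuture<InputT> element = new CompletableFuture<>();
    CompletionStage<OutputT> chain = fn.process(element);
    element.complete(input); // the framework delivers the element
    return chain.thenAccept(markProcessed);
  }

  public static void main(String[] args) {
    AsyncProcessFn<String, Integer> fn =
        element -> element
            .thenApply(String::trim)
            // supplyAsync simulates a remote call running off the caller's thread
            .thenCompose(s -> CompletableFuture.supplyAsync(s::length));
    runElement("  hello  ", fn, n -> System.out.println("processed, length=" + n))
        .toCompletableFuture()
        .join(); // joined only so the demo waits; prints "processed, length=5"
  }
}
```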

Reuven

On Sun, May 13, 2018 at 11:36 AM Xinyu Liu  wrote:

> Thanks for all the pointers. I looked through the discussion in BEAM-2975
> and BEAM-2980 about having snapshot or live views of iterables, and the
> current semantics make a lot of sense to me. To answer your question: it
> does not require an explicit snapshot when we create a RocksDB iterator
> directly. The iterator reads from an implicit snapshot as of the time the
> iterator is created [1], and the snapshot is released after the
> iterator is closed. If we can have another method to return
> ReadableState, we might be able to apply the auto-closing
> approaches we discussed and solve the problem here :).
>
> It's very interesting that you bring up the async API discussion!
> Async IO has been widely adopted among our users here: they use netty for
> async calls, with a library named ParSeq [2] to help manage the calls. Samza
> provides a primitive callback-style API [3], in which the user invokes the
> callback after the remote calls are complete. Currently, our users combine
> this API with the ParSeq library for remote IO in Samza jobs. It seems we
> might have to make blocking calls (hence the poor resource utilization you
> mentioned) when using the Beam API for now. It would be great if you could
> send a few more details about the async API discussion. I would like to add
> our use case and help move this forward.
>
> Thanks,
> Xinyu
>
> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
> [2]: https://github.com/linkedin/parseq
> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>
>
> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles  wrote:
>
>> I don't have any further suggestions, but want to call out how this hits
>> a lot of interesting points.
>>
>> The point about snapshotting is great. We have BEAM-2975 [1] and
>> BEAM-2980 [2] where we debated things a bit. I think the strongest case is
>> for what you describe - it should be a snapshot. Perhaps they should both
>> be closed as fixed...
>>
>> And you also bring up long blocking calls - we have also deliberately
>> decided that long synchronous blocking calls in @ProcessElement can be
>> embraced for simple programming and compensated with autoscaling smarts
>> (e.g. expand the thread pool by noticing poor utilization). The alternative
>> is a more futuristic API where the calls can be explicitly asynchronous.
>> We've had some interesting dev@ list discussions about that, too.
>>
>> Another possibility: could read() perhaps return a
>> ReadableState instead? We could, of course, have two methods with
>> different names, one for an iterator and one for a snapshot iterable. But
>> wouldn't the Iterator also require a snapshot? Doesn't a native RocksDB
>> iterator require a snapshot to have well-defined contents? As you can tell,
>> I don't know enough about RocksDB details to be sure of my suggestions.
>>
>> Kenn
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>
>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu  wrote:
>>
>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the
>>> question here, we discussed internally releasing the underlying
>>> resources after consuming the whole iterator. This probably covers quite a
>>> lot of use cases. For special cases where the user consumes only part of
>>> the iterator, Luke and Kenn's suggestion of releasing after
>>> processElement() might work (I need to confirm this against our use
>>> cases). So based on what we have discussed so far, we might have a good
>>> way to automatically close an iterator for the store.
>>>
>>> There is another issue, though: right now the state API returns an
>>> iterable for entries(), keys() and values(), and we can create iterators
>>> from it. From my understanding, the iterable holds a snapshot of the
>>> underlying store. In the case of RocksDB, it's going to be a db.snapshot().
>>> So when can we release the snapshot? Unlike an iterator, we can't use
>>> heuristics to release it automatically. The user can hold on to
>>> the iterable and create iterators throughout the whole processElement().
>>> But if we only close the iterable after processElement(), I am quite
>>> concerned about the limitations this will bring. If the user is doing some
>>> remote call during the process, then the snapshot 

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-13 Thread Xinyu Liu
Thanks for all the pointers. I looked through the discussion in BEAM-2975
and BEAM-2980 about having snapshot or live views of iterables, and the
current semantics make a lot of sense to me. To answer your question: it
does not require an explicit snapshot when we create a RocksDB iterator
directly. The iterator reads from an implicit snapshot as of the time the
iterator is created [1], and the snapshot is released after the
iterator is closed. If we can have another method to return
ReadableState, we might be able to apply the auto-closing
approaches we discussed and solve the problem here :).
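
A rough sketch of the two auto-closing heuristics discussed in this thread: release an iterator once it is fully consumed, and release whatever is still open after processElement() returns. NativeIterator, AutoClosingIterator, ElementScope and fake() are hypothetical names, not Beam or RocksDB API; fake() is an in-memory stand-in for a native iterator (such as a RocksDB one) whose close() frees the pinned resources. This covers iterators only, not the harder iterable/snapshot lifetime question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class AutoClosingIterators {

  /** Hypothetical handle for a native store iterator that pins resources until closed. */
  interface NativeIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close();
  }

  /** Wraps a native iterator and releases it as soon as it is exhausted. */
  static final class AutoClosingIterator<T> implements Iterator<T>, AutoCloseable {
    private final NativeIterator<T> delegate;
    private boolean closed;

    AutoClosingIterator(NativeIterator<T> delegate) { this.delegate = delegate; }

    @Override
    public boolean hasNext() {
      if (closed) return false;
      boolean more = delegate.hasNext();
      if (!more) close(); // heuristic 1: release after full consumption
      return more;
    }

    @Override
    public T next() {
      if (!hasNext()) throw new NoSuchElementException();
      return delegate.next();
    }

    @Override
    public void close() {
      if (!closed) { closed = true; delegate.close(); }
    }

    boolean isClosed() { return closed; }
  }

  /** Hypothetical per-element scope: closes anything still open once processElement() returns. */
  static final class ElementScope {
    private final List<AutoClosingIterator<?>> open = new ArrayList<>();

    <T> AutoClosingIterator<T> track(NativeIterator<T> it) {
      AutoClosingIterator<T> wrapped = new AutoClosingIterator<>(it);
      open.add(wrapped);
      return wrapped;
    }

    void finishElement() { // heuristic 2: release after processElement()
      for (AutoClosingIterator<?> it : open) it.close();
      open.clear();
    }
  }

  /** In-memory stand-in for a native iterator; counts close() calls for the demo. */
  static <T> NativeIterator<T> fake(List<T> items, int[] closeCount) {
    Iterator<T> it = items.iterator();
    return new NativeIterator<T>() {
      public boolean hasNext() { return it.hasNext(); }
      public T next() { return it.next(); }
      public void close() { closeCount[0]++; }
    };
  }

  public static void main(String[] args) {
    int[] closes = {0};
    ElementScope scope = new ElementScope();
    AutoClosingIterator<String> full = scope.track(fake(Arrays.asList("a", "b"), closes));
    while (full.hasNext()) full.next(); // fully consumed, so closed right away
    AutoClosingIterator<String> partial = scope.track(fake(Arrays.asList("x", "y"), closes));
    partial.next(); // partially consumed, so still open
    scope.finishElement(); // closes the partially consumed iterator
    System.out.println("closes=" + closes[0]); // prints "closes=2"
  }
}
```

The idempotent close() matters here: an iterator closed by exhaustion is visited again by finishElement(), and the guard keeps the native resource from being released twice.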

It's very interesting that you bring up the async API discussion!
Async IO has been widely adopted among our users here: they use netty for
async calls, with a library named ParSeq [2] to help manage the calls. Samza
provides a primitive callback-style API [3], in which the user invokes the
callback after the remote calls are complete. Currently, our users combine
this API with the ParSeq library for remote IO in Samza jobs. It seems we
might have to make blocking calls (hence the poor resource utilization you
mentioned) when using the Beam API for now. It would be great if you could
send a few more details about the async API discussion. I would like to add
our use case and help move this forward.
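
To illustrate how a callback-style remote call could feed a CompletionStage-based API without blocking, here is a small sketch. Callback, AsyncClient and lookupAsync() are hypothetical names loosely modeled on callback-style task APIs; they are not the Samza, ParSeq or Beam interfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackBridge {

  /** Hypothetical completion callback for an asynchronous remote call. */
  interface Callback<T> {
    void complete(T result);
    void failure(Throwable t);
  }

  /** Hypothetical non-blocking client that reports results through a callback. */
  interface AsyncClient {
    void lookup(String key, Callback<String> callback);
  }

  /** Adapts a callback-style call into a CompletionStage; the caller never blocks. */
  static CompletionStage<String> lookupAsync(AsyncClient client, String key) {
    CompletableFuture<String> future = new CompletableFuture<>();
    client.lookup(key, new Callback<String>() {
      @Override public void complete(String result) { future.complete(result); }
      @Override public void failure(Throwable t) { future.completeExceptionally(t); }
    });
    return future;
  }

  public static void main(String[] args) {
    ExecutorService io = Executors.newSingleThreadExecutor();
    // Fake client that answers on another thread, simulating a remote call.
    AsyncClient client = (key, cb) -> io.execute(() -> cb.complete(key.toUpperCase()));
    String result = lookupAsync(client, "user-42")
        .thenApply(v -> "enriched:" + v)
        .toCompletableFuture()
        .join(); // joined only so the demo can print; an async runner would not block here
    System.out.println(result); // prints "enriched:USER-42"
    io.shutdown();
  }
}
```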

Thanks,
Xinyu

[1]: https://github.com/facebook/rocksdb/wiki/Iterator
[2]: https://github.com/linkedin/parseq
[3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html


On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles  wrote:

> I don't have any further suggestions, but want to call out how this hits a
> lot of interesting points.
>
> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980
> [2] where we debated things a bit. I think the strongest case is for what
> you describe - it should be a snapshot. Perhaps they should both be closed
> as fixed...
>
> And you also bring up long blocking calls - we have also deliberately
> decided that long synchronous blocking calls in @ProcessElement can be
> embraced for simple programming and compensated with autoscaling smarts
> (e.g. expand the thread pool by noticing poor utilization). The alternative
> is a more futuristic API where the calls can be explicitly asynchronous.
> We've had some interesting dev@ list discussions about that, too.
>
> Another possibility: could read() perhaps return a
> ReadableState instead? We could, of course, have two methods with
> different names, one for an iterator and one for a snapshot iterable. But
> wouldn't the Iterator also require a snapshot? Doesn't a native RocksDB
> iterator require a snapshot to have well-defined contents? As you can tell,
> I don't know enough about RocksDB details to be sure of my suggestions.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-2975
> [2] https://issues.apache.org/jira/browse/BEAM-2980
>
> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu  wrote:
>
>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
>> here, we discussed internally releasing the underlying resources
>> after consuming the whole iterator. This probably covers quite a lot of use
>> cases. For special cases where the user consumes only part of the
>> iterator, Luke and Kenn's suggestion of releasing after processElement()
>> might work (I need to confirm this against our use cases). So based on
>> what we have discussed so far, we might have a good way to automatically
>> close an iterator for the store.
>>
>> There is another issue, though: right now the state API returns an
>> iterable for entries(), keys() and values(), and we can create iterators
>> from it. From my understanding, the iterable holds a snapshot of the
>> underlying store. In the case of RocksDB, it's going to be a db.snapshot().
>> So when can we release the snapshot? Unlike an iterator, we can't use
>> heuristics to release it automatically. The user can hold on to
>> the iterable and create iterators throughout the whole processElement().
>> But if we only close the iterable after processElement(), I am quite
>> concerned about the limitations this will bring. If the user makes a
>> remote call during processing, the snapshot might be held for a long
>> time before being released, which might cause performance problems. And if
>> the user happens to create multiple iterables, there will be multiple
>> snapshots held open during processing. Luke suggested being aggressive about
>> closing the resources and recreating them when needed again. But that might
>> not work here, since we won't be able to recreate the same snapshot given
>> that the store might have been updated (and creating a RocksDB snapshot is
>> not cheap either). I am running out of ideas other than exposing the
>> iterator itself somehow (and adding close() if needed?). Any further
>> suggestions?
>>
>> @Kenn: btw, I have the same impl you posted earlier 

Re: [PROPOSAL] Preparing 2.5.0 release next week

2018-05-13 Thread Jean-Baptiste Onofré

Hi guys,

just to let you know that the build fully passed on my box.

I'm testing the artifacts right now.

Regards
JB

On 06/04/2018 10:48, Jean-Baptiste Onofré wrote:

Hi guys,

Apache Beam 2.4.0 has been released on March 20th.

According to our release cycle (roughly 6 weeks), we should start thinking
about 2.5.0.

I volunteer to tackle this release.

I'm proposing the following items:

1. We start the Jira triage now, through Tuesday.
2. I would like to cut the release on Tuesday night (Europe time).
2bis. I think it's wiser to still use Maven for this release. Do you think we
will be ready to try a release with Gradle?

After this release, I would like a discussion about:
1. Gradle release (if we release 2.5.0 with Maven)
2. Isolating the release cycle per Beam part. I think it would be interesting to
have different release cycles for SDKs, DSLs, Runners, and IOs. That's another
discussion; I will start a thread about that.

Thoughts?

Regards
JB



Jenkins build is back to normal : beam_Release_Gradle_NightlySnapshot #37

2018-05-13 Thread Apache Jenkins Server
See