[GitHub] [arrow] pitrou commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-10 Thread GitBox


pitrou commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r591550643



##
File path: cpp/src/arrow/io/caching.h
##
@@ -99,6 +100,9 @@ class ARROW_EXPORT ReadRangeCache {
   /// \brief Read a range previously given to Cache().
   Result<std::shared_ptr<Buffer>> Read(ReadRange range);
 
+  /// \brief Wait until all ranges added so far have been cached.
+  Future<> Wait();

Review comment:
   Fair enough :-)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] pitrou commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 Thread GitBox


pitrou commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587361262



##
File path: cpp/src/arrow/io/caching.cc
##
@@ -193,6 +195,36 @@ Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
   return Status::Invalid("ReadRangeCache did not find matching cache entry");
 }
 
+Future<> ReadRangeCache::Wait() {
+  struct State {
+    explicit State(std::vector<Future<std::shared_ptr<Buffer>>> f)
+        : futures(std::move(f)), remaining(futures.size()) {}
+    std::vector<Future<std::shared_ptr<Buffer>>> futures;
+    std::atomic<size_t> remaining;
+  };
+
+  std::vector<Future<std::shared_ptr<Buffer>>> futures;
+  for (const auto& entry : impl_->entries) {
+    futures.push_back(entry.future);
+  }
+  auto out = Future<>::Make();
+  auto state = std::make_shared<State>(std::move(futures));
+  for (const auto& future : state->futures) {
+    future.AddCallback([state, out](const Result<std::shared_ptr<Buffer>>&) mutable {
+      if (state->remaining.fetch_sub(1) != 1) return;

Review comment:
   Ideally this primitive would be globally available instead of being reimplemented here. So you could write, for example:
   ```c++
 return Future::Gather(std::move(futures));
   ```
   
   Also, there may be different kinds of "gather". By default you probably want the first error to immediately mark the resulting future as errored.
   
   cc @westonpace 
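
To make the "fail fast" flavour of gather concrete, here is a minimal sketch that does not use Arrow at all. `GatherState`, `Error`, and `OnInputFinished` are hypothetical names invented for this illustration (an empty string stands in for an OK status); they are not the Arrow `Future` API and not what the PR implements. The sketch assumes each input reports completion exactly once.

```c++
#include <atomic>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a status: an empty string means success.
using Error = std::string;

// Hypothetical helper (not Arrow API): invokes `done` exactly once, either
// with the first error reported or with success after all inputs succeeded.
class GatherState {
 public:
  GatherState(std::size_t n, std::function<void(const Error&)> done)
      : remaining_(n), done_(std::move(done)) {
    // With nothing to wait for, complete immediately.
    if (n == 0) {
      finished_ = true;
      done_(Error());
    }
  }

  // Called once per input when that input completes.
  void OnInputFinished(const Error& err) {
    if (!err.empty()) {
      // Fail fast: the first error completes the gather; later ones are dropped.
      if (!finished_.exchange(true)) done_(err);
      return;
    }
    // fetch_sub returns the previous value, so 1 means this was the last input.
    if (remaining_.fetch_sub(1) == 1 && !finished_.exchange(true)) {
      done_(Error());
    }
  }

 private:
  std::atomic<std::size_t> remaining_;
  std::atomic<bool> finished_{false};
  std::function<void(const Error&)> done_;
};
```

A "wait for everything" variant would instead decrement the counter on errors too and report the collected errors at the end; which default is preferable is exactly the design question raised above.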









[GitHub] [arrow] pitrou commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 Thread GitBox


pitrou commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587359365



##
File path: cpp/src/arrow/io/caching.h
##
@@ -99,6 +100,9 @@ class ARROW_EXPORT ReadRangeCache {
   /// \brief Read a range previously given to Cache().
   Result<std::shared_ptr<Buffer>> Read(ReadRange range);
 
+  /// \brief Wait until all ranges added so far have been cached.
+  Future<> Wait();

Review comment:
   Hmm... Wouldn't it be better to expose a `ReadAsync` equivalent to `Read`?
   ```c++
   Future<std::shared_ptr<Buffer>> Read(ReadRange range);
   ```
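
As a rough illustration of how a per-range asynchronous read relates to the blocking one, here is a toy sketch built on the standard library only. `ToyRange` and `ToyRangeCache` are hypothetical stand-ins invented for this example, `std::shared_future<std::string>` replaces Arrow's `Future` of a buffer, and the "I/O" is faked by fulfilling a promise immediately; none of this reflects the real `ReadRangeCache` internals.

```c++
#include <cstdint>
#include <future>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Toy stand-in for a byte range; not the Arrow ReadRange type.
struct ToyRange {
  int64_t offset;
  int64_t length;
};

inline bool operator<(const ToyRange& a, const ToyRange& b) {
  return std::make_pair(a.offset, a.length) < std::make_pair(b.offset, b.length);
}

class ToyRangeCache {
 public:
  // Registers a range. Real code would start asynchronous I/O here; this
  // sketch fulfils the promise right away with the given payload.
  void Cache(ToyRange range, std::string payload) {
    std::promise<std::string> promise;
    promise.set_value(std::move(payload));
    entries_[range] = promise.get_future().share();
  }

  // Asynchronous counterpart to Read(): hands back the future for one range.
  std::shared_future<std::string> ReadAsync(ToyRange range) const {
    auto it = entries_.find(range);
    if (it == entries_.end()) {
      throw std::invalid_argument("no matching cache entry");
    }
    return it->second;
  }

  // The blocking read is then just ReadAsync() followed by a wait.
  std::string Read(ToyRange range) const { return ReadAsync(range).get(); }

 private:
  std::map<ToyRange, std::shared_future<std::string>> entries_;
};
```

With this shape, a caller that only needs one range waits on that range alone, whereas a cache-wide `Wait()` makes it wait for every cached range.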
   




