[GitHub] [arrow] lidavidm commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 GitBox


lidavidm commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587733815



##
File path: cpp/src/arrow/io/caching.cc
##
@@ -193,6 +195,36 @@ Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
   return Status::Invalid("ReadRangeCache did not find matching cache entry");
 }
 
+Future<> ReadRangeCache::Wait() {
+  struct State {
+    explicit State(std::vector<Future<std::shared_ptr<Buffer>>> f)
+        : futures(std::move(f)), remaining(futures.size()) {}
+    std::vector<Future<std::shared_ptr<Buffer>>> futures;
+    std::atomic<size_t> remaining;
+  };
+
+  std::vector<Future<std::shared_ptr<Buffer>>> futures;
+  for (const auto& entry : impl_->entries) {
+    futures.push_back(entry.future);
+  }
+  auto out = Future<>::Make();
+  auto state = std::make_shared<State>(std::move(futures));
+  for (const auto& future : state->futures) {
+    future.AddCallback([state, out](const Result<std::shared_ptr<Buffer>>&) mutable {
+      if (state->remaining.fetch_sub(1) != 1) return;

Review comment:
   `Future<T>` is already convertible to `Future<>`, so what you suggest would work, but we'd need the fail-fast behavior, yes.
   
   A flag like that, though, changes the semantics rather dramatically; I'd rather keep them as separate functions.
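To illustrate why a typed future can stand in for a `Future<>`, here is a minimal sketch under stated assumptions. It uses a hypothetical, self-contained `ToyFuture<T>` (not `arrow::Future`) whose result is a (status, value) pair, plus a hypothetical `ToVoid()` helper that forwards the status and drops the value. Arrow's actual conversion mechanism may differ; the sketch only shows that the failure information survives the conversion.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical placeholder for "no value" (the payload of a Future<>).
struct Empty {};

// Hypothetical toy, NOT arrow::Future<T>. Result = (status, value), where an
// empty status string means success. Copies share state; first finish wins.
template <typename T = Empty>
class ToyFuture {
 public:
  using Result = std::pair<std::string, T>;
  using Callback = std::function<void(const Result&)>;

  void MarkFinished(Result r) {
    std::vector<Callback> to_run;
    {
      std::lock_guard<std::mutex> lock(state_->mu);
      if (state_->result) return;  // already finished: ignore
      state_->result = r;
      to_run.swap(state_->callbacks);
    }
    for (auto& cb : to_run) cb(r);  // run callbacks outside the lock
  }

  void AddCallback(Callback cb) const {
    std::unique_lock<std::mutex> lock(state_->mu);
    if (!state_->result) {
      state_->callbacks.push_back(std::move(cb));
      return;
    }
    Result r = *state_->result;
    lock.unlock();
    cb(r);  // already finished: run right away
  }

  std::optional<Result> result() const {
    std::lock_guard<std::mutex> lock(state_->mu);
    return state_->result;
  }

 private:
  struct State {
    std::mutex mu;
    std::optional<Result> result;
    std::vector<Callback> callbacks;
  };
  std::shared_ptr<State> state_ = std::make_shared<State>();
};

// Hypothetical explicit stand-in for the Future<T> -> Future<> conversion:
// forward the status, drop the value.
template <typename T>
ToyFuture<> ToVoid(const ToyFuture<T>& typed) {
  ToyFuture<> out;
  typed.AddCallback([out](const typename ToyFuture<T>::Result& r) mutable {
    out.MarkFinished({r.first, Empty{}});
  });
  return out;
}
```

Because the status is preserved, a fail-fast helper written once against status-only futures would work for any value type, while the per-result collecting behavior can stay in its own function instead of behind a flag.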





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] lidavidm commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 GitBox


lidavidm commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587706050



##
File path: cpp/src/arrow/io/caching.cc
##

Review comment:
   `All` gives `Future<std::vector<Result<T>>>` and never marks the future failed; this gives `Future<>` and marks the future failed if any individual result failed. Additionally, this 'fails fast': if any future fails, you immediately get a result instead of waiting for the rest of the futures.
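The difference between the two combinators can be sketched with a small, self-contained toy. `ToyFuture`, `AllOk` and `CollectAll` are hypothetical names, not Arrow APIs: `CollectAll` mimics `All` by waiting for every input and recording each status without failing itself, while `AllOk` mimics the fail-fast behavior described above. Like the diff, both count down an atomic and let the last completion finish the output; `AllOk` additionally finishes on the first error. The empty-input check is an assumption added so that waiting on zero futures does not hang.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy, NOT arrow::Future<>: finishes once with a status string
// ("" means OK). Copies share state; the first MarkFinished() wins.
class ToyFuture {
 public:
  using Callback = std::function<void(const std::string&)>;

  void MarkFinished(const std::string& status = "") {
    std::vector<Callback> to_run;
    {
      std::lock_guard<std::mutex> lock(state_->mu);
      if (state_->status) return;  // already finished: ignore
      state_->status = status;
      to_run.swap(state_->callbacks);
    }
    for (auto& cb : to_run) cb(status);  // run callbacks outside the lock
  }

  void AddCallback(Callback cb) const {
    std::unique_lock<std::mutex> lock(state_->mu);
    if (!state_->status) {
      state_->callbacks.push_back(std::move(cb));
      return;
    }
    std::string status = *state_->status;
    lock.unlock();
    cb(status);  // already finished: run right away
  }

  bool is_finished() const {
    std::lock_guard<std::mutex> lock(state_->mu);
    return state_->status.has_value();
  }

  std::string status() const {  // "" while pending or on success
    std::lock_guard<std::mutex> lock(state_->mu);
    return state_->status.value_or("");
  }

 private:
  struct State {
    std::mutex mu;
    std::optional<std::string> status;
    std::vector<Callback> callbacks;
  };
  std::shared_ptr<State> state_ = std::make_shared<State>();
};

// Fail-fast: finishes with the first error, or OK once every input succeeded.
ToyFuture AllOk(const std::vector<ToyFuture>& futures) {
  ToyFuture out;
  if (futures.empty()) { out.MarkFinished(); return out; }  // avoid hanging
  auto remaining = std::make_shared<std::atomic<std::size_t>>(futures.size());
  for (const ToyFuture& f : futures) {
    f.AddCallback([remaining, out](const std::string& st) mutable {
      if (!st.empty()) { out.MarkFinished(st); return; }     // fail fast
      if (remaining->fetch_sub(1) == 1) out.MarkFinished();  // last success
    });
  }
  return out;
}

// All-like: waits for every input and stores each status in (*results)[i];
// the aggregate itself always finishes OK.
ToyFuture CollectAll(const std::vector<ToyFuture>& futures,
                     std::shared_ptr<std::vector<std::string>> results) {
  ToyFuture out;
  results->assign(futures.size(), std::string());
  if (futures.empty()) { out.MarkFinished(); return out; }
  auto remaining = std::make_shared<std::atomic<std::size_t>>(futures.size());
  for (std::size_t i = 0; i < futures.size(); ++i) {
    futures[i].AddCallback([remaining, results, out, i](const std::string& st) mutable {
      (*results)[i] = st;
      if (remaining->fetch_sub(1) == 1) out.MarkFinished();  // even on errors
    });
  }
  return out;
}
```

With one failure among three inputs, `AllOk` is finished as soon as the failing input completes, while `CollectAll` still waits for the other two and then reports every individual status.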









[GitHub] [arrow] lidavidm commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 GitBox


lidavidm commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587452478



##
File path: cpp/src/arrow/io/caching.h
##
@@ -99,6 +100,9 @@ class ARROW_EXPORT ReadRangeCache {
   /// \brief Read a range previously given to Cache().
   Result<std::shared_ptr<Buffer>> Read(ReadRange range);
 
+  /// \brief Wait until all ranges added so far have been cached.
+  Future<> Wait();

Review comment:
   The idea here is to cleanly separate out the I/O operations so you can 
schedule them on different executors. With a ReadAsync, you wouldn't be able to 
accomplish that unless you went and refactored the entire Parquet reader so 
that it was fully async-aware.
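The separation described above (issue the reads early, wait for them as one unit, then decode with plain synchronous reads) can be sketched with only the standard library. `ToyRangeCache` and its `Cache()`/`Wait()`/`Read()` methods are hypothetical stand-ins loosely modeled on the names in this PR, not Arrow's implementation; `std::async` plays the role of an I/O executor, and the "file" is an in-memory string.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, NOT Arrow's ReadRangeCache: a range is
// (offset, length) and the "file" is an in-memory string.
using Range = std::pair<std::size_t, std::size_t>;

class ToyRangeCache {
 public:
  explicit ToyRangeCache(std::string data)
      : file_(std::make_shared<std::string>(std::move(data))) {}

  // Start one background read per range; std::async stands in for an
  // I/O executor.
  void Cache(const std::vector<Range>& ranges) {
    for (const Range& r : ranges) {
      std::shared_ptr<const std::string> file = file_;
      entries_[r] = std::async(std::launch::async, [file, r] {
        if (r.first + r.second > file->size()) {
          throw std::out_of_range("range past end of file");
        }
        return file->substr(r.first, r.second);
      }).share();
    }
  }

  // Future that completes once every range cached so far has been read.
  // Unlike a fail-fast helper, it checks entries in order, so an error is
  // only reported after the entries before it have finished.
  std::shared_future<void> Wait() const {
    std::vector<std::shared_future<std::string>> pending;
    for (const auto& kv : entries_) pending.push_back(kv.second);
    return std::async(std::launch::async, [pending] {
      for (const auto& f : pending) f.get();  // rethrows a stored exception
    }).share();
  }

  // Synchronous read: returns without blocking once the range has arrived.
  std::string Read(const Range& r) const {
    auto it = entries_.find(r);
    if (it == entries_.end()) {
      throw std::invalid_argument("range was not cached");
    }
    return it->second.get();
  }

 private:
  std::shared_ptr<const std::string> file_;
  std::map<Range, std::shared_future<std::string>> entries_;
};
```

`std::shared_future` has no continuation API, so a caller of this sketch would block on `Wait().get()`. With a callback-based future like Arrow's, the caller could presumably attach a continuation to `Wait()` that schedules decoding on a CPU executor, while the reader's own `Read()` calls stay synchronous.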









[GitHub] [arrow] lidavidm commented on a change in pull request #9613: PARQUET-1993: [C++] expose way to wait for I/O to complete

2021-03-04 GitBox


lidavidm commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587451127



##
File path: cpp/src/arrow/io/caching.cc
##

Review comment:
   Agreed. I can move this to future.h/future.cc since there's already a 
very similar helper there. 




