This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 27d21d4  ARROW-11907: [C++] Use our own executor in S3FileSystem
27d21d4 is described below

commit 27d21d4d3b18280805ed7118c089d930fd018f11
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Wed Mar 17 11:26:24 2021 +0100

    ARROW-11907: [C++] Use our own executor in S3FileSystem
    
    The async APIs in the AWS SDK merely spawn a new thread each time
    they are called.
    
    By using our own executor, we schedule requests on our IO thread pool,
    and we allow for potential cancellation.
    
    Closes #9678 from pitrou/ARROW-11907-s3fs-executor
    
    Authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
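
    The pattern is the same at every call site. A minimal sketch, with a
    hypothetical DoSomething request/outcome pair standing in for any AWS
    SDK operation (the real call sites are in the s3fs.cc diff below):

        // Before: the SDK spawns a dedicated thread for every async call.
        //   client_->DoSomethingAsync(req, handler);
        // After: the blocking call runs on Arrow's IO executor, the handler
        // becomes a Future callback, and the stop token allows cancellation.
        auto client = client_;  // shared_ptr copy keeps the client alive in the closure
        ARROW_ASSIGN_OR_RAISE(
            auto fut, io_context_.executor()->Submit(
                          io_context_.stop_token(),
                          [client, req]() { return client->DoSomething(req); }));
        fut.AddCallback([](const Result<DoSomethingOutcome>& result) {
          // Runs once the blocking call finishes (or fails, or is cancelled).
        });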
---
 cpp/src/arrow/filesystem/s3fs.cc      | 211 +++++++++++++++++++---------------
 cpp/src/arrow/filesystem/s3fs_test.cc |   5 +-
 cpp/src/arrow/util/future.cc          |   4 +
 cpp/src/arrow/util/future.h           |   4 +
 cpp/src/arrow/util/future_test.cc     |  16 ++-
 5 files changed, 145 insertions(+), 95 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 6b2b708..1940f4d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -79,6 +79,7 @@
 #include "arrow/util/future.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
 #include "arrow/util/windows_fixup.h"
 
 namespace arrow {
@@ -488,7 +489,7 @@ class ClientBuilder {
 
   Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
 
-  Result<std::unique_ptr<S3Client>> BuildClient() {
+  Result<std::shared_ptr<S3Client>> BuildClient() {
     credentials_provider_ = options_.credentials_provider;
     if (!options_.region.empty()) {
       client_config_.region = ToAwsString(options_.region);
@@ -510,10 +511,10 @@ class ClientBuilder {
     }
 
     const bool use_virtual_addressing = options_.endpoint_override.empty();
-    return std::unique_ptr<S3Client>(
-        new S3Client(credentials_provider_, client_config_,
-                     Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
-                     use_virtual_addressing));
+    return std::make_shared<S3Client>(
+        credentials_provider_, client_config_,
+        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+        use_virtual_addressing);
   }
 
   const S3Options& options() const { return options_; }
@@ -585,7 +586,7 @@ class RegionResolver {
   }
 
   ClientBuilder builder_;
-  std::unique_ptr<S3Client> client_;
+  std::shared_ptr<S3Client> client_;
 
   std::mutex cache_mutex_;
   // XXX Should cache size be bounded?  It must be quite unusual to query millions
@@ -630,11 +631,10 @@ Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
 // A RandomAccessFile that reads from a S3 object
 class ObjectInputFile final : public io::RandomAccessFile {
  public:
-  ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+  ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
                   const io::IOContext& io_context, const S3Path& path,
                   int64_t size = kNoSize)
-      : fs_(std::move(fs)),
-        client_(client),
+      : client_(std::move(client)),
         io_context_(io_context),
         path_(path),
         content_length_(size) {}
@@ -687,7 +687,6 @@ class ObjectInputFile final : public io::RandomAccessFile {
   // RandomAccessFile APIs
 
   Status Close() override {
-    fs_.reset();
     client_ = nullptr;
     closed_ = true;
     return Status::OK();
@@ -724,7 +723,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
 
     // Read the desired range of bytes
     ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
-                          GetObjectRange(client_, path_, position, nbytes, out));
+                          GetObjectRange(client_.get(), path_, position, nbytes, out));
 
     auto& stream = result.GetBody();
     stream.ignore(nbytes);
@@ -763,8 +762,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
   }
 
  protected:
-  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
-  Aws::S3::S3Client* client_;
+  std::shared_ptr<Aws::S3::S3Client> client_;
   const io::IOContext io_context_;
   S3Path path_;
 
@@ -785,11 +783,10 @@ class ObjectOutputStream final : public io::OutputStream {
   struct UploadState;
 
  public:
-  ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+  ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
                      const io::IOContext& io_context, const S3Path& path,
                      const S3Options& options)
-      : fs_(std::move(fs)),
-        client_(client),
+      : client_(std::move(client)),
         io_context_(io_context),
         path_(path),
         options_(options) {}
@@ -837,7 +834,6 @@ class ObjectOutputStream final : public io::OutputStream {
           outcome.GetError());
     }
     current_part_.reset();
-    fs_.reset();
     client_ = nullptr;
     closed_ = true;
     return Status::OK();
@@ -884,7 +880,6 @@ class ObjectOutputStream final : public io::OutputStream {
           outcome.GetError());
     }
 
-    fs_.reset();
     client_ = nullptr;
     closed_ = true;
     return Status::OK();
@@ -981,10 +976,6 @@ class ObjectOutputStream final : public io::OutputStream {
         AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
       }
     } else {
-      std::unique_lock<std::mutex> lock(upload_state_->mutex);
-      auto state = upload_state_;  // Keep upload state alive in closure
-      auto part_number = part_number_;
-
       // If the data isn't owned, make an immutable copy for the lifetime of the closure
       if (owned_buffer == nullptr) {
         ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
@@ -996,24 +987,23 @@ class ObjectOutputStream final : public io::OutputStream {
       req.SetBody(
           std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
 
-      auto handler =
-          [state, owned_buffer, part_number](
-              const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
-              const S3Model::UploadPartOutcome& outcome,
-              const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void {
-        std::unique_lock<std::mutex> lock(state->mutex);
-        if (!outcome.IsSuccess()) {
-          state->status &= UploadPartError(req, outcome);
-        } else {
-          AddCompletedPart(state, part_number, outcome.GetResult());
-        }
-        // Notify completion, regardless of success / error status
-        if (--state->parts_in_progress == 0) {
-          state->cv.notify_all();
-        }
+      {
+        std::unique_lock<std::mutex> lock(upload_state_->mutex);
+        ++upload_state_->parts_in_progress;
+      }
+      auto client = client_;
+      ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
+                                          io_context_.stop_token(), [client, req]() {
+                                            return client->UploadPart(req);
+                                          }));
+      // The closure keeps the buffer and the upload state alive
+      auto state = upload_state_;
+      auto part_number = part_number_;
+      auto handler = [owned_buffer, state, part_number,
+                      req](const Result<S3Model::UploadPartOutcome>& result) -> void {
+        HandleUploadOutcome(state, part_number, req, result);
       };
-      ++upload_state_->parts_in_progress;
-      client_->UploadPartAsync(req, handler);
+      fut.AddCallback(std::move(handler));
     }
 
     ++part_number_;
@@ -1035,6 +1025,26 @@ class ObjectOutputStream final : public io::OutputStream {
     return Status::OK();
   }
 
+  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
+                                  int part_number, const S3Model::UploadPartRequest& req,
+                                  const Result<S3Model::UploadPartOutcome>& result) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    if (!result.ok()) {
+      state->status &= result.status();
+    } else {
+      const auto& outcome = *result;
+      if (!outcome.IsSuccess()) {
+        state->status &= UploadPartError(req, outcome);
+      } else {
+        AddCompletedPart(state, part_number, outcome.GetResult());
+      }
+    }
+    // Notify completion
+    if (--state->parts_in_progress == 0) {
+      state->cv.notify_all();
+    }
+  }
+
   static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
                                const S3Model::UploadPartResult& result) {
     S3Model::CompletedPart part;
@@ -1059,8 +1069,7 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
  protected:
-  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
-  Aws::S3::S3Client* client_;
+  std::shared_ptr<Aws::S3::S3Client> client_;
   const io::IOContext io_context_;
   S3Path path_;
   const S3Options& options_;
@@ -1081,8 +1090,6 @@ class ObjectOutputStream final : public io::OutputStream {
     Aws::Vector<S3Model::CompletedPart> completed_parts;
     int64_t parts_in_progress = 0;
     Status status;
-
-    UploadState() : status(Status::OK()) {}
   };
   std::shared_ptr<UploadState> upload_state_;
 };
@@ -1106,7 +1113,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
   using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
   using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
 
-  Aws::S3::S3Client* client_;
+  std::shared_ptr<Aws::S3::S3Client> client_;
+  io::IOContext io_context_;
   const std::string bucket_;
   const std::string base_dir_;
   const int32_t max_keys_;
@@ -1120,10 +1128,12 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
     return self->DoWalk();
   }
 
-  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
-             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
+  TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext io_context,
+             std::string bucket, std::string base_dir, int32_t max_keys,
+             ResultHandler result_handler, ErrorHandler error_handler,
              RecursionHandler recursion_handler)
       : client_(std::move(client)),
+        io_context_(io_context),
         bucket_(std::move(bucket)),
         base_dir_(std::move(base_dir)),
         max_keys_(max_keys),
@@ -1159,23 +1169,41 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
     int32_t nesting_depth;
     S3Model::ListObjectsV2Request req;
 
-    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
-                    const S3Model::ListObjectsV2Outcome& outcome,
-                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+    void operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
       // Serialize calls to operation-specific handlers
       std::unique_lock<std::mutex> guard(walker->mutex_);
       if (walker->is_finished()) {
         // Early exit: avoid executing handlers if DoWalk() returned
         return;
       }
+      if (!result.ok()) {
+        HandleError(result.status());
+        return;
+      }
+      const auto& outcome = *result;
       if (!outcome.IsSuccess()) {
         Status st = walker->error_handler_(outcome.GetError());
-        walker->ListObjectsFinished(std::move(st));
+        HandleError(std::move(st));
         return;
       }
       HandleResult(outcome.GetResult());
     }
 
+    void SpawnListObjectsV2() {
+      auto walker = this->walker;
+      auto req = this->req;
+      auto maybe_fut = walker->io_context_.executor()->Submit(
+          walker->io_context_.stop_token(),
+          [walker, req]() { return walker->client_->ListObjectsV2(req); });
+      if (!maybe_fut.ok()) {
+        HandleError(maybe_fut.status());
+        return;
+      }
+      maybe_fut->AddCallback(*this);
+    }
+
+    void HandleError(Status status) { walker->ListObjectsFinished(std::move(status)); }
+
     void HandleResult(const S3Model::ListObjectsV2Result& result) {
       bool recurse = result.GetCommonPrefixes().size() > 0;
       if (recurse) {
@@ -1199,7 +1227,7 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
       if (result.GetIsTruncated()) {
         DCHECK(!result.GetNextContinuationToken().empty());
         req.SetContinuationToken(result.GetNextContinuationToken());
-        walker->client_->ListObjectsV2Async(req, *this);
+        SpawnListObjectsV2();
       } else {
         walker->ListObjectsFinished(Status::OK());
       }
@@ -1212,7 +1240,7 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
       }
       req.SetDelimiter(Aws::String() + kSep);
       req.SetMaxKeys(walker->max_keys_);
-      walker->client_->ListObjectsV2Async(req, *this);
+      SpawnListObjectsV2();
     }
   };
 
@@ -1241,7 +1269,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
 class S3FileSystem::Impl {
  public:
   ClientBuilder builder_;
-  std::unique_ptr<Aws::S3::S3Client> client_;
+  io::IOContext io_context_;
+  std::shared_ptr<Aws::S3::S3Client> client_;
   util::optional<S3Backend> backend_;
 
   const int32_t kListObjectsMaxKeys = 1000;
@@ -1250,7 +1279,8 @@ class S3FileSystem::Impl {
   // Limit recursing depth, since a recursion bomb can be created
   const int32_t kMaxNestingDepth = 100;
 
-  explicit Impl(S3Options options) : builder_(std::move(options)) {}
+  explicit Impl(S3Options options, io::IOContext io_context)
+      : builder_(std::move(options)), io_context_(io_context) {}
 
   Status Init() { return builder_.BuildClient().Value(&client_); }
 
@@ -1442,7 +1472,7 @@ class S3FileSystem::Impl {
       return Status::OK();
     };
 
-    RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+    RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
                                    handle_results, handle_error, handle_recursion));
 
     // If no contents were found, perhaps it's an empty "directory",
@@ -1487,22 +1517,23 @@ class S3FileSystem::Impl {
       return true;  // Recurse
     };
 
-    return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+    return TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
                             handle_results, handle_error, handle_recursion);
   }
 
   // Delete multiple objects at once
-  Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
-    struct DeleteHandler {
-      Future<> future = Future<>::Make();
-
-      // Callback for DeleteObjectsAsync
-      void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req,
-                      const S3Model::DeleteObjectsOutcome& outcome,
-                      const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
+  Future<> DeleteObjectsAsync(const std::string& bucket,
+                              const std::vector<std::string>& keys) {
+    struct DeleteCallback {
+      const std::string bucket;
+
+      Status operator()(const Result<S3Model::DeleteObjectsOutcome>& result) {
+        if (!result.ok()) {
+          return result.status();
+        }
+        const auto& outcome = *result;
         if (!outcome.IsSuccess()) {
-          future.MarkFinished(ErrorToStatus(outcome.GetError()));
-          return;
+          return ErrorToStatus(outcome.GetError());
         }
         // Also need to check per-key errors, even on successful outcome
         // See
@@ -1511,23 +1542,22 @@ class S3FileSystem::Impl {
         if (!errors.empty()) {
           std::stringstream ss;
           ss << "Got the following " << errors.size()
-             << " errors when deleting objects in S3 bucket '" << req.GetBucket()
-             << "':\n";
+             << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
           for (const auto& error : errors) {
             ss << "- key '" << error.GetKey() << "': " << error.GetMessage() 
<< "\n";
           }
-          future.MarkFinished(Status::IOError(ss.str()));
-        } else {
-          future.MarkFinished();
+          return Status::IOError(ss.str());
         }
+        return Status::OK();
       }
     };
 
     const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
-    std::vector<DeleteHandler> delete_handlers;
-    std::vector<Future<>*> futures;
-    delete_handlers.reserve(keys.size() / chunk_size + 1);
-    futures.reserve(delete_handlers.capacity());
+    DeleteCallback delete_cb{bucket};
+    auto client = client_;
+
+    std::vector<Future<>> futures;
+    futures.reserve(keys.size() / chunk_size + 1);
 
     for (size_t start = 0; start < keys.size(); start += chunk_size) {
       S3Model::DeleteObjectsRequest req;
@@ -1537,16 +1567,18 @@ class S3FileSystem::Impl {
       }
       req.SetBucket(ToAwsString(bucket));
       req.SetDelete(std::move(del));
-      delete_handlers.emplace_back();
-      futures.push_back(&delete_handlers.back().future);
-      client_->DeleteObjectsAsync(req, delete_handlers.back());
+      ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
+                                          io_context_.stop_token(), [client, req]() {
+                                            return client->DeleteObjects(req);
+                                          }));
+      futures.push_back(std::move(fut).Then(delete_cb));
     }
 
-    WaitForAll(futures);
-    for (const auto* fut : futures) {
-      RETURN_NOT_OK(fut->status());
-    }
-    return Status::OK();
+    return AllComplete(futures);
+  }
+
+  Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
+    return DeleteObjectsAsync(bucket, keys).status();
   }
 
   Status DeleteDirContents(const std::string& bucket, const std::string& key) {
@@ -1601,8 +1633,7 @@ class S3FileSystem::Impl {
     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
     RETURN_NOT_OK(ValidateFilePath(path));
 
-    auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
-                                                 fs->io_context(), path);
+    auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
     RETURN_NOT_OK(ptr->Init());
     return ptr;
   }
@@ -1619,15 +1650,15 @@ class S3FileSystem::Impl {
     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
     RETURN_NOT_OK(ValidateFilePath(path));
 
-    auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
-                                                 fs->io_context(), path, info.size());
+    auto ptr =
+        std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
     RETURN_NOT_OK(ptr->Init());
     return ptr;
   }
 };
 
 S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
-    : FileSystem(io_context), impl_(new Impl{options}) {
+    : FileSystem(io_context), impl_(new Impl{options, io_context}) {
   default_async_is_sync_ = false;
 }
 
@@ -1908,8 +1939,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));
 
-  auto ptr = std::make_shared<ObjectOutputStream>(
-      shared_from_this(), impl_->client_.get(), io_context(), path, impl_->options());
+  auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
+                                                  impl_->options());
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
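
The rewritten DeleteObjectsAsync() above follows the same recipe, fanning out
one executor task per chunk of keys and collapsing the per-chunk futures with
AllComplete(). Condensed from the hunk (MakeChunkRequest is a hypothetical
helper standing in for the request-building loop):

    std::vector<Future<>> futures;
    futures.reserve(keys.size() / chunk_size + 1);
    for (size_t start = 0; start < keys.size(); start += chunk_size) {
      // Hypothetical helper: builds the DeleteObjectsRequest for one chunk of keys
      S3Model::DeleteObjectsRequest req = MakeChunkRequest(bucket, keys, start, chunk_size);
      ARROW_ASSIGN_OR_RAISE(
          auto fut, io_context_.executor()->Submit(
                        io_context_.stop_token(),
                        [client, req]() { return client->DeleteObjects(req); }));
      // Then() maps each SDK outcome (including per-key errors) to a Status
      futures.push_back(std::move(fut).Then(delete_cb));
    }
    return AllComplete(futures);  // first error, if any, becomes the overall status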
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 99e6b3f..c79d9f7 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -464,10 +464,10 @@ class TestS3FS : public S3TestMixin {
     std::weak_ptr<S3FileSystem> weak_fs(fs_);
     ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5"));
     fs_.reset();
-    ASSERT_FALSE(weak_fs.expired());
-    ASSERT_OK(stream->Write("some data"));
+    ASSERT_OK(stream->Write("some other data"));
     ASSERT_OK(stream->Close());
     ASSERT_TRUE(weak_fs.expired());
+    AssertObjectContents(client_.get(), "bucket", "newfile5", "some other data");
   }
 
   void TestOpenOutputStreamAbort() {
@@ -802,7 +802,6 @@ TEST_F(TestS3FS, OpenInputStream) {
   std::weak_ptr<S3FileSystem> weak_fs(fs_);
   ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile"));
   fs_.reset();
-  ASSERT_FALSE(weak_fs.expired());
   ASSERT_OK_AND_ASSIGN(buf, stream->Read(10));
   AssertBufferEqual(*buf, "some data");
   ASSERT_OK(stream->Close());
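
The test updates reflect the new ownership model: streams now hold a
shared_ptr to the S3Client itself rather than to the owning filesystem, so
the filesystem may be destroyed (weak_fs expires as soon as fs_ is reset)
while the stream remains fully usable.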
diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc
index 4b867ea..501d5ca 100644
--- a/cpp/src/arrow/util/future.cc
+++ b/cpp/src/arrow/util/future.cc
@@ -348,6 +348,10 @@ Future<> AllComplete(const std::vector<Future<>>& futures) {
     std::atomic<size_t> n_remaining;
   };
 
+  if (futures.empty()) {
+    return Future<>::MakeFinished();
+  }
+
   auto state = std::make_shared<State>(futures.size());
   auto out = Future<>::Make();
   for (const auto& future : futures) {
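
The empty-vector guard matters because AllComplete() (and likewise All()
below) completes by counting input futures down to zero: with no inputs, no
callback ever fires and the combined future would never be marked finished.
DeleteObjectsAsync() can legitimately produce an empty vector. A sketch of
the resulting behavior:

    std::vector<Future<>> futures;            // e.g. a delete with zero keys
    Future<> combined = AllComplete(futures);
    // With the guard, combined is already successfully finished here;
    // without it, combined would never finish.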
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index fc63a42..4ede291 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -628,6 +628,10 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
     std::atomic<size_t> n_remaining;
   };
 
+  if (futures.size() == 0) {
+    return {std::vector<Result<T>>{}};
+  }
+
   auto state = std::make_shared<State>(std::move(futures));
 
   auto out = Future<std::vector<Result<T>>>::Make();
diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc
index 0bd8380..0fc202e 100644
--- a/cpp/src/arrow/util/future_test.cc
+++ b/cpp/src/arrow/util/future_test.cc
@@ -971,6 +971,13 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Empty) {
+  auto combined = arrow::All(std::vector<Future<int>>{});
+  auto after_assert = combined.Then(
+      [](std::vector<Result<int>> results) { ASSERT_EQ(0, results.size()); });
+  AssertSuccessful(after_assert);
+}
+
 TEST(FutureAllTest, Simple) {
   auto f1 = Future<int>::Make();
   auto f2 = Future<int>::Make();
@@ -1012,11 +1019,16 @@ TEST(FutureAllTest, Failure) {
   AssertFinished(after_assert);
 }
 
+TEST(FutureAllCompleteTest, Empty) {
+  Future<> combined = AllComplete(std::vector<Future<>>{});
+  AssertSuccessful(combined);
+}
+
 TEST(FutureAllCompleteTest, Simple) {
   auto f1 = Future<int>::Make();
   auto f2 = Future<int>::Make();
   std::vector<Future<>> futures = {Future<>(f1), Future<>(f2)};
-  auto combined = arrow::AllComplete(futures);
+  auto combined = AllComplete(futures);
   AssertNotFinished(combined);
   f2.MarkFinished(2);
   AssertNotFinished(combined);
@@ -1029,7 +1041,7 @@ TEST(FutureAllCompleteTest, Failure) {
   auto f2 = Future<int>::Make();
   auto f3 = Future<int>::Make();
   std::vector<Future<>> futures = {Future<>(f1), Future<>(f2), Future<>(f3)};
-  auto combined = arrow::AllComplete(futures);
+  auto combined = AllComplete(futures);
   AssertNotFinished(combined);
   f1.MarkFinished(1);
   AssertNotFinished(combined);
