This is an automated email from the ASF dual-hosted git repository. apitrou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 27d21d4 ARROW-11907: [C++] Use our own executor in S3FileSystem 27d21d4 is described below commit 27d21d4d3b18280805ed7118c089d930fd018f11 Author: Antoine Pitrou <anto...@python.org> AuthorDate: Wed Mar 17 11:26:24 2021 +0100 ARROW-11907: [C++] Use our own executor in S3FileSystem The async APIs in the AWS SDK merely spawn a new thread each time they are called. By using our own executor, we schedule requests on our IO thread pool, and we allow for potential cancellation. Closes #9678 from pitrou/ARROW-11907-s3fs-executor Authored-by: Antoine Pitrou <anto...@python.org> Signed-off-by: Antoine Pitrou <anto...@python.org> --- cpp/src/arrow/filesystem/s3fs.cc | 211 +++++++++++++++++++--------------- cpp/src/arrow/filesystem/s3fs_test.cc | 5 +- cpp/src/arrow/util/future.cc | 4 + cpp/src/arrow/util/future.h | 4 + cpp/src/arrow/util/future_test.cc | 16 ++- 5 files changed, 145 insertions(+), 95 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 6b2b708..1940f4d 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -79,6 +79,7 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/windows_fixup.h" namespace arrow { @@ -488,7 +489,7 @@ class ClientBuilder { Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; } - Result<std::unique_ptr<S3Client>> BuildClient() { + Result<std::shared_ptr<S3Client>> BuildClient() { credentials_provider_ = options_.credentials_provider; if (!options_.region.empty()) { client_config_.region = ToAwsString(options_.region); @@ -510,10 +511,10 @@ class ClientBuilder { } const bool use_virtual_addressing = options_.endpoint_override.empty(); - return std::unique_ptr<S3Client>( - new S3Client(credentials_provider_, client_config_, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - use_virtual_addressing)); + return std::make_shared<S3Client>( + credentials_provider_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + use_virtual_addressing); } const S3Options& options() const { return options_; } @@ -585,7 +586,7 @@ class RegionResolver { } ClientBuilder builder_; - std::unique_ptr<S3Client> client_; + std::shared_ptr<S3Client> client_; std::mutex cache_mutex_; // XXX Should cache size be bounded? It must be quite unusual to query millions @@ -630,11 +631,10 @@ Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client, // A RandomAccessFile that reads from a S3 object class ObjectInputFile final : public io::RandomAccessFile { public: - ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client, + ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client, const io::IOContext& io_context, const S3Path& path, int64_t size = kNoSize) - : fs_(std::move(fs)), - client_(client), + : client_(std::move(client)), io_context_(io_context), path_(path), content_length_(size) {} @@ -687,7 +687,6 @@ class ObjectInputFile final : public io::RandomAccessFile { // RandomAccessFile APIs Status Close() override { - fs_.reset(); client_ = nullptr; closed_ = true; return Status::OK(); @@ -724,7 +723,7 @@ class ObjectInputFile final : public io::RandomAccessFile { // Read the desired range of bytes ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result, - GetObjectRange(client_, path_, position, nbytes, out)); + GetObjectRange(client_.get(), path_, position, nbytes, out)); auto& stream = result.GetBody(); stream.ignore(nbytes); @@ -763,8 +762,7 @@ class ObjectInputFile final : public io::RandomAccessFile { } protected: - std::shared_ptr<FileSystem> fs_; // Owner of S3Client - Aws::S3::S3Client* client_; + std::shared_ptr<Aws::S3::S3Client> client_; const io::IOContext io_context_; S3Path path_; @@ -785,11 +783,10 @@ class ObjectOutputStream final : public io::OutputStream { struct UploadState; public: - ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client, + ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client, const io::IOContext& io_context, const S3Path& path, const S3Options& options) - : fs_(std::move(fs)), - client_(client), + : client_(std::move(client)), io_context_(io_context), path_(path), options_(options) {} @@ -837,7 +834,6 @@ class ObjectOutputStream final : public io::OutputStream { outcome.GetError()); } current_part_.reset(); - fs_.reset(); client_ = nullptr; closed_ = true; return Status::OK(); @@ -884,7 +880,6 @@ class ObjectOutputStream final : public io::OutputStream { outcome.GetError()); } - fs_.reset(); client_ = nullptr; closed_ = true; return Status::OK(); @@ -981,10 +976,6 @@ class ObjectOutputStream final : public io::OutputStream { AddCompletedPart(upload_state_, part_number_, outcome.GetResult()); } } else { - std::unique_lock<std::mutex> lock(upload_state_->mutex); - auto state = upload_state_; // Keep upload state alive in closure - auto part_number = part_number_; - // If the data isn't owned, make an immutable copy for the lifetime of the closure if (owned_buffer == nullptr) { ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); @@ -996,24 +987,23 @@ class ObjectOutputStream final : public io::OutputStream { req.SetBody( std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size())); - auto handler = - [state, owned_buffer, part_number]( - const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req, - const S3Model::UploadPartOutcome& outcome, - const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void { - std::unique_lock<std::mutex> lock(state->mutex); - if (!outcome.IsSuccess()) { - state->status &= UploadPartError(req, outcome); - } else { - AddCompletedPart(state, part_number, outcome.GetResult()); - } - // Notify completion, regardless of success / error status - if (--state->parts_in_progress == 0) { - state->cv.notify_all(); - } + { + std::unique_lock<std::mutex> lock(upload_state_->mutex); + ++upload_state_->parts_in_progress; + } + auto client = client_; + ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit( + io_context_.stop_token(), [client, req]() { + return client->UploadPart(req); + })); + // The closure keeps the buffer and the upload state alive + auto state = upload_state_; + auto part_number = part_number_; + auto handler = [owned_buffer, state, part_number, + req](const Result<S3Model::UploadPartOutcome>& result) -> void { + HandleUploadOutcome(state, part_number, req, result); }; - ++upload_state_->parts_in_progress; - client_->UploadPartAsync(req, handler); + fut.AddCallback(std::move(handler)); } ++part_number_; @@ -1035,6 +1025,26 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } + static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state, + int part_number, const S3Model::UploadPartRequest& req, + const Result<S3Model::UploadPartOutcome>& result) { + std::unique_lock<std::mutex> lock(state->mutex); + if (!result.ok()) { + state->status &= result.status(); + } else { + const auto& outcome = *result; + if (!outcome.IsSuccess()) { + state->status &= UploadPartError(req, outcome); + } else { + AddCompletedPart(state, part_number, outcome.GetResult()); + } + } + // Notify completion + if (--state->parts_in_progress == 0) { + state->cv.notify_all(); + } + } + static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number, const S3Model::UploadPartResult& result) { S3Model::CompletedPart part; @@ -1059,8 +1069,7 @@ class ObjectOutputStream final : public io::OutputStream { } protected: - std::shared_ptr<FileSystem> fs_; // Owner of S3Client - Aws::S3::S3Client* client_; + std::shared_ptr<Aws::S3::S3Client> client_; const io::IOContext io_context_; S3Path path_; const S3Options& options_; @@ -1081,8 +1090,6 @@ class ObjectOutputStream final : public io::OutputStream { Aws::Vector<S3Model::CompletedPart> completed_parts; int64_t parts_in_progress = 0; Status status; - - UploadState() : status(Status::OK()) {} }; std::shared_ptr<UploadState> upload_state_; }; @@ -1106,7 +1113,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>; using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>; - Aws::S3::S3Client* client_; + std::shared_ptr<Aws::S3::S3Client> client_; + io::IOContext io_context_; const std::string bucket_; const std::string base_dir_; const int32_t max_keys_; @@ -1120,10 +1128,12 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { return self->DoWalk(); } - TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir, - int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler, + TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext io_context, + std::string bucket, std::string base_dir, int32_t max_keys, + ResultHandler result_handler, ErrorHandler error_handler, RecursionHandler recursion_handler) : client_(std::move(client)), + io_context_(io_context), bucket_(std::move(bucket)), base_dir_(std::move(base_dir)), max_keys_(max_keys), @@ -1159,23 +1169,41 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { int32_t nesting_depth; S3Model::ListObjectsV2Request req; - void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&, - const S3Model::ListObjectsV2Outcome& outcome, - const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) { + void operator()(const Result<S3Model::ListObjectsV2Outcome>& result) { // Serialize calls to operation-specific handlers std::unique_lock<std::mutex> guard(walker->mutex_); if (walker->is_finished()) { // Early exit: avoid executing handlers if DoWalk() returned return; } + if (!result.ok()) { + HandleError(result.status()); + return; + } + const auto& outcome = *result; if (!outcome.IsSuccess()) { Status st = walker->error_handler_(outcome.GetError()); - walker->ListObjectsFinished(std::move(st)); + HandleError(std::move(st)); return; } HandleResult(outcome.GetResult()); } + void SpawnListObjectsV2() { + auto walker = this->walker; + auto req = this->req; + auto maybe_fut = walker->io_context_.executor()->Submit( + walker->io_context_.stop_token(), + [walker, req]() { return walker->client_->ListObjectsV2(req); }); + if (!maybe_fut.ok()) { + HandleError(maybe_fut.status()); + return; + } + maybe_fut->AddCallback(*this); + } + + void HandleError(Status status) { walker->ListObjectsFinished(std::move(status)); } + void HandleResult(const S3Model::ListObjectsV2Result& result) { bool recurse = result.GetCommonPrefixes().size() > 0; if (recurse) { @@ -1199,7 +1227,7 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { if (result.GetIsTruncated()) { DCHECK(!result.GetNextContinuationToken().empty()); req.SetContinuationToken(result.GetNextContinuationToken()); - walker->client_->ListObjectsV2Async(req, *this); + SpawnListObjectsV2(); } else { walker->ListObjectsFinished(Status::OK()); } @@ -1212,7 +1240,7 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { } req.SetDelimiter(Aws::String() + kSep); req.SetMaxKeys(walker->max_keys_); - walker->client_->ListObjectsV2Async(req, *this); + SpawnListObjectsV2(); } }; @@ -1241,7 +1269,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> { class S3FileSystem::Impl { public: ClientBuilder builder_; - std::unique_ptr<Aws::S3::S3Client> client_; + io::IOContext io_context_; + std::shared_ptr<Aws::S3::S3Client> client_; util::optional<S3Backend> backend_; const int32_t kListObjectsMaxKeys = 1000; @@ -1250,7 +1279,8 @@ class S3FileSystem::Impl { // Limit recursing depth, since a recursion bomb can be created const int32_t kMaxNestingDepth = 100; - explicit Impl(S3Options options) : builder_(std::move(options)) {} + explicit Impl(S3Options options, io::IOContext io_context) + : builder_(std::move(options)), io_context_(io_context) {} Status Init() { return builder_.BuildClient().Value(&client_); } @@ -1442,7 +1472,7 @@ class S3FileSystem::Impl { return Status::OK(); }; - RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys, + RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys, handle_results, handle_error, handle_recursion)); // If no contents were found, perhaps it's an empty "directory", @@ -1487,22 +1517,23 @@ class S3FileSystem::Impl { return true; // Recurse }; - return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys, + return TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys, handle_results, handle_error, handle_recursion); } // Delete multiple objects at once - Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) { - struct DeleteHandler { - Future<> future = Future<>::Make(); - - // Callback for DeleteObjectsAsync - void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req, - const S3Model::DeleteObjectsOutcome& outcome, - const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) { + Future<> DeleteObjectsAsync(const std::string& bucket, + const std::vector<std::string>& keys) { + struct DeleteCallback { + const std::string bucket; + + Status operator()(const Result<S3Model::DeleteObjectsOutcome>& result) { + if (!result.ok()) { + return result.status(); + } + const auto& outcome = *result; if (!outcome.IsSuccess()) { - future.MarkFinished(ErrorToStatus(outcome.GetError())); - return; + return ErrorToStatus(outcome.GetError()); } // Also need to check per-key errors, even on successful outcome // See @@ -1511,23 +1542,22 @@ class S3FileSystem::Impl { if (!errors.empty()) { std::stringstream ss; ss << "Got the following " << errors.size() - << " errors when deleting objects in S3 bucket '" << req.GetBucket() - << "':\n"; + << " errors when deleting objects in S3 bucket '" << bucket << "':\n"; for (const auto& error : errors) { ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n"; } - future.MarkFinished(Status::IOError(ss.str())); - } else { - future.MarkFinished(); + return Status::IOError(ss.str()); } + return Status::OK(); } }; const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys); - std::vector<DeleteHandler> delete_handlers; - std::vector<Future<>*> futures; - delete_handlers.reserve(keys.size() / chunk_size + 1); - futures.reserve(delete_handlers.capacity()); + DeleteCallback delete_cb{bucket}; + auto client = client_; + + std::vector<Future<>> futures; + futures.reserve(keys.size() / chunk_size + 1); for (size_t start = 0; start < keys.size(); start += chunk_size) { S3Model::DeleteObjectsRequest req; @@ -1537,16 +1567,18 @@ class S3FileSystem::Impl { } req.SetBucket(ToAwsString(bucket)); req.SetDelete(std::move(del)); - delete_handlers.emplace_back(); - futures.push_back(&delete_handlers.back().future); - client_->DeleteObjectsAsync(req, delete_handlers.back()); + ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit( + io_context_.stop_token(), [client, req]() { + return client->DeleteObjects(req); + })); + futures.push_back(std::move(fut).Then(delete_cb)); } - WaitForAll(futures); - for (const auto* fut : futures) { - RETURN_NOT_OK(fut->status()); - } - return Status::OK(); + return AllComplete(futures); + } + + Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) { + return DeleteObjectsAsync(bucket, keys).status(); } Status DeleteDirContents(const std::string& bucket, const std::string& key) { @@ -1601,8 +1633,7 @@ class S3FileSystem::Impl { ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); - auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), - fs->io_context(), path); + auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -1619,15 +1650,15 @@ class S3FileSystem::Impl { ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path())); RETURN_NOT_OK(ValidateFilePath(path)); - auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), - fs->io_context(), path, info.size()); + auto ptr = + std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size()); RETURN_NOT_OK(ptr->Init()); return ptr; } }; S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context) - : FileSystem(io_context), impl_(new Impl{options}) { + : FileSystem(io_context), impl_(new Impl{options, io_context}) { default_async_is_sync_ = false; } @@ -1908,8 +1939,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream( ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); - auto ptr = std::make_shared<ObjectOutputStream>( - shared_from_this(), impl_->client_.get(), io_context(), path, impl_->options()); + auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path, + impl_->options()); RETURN_NOT_OK(ptr->Init()); return ptr; } diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 99e6b3f..c79d9f7 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -464,10 +464,10 @@ class TestS3FS : public S3TestMixin { std::weak_ptr<S3FileSystem> weak_fs(fs_); ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5")); fs_.reset(); - ASSERT_FALSE(weak_fs.expired()); - ASSERT_OK(stream->Write("some data")); + ASSERT_OK(stream->Write("some other data")); ASSERT_OK(stream->Close()); ASSERT_TRUE(weak_fs.expired()); + AssertObjectContents(client_.get(), "bucket", "newfile5", "some other data"); } void TestOpenOutputStreamAbort() { @@ -802,7 +802,6 @@ TEST_F(TestS3FS, OpenInputStream) { std::weak_ptr<S3FileSystem> weak_fs(fs_); ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile")); fs_.reset(); - ASSERT_FALSE(weak_fs.expired()); ASSERT_OK_AND_ASSIGN(buf, stream->Read(10)); AssertBufferEqual(*buf, "some data"); ASSERT_OK(stream->Close()); diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 4b867ea..501d5ca 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -348,6 +348,10 @@ Future<> AllComplete(const std::vector<Future<>>& futures) { std::atomic<size_t> n_remaining; }; + if (futures.empty()) { + return Future<>::MakeFinished(); + } + auto state = std::make_shared<State>(futures.size()); auto out = Future<>::Make(); for (const auto& future : futures) { diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index fc63a42..4ede291 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -628,6 +628,10 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) { std::atomic<size_t> n_remaining; }; + if (futures.size() == 0) { + return {std::vector<Result<T>>{}}; + } + auto state = std::make_shared<State>(std::move(futures)); auto out = Future<std::vector<Result<T>>>::Make(); diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 0bd8380..0fc202e 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -971,6 +971,13 @@ TEST(FutureCompletionTest, FutureVoid) { } } +TEST(FutureAllTest, Empty) { + auto combined = arrow::All(std::vector<Future<int>>{}); + auto after_assert = combined.Then( + [](std::vector<Result<int>> results) { ASSERT_EQ(0, results.size()); }); + AssertSuccessful(after_assert); +} + TEST(FutureAllTest, Simple) { auto f1 = Future<int>::Make(); auto f2 = Future<int>::Make(); @@ -1012,11 +1019,16 @@ TEST(FutureAllTest, Failure) { AssertFinished(after_assert); } +TEST(FutureAllCompleteTest, Empty) { + Future<> combined = AllComplete(std::vector<Future<>>{}); + AssertSuccessful(combined); +} + TEST(FutureAllCompleteTest, Simple) { auto f1 = Future<int>::Make(); auto f2 = Future<int>::Make(); std::vector<Future<>> futures = {Future<>(f1), Future<>(f2)}; - auto combined = arrow::AllComplete(futures); + auto combined = AllComplete(futures); AssertNotFinished(combined); f2.MarkFinished(2); AssertNotFinished(combined); @@ -1029,7 +1041,7 @@ TEST(FutureAllCompleteTest, Failure) { auto f2 = Future<int>::Make(); auto f3 = Future<int>::Make(); std::vector<Future<>> futures = {Future<>(f1), Future<>(f2), Future<>(f3)}; - auto combined = arrow::AllComplete(futures); + auto combined = AllComplete(futures); AssertNotFinished(combined); f1.MarkFinished(1); AssertNotFinished(combined);