[arrow] 01/01: Show http status codes for S3 errors
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch s3-errors-http-codes in repository https://gitbox.apache.org/repos/asf/arrow.git commit 792a5b16df01486e826e52274447c2d9f4232a7b Author: Philipp Moritz AuthorDate: Thu Sep 1 01:46:45 2022 -0700 Show http status codes for S3 errors --- cpp/src/arrow/filesystem/s3_internal.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3_internal.h b/cpp/src/arrow/filesystem/s3_internal.h index 0943037aef..a0e45ed8ad 100644 --- a/cpp/src/arrow/filesystem/s3_internal.h +++ b/cpp/src/arrow/filesystem/s3_internal.h @@ -154,8 +154,9 @@ Status ErrorToStatus(const std::string& prefix, const std::string& operation, // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371 return Status::IOError( prefix, "AWS Error ", - S3ErrorToString(static_cast(error.GetErrorType())), " during ", - operation, " operation: ", error.GetMessage()); + S3ErrorToString(static_cast(error.GetErrorType())), + " (http status code: ", static_cast(error.GetResponseCode()), ")", + " during ", operation, " operation: ", error.GetMessage()); } template
[arrow] branch s3-errors-http-codes created (now 792a5b16df)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a change to branch s3-errors-http-codes in repository https://gitbox.apache.org/repos/asf/arrow.git at 792a5b16df Show http status codes for S3 errors This branch includes the following new commits: new 792a5b16df Show http status codes for S3 errors The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[arrow] branch master updated (13a7b605ed -> 46f38dca3d)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git from 13a7b605ed ARROW-17573: [Go][Parquet] ByteArray statistics can cause memory leak (#14013) add 46f38dca3d ARROW-17079: [C++] Raise proper error message instead of error code for S3 errors (#14001) No new revisions were added by this update. Summary of changes: cpp/src/arrow/filesystem/s3_internal.h | 63 -- 1 file changed, 60 insertions(+), 3 deletions(-)
[arrow] branch master updated: ARROW-17079: [C++] Improve error messages for AWS S3 calls (#13979)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new b43c6f6b18 ARROW-17079: [C++] Improve error messages for AWS S3 calls (#13979) b43c6f6b18 is described below commit b43c6f6b18acee019a500bc6a05af18c07f2ca79 Author: Philipp Moritz AuthorDate: Mon Aug 29 20:46:34 2022 -0700 ARROW-17079: [C++] Improve error messages for AWS S3 calls (#13979) First part of improving error messages from S3 operations. We include the operation that failed in the error message now. This makes it easier to debug problems (e.g. bucket permissions or other infrastructure hickups) because it allows to re-run the operation that failed with the AWS CLI for further diagnosing what is going wrong. Example (changes in bold): > When getting information for key 'test.csv' in bucket 'pcmoritz-test-bucket-arrow-errors': AWS Error [code 15] **during HeadObject operation**: No response body. 
Authored-by: Philipp Moritz Signed-off-by: Philipp Moritz --- cpp/src/arrow/filesystem/s3_internal.h | 46 +++- cpp/src/arrow/filesystem/s3fs.cc | 48 +++--- cpp/src/arrow/filesystem/s3fs_benchmark.cc | 6 ++-- cpp/src/arrow/filesystem/s3fs_test.cc | 14 - 4 files changed, 52 insertions(+), 62 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3_internal.h b/cpp/src/arrow/filesystem/s3_internal.h index ceb92b5548..ae938c1760 100644 --- a/cpp/src/arrow/filesystem/s3_internal.h +++ b/cpp/src/arrow/filesystem/s3_internal.h @@ -39,19 +39,6 @@ namespace arrow { namespace fs { namespace internal { -#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_name, lhs, rexpr) \ - auto outcome_name = (rexpr); \ - if (!outcome_name.IsSuccess()) { \ -return ErrorToStatus(outcome_name.GetError()); \ - } \ - lhs = std::move(outcome_name).GetResultWithOwnership(); - -#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y) - -#define ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr) \ - ARROW_AWS_ASSIGN_OR_RAISE_IMPL( \ - ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr); - // XXX Should we expose this at some point? enum class S3Backend { Amazon, Minio, Other }; @@ -104,60 +91,63 @@ inline bool IsAlreadyExists(const Aws::Client::AWSError& erro // TODO qualify error messages with a prefix indicating context // (e.g. 
"When completing multipart upload to bucket 'xxx', key 'xxx': ...") template -Status ErrorToStatus(const std::string& prefix, +Status ErrorToStatus(const std::string& prefix, const std::string& operation, const Aws::Client::AWSError& error) { // XXX Handle fine-grained error types // See // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371 return Status::IOError(prefix, "AWS Error [code ", - static_cast(error.GetErrorType()), - "]: ", error.GetMessage()); + static_cast(error.GetErrorType()), "] during ", operation, + " operation: ", error.GetMessage()); } template -Status ErrorToStatus(const std::tuple& prefix, +Status ErrorToStatus(const std::tuple& prefix, const std::string& operation, const Aws::Client::AWSError& error) { std::stringstream ss; ::arrow::internal::PrintTuple(, prefix); - return ErrorToStatus(ss.str(), error); + return ErrorToStatus(ss.str(), operation, error); } template -Status ErrorToStatus(const Aws::Client::AWSError& error) { - return ErrorToStatus(std::string(), error); +Status ErrorToStatus(const std::string& operation, + const Aws::Client::AWSError& error) { + return ErrorToStatus(std::string(), operation, error); } template -Status OutcomeToStatus(const std::string& prefix, +Status OutcomeToStatus(const std::string& prefix, const std::string& operation, const Aws::Utils::Outcome& outcome) { if (outcome.IsSuccess()) { return Status::OK(); } else { -return ErrorToStatus(prefix, outcome.GetError()); +return ErrorToStatus(prefix, operation, outcome.GetError()); } } template -Status OutcomeToStatus(const std::tuple& prefix, +Status OutcomeToStatus(const std::tuple& prefix, const std::string& operation, const Aws::Utils::Outcome& outcome) { if (outcome.IsSuccess()) { return Status::OK(); } else { -return ErrorToStatus(prefix, outcome.GetError()); +
[arrow] branch master updated: ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new af45b92 ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object af45b92 is described below commit af45b9212156980f55c399e2e88b4e19b4bb8ec1 Author: Stephanie Wang AuthorDate: Sun Mar 8 19:25:55 2020 -0700 ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object Allow the client to pass in a flag during object creation specifying whether objects should be evicted or not. Closes #6520 from stephanie-wang/try-evict and squashes the following commits: 9a9dc1a5e Merge branch 'master' into try-evict 9e8c08f56 fix 2f38969a3 Merge remote-tracking branch 'upstream/master' into try-evict a32ab9b83 Default evict_if_full arg 9ddc881df document arg c2ba17c15 fix pyx 48e70bf76 Fix cpp 62b2f636a fix tests 288927472 protocol ecef91564 Add flag to evict if full Authored-by: Stephanie Wang Signed-off-by: Philipp Moritz --- cpp/src/plasma/client.cc | 43 ++--- cpp/src/plasma/client.h| 14 -- cpp/src/plasma/plasma.fbs | 6 +++ cpp/src/plasma/plasma_generated.h | 75 +- cpp/src/plasma/protocol.cc | 27 +++ cpp/src/plasma/protocol.h | 15 +++--- cpp/src/plasma/store.cc| 75 ++ cpp/src/plasma/store.h | 14 -- cpp/src/plasma/test/serialization_tests.cc | 9 ++-- python/pyarrow/_plasma.pyx | 10 ++-- 10 files changed, 197 insertions(+), 91 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 0640091..1fbb9d4 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -225,14 +225,16 @@ class PlasmaClient::Impl : public std::enable_shared_from_this* data, int device_num = 0); +int64_t metadata_size, std::shared_ptr* data, int device_num = 0, +bool evict_if_full = true); Status CreateAndSeal(const ObjectID& object_id, const std::string& data, - const std::string& 
metadata); + const std::string& metadata, bool evict_if_full = true); Status CreateAndSealBatch(const std::vector& object_ids, const std::vector& data, -const std::vector& metadata); +const std::vector& metadata, +bool evict_if_full = true); Status Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* object_buffers); @@ -416,13 +418,14 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, - std::shared_ptr* data, int device_num) { + std::shared_ptr* data, int device_num, + bool evict_if_full) { std::lock_guard guard(client_mutex_); ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; - RETURN_NOT_OK( - SendCreateRequest(store_conn_, object_id, data_size, metadata_size, device_num)); + RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size, + metadata_size, device_num)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, )); ObjectID id; @@ -485,7 +488,8 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id, const std::string& data, - const std::string& metadata) { + const std::string& metadata, + bool evict_if_full) { std::lock_guard guard(client_mutex_); ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; @@ -496,7 +500,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id, reinterpret_cast(metadata.data()), metadata.size()); memcpy([0], , sizeof(hash)); - RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, metadata, digest)); + RETURN_NOT_OK(SendCreateAndSea
[arrow] branch master updated: ARROW-7998: [C++][Plasma] Make Seal requests synchronous
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 66b05ab ARROW-7998: [C++][Plasma] Make Seal requests synchronous 66b05ab is described below commit 66b05abc267661172286b47b9246ad55f1581555 Author: Stephanie Wang AuthorDate: Thu Mar 5 11:38:21 2020 -0800 ARROW-7998: [C++][Plasma] Make Seal requests synchronous When handling a `Seal` request to create an object and make it visible to other clients, the plasma store does not wait until the seal is complete before responding to the requesting client. This makes the interface hard to use, since the client is not guaranteed that the object is visible yet and would have to use an additional IPC round-trip to determine when the object is ready. This improvement would require the plasma store to wait until the object has been created before responding to the client. 
Closes #6529 from stephanie-wang/sync-seal and squashes the following commits: cb3867e4f Make Seal synchronous Authored-by: Stephanie Wang Signed-off-by: Philipp Moritz --- cpp/src/plasma/client.cc | 5 + cpp/src/plasma/store.cc | 10 ++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index d6c5d7b..0640091 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -859,6 +859,11 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { RETURN_NOT_OK(Hash(object_id, [0])); RETURN_NOT_OK( SendSealRequest(store_conn_, object_id, std::string(digest.begin(), digest.end(; + std::vector buffer; + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, )); + ObjectID sealed_id; + RETURN_NOT_OK(ReadSealReply(buffer.data(), buffer.size(), _id)); + ARROW_CHECK(sealed_id == object_id); // We call PlasmaClient::Release to decrement the number of instances of this // object // that are currently being used by this client. The corresponding increment diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 253250e..d02765f 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -959,8 +959,6 @@ Status PlasmaStore::ProcessMessage(Client* client) { int device_num = 0; PlasmaError error_code = CreateObject(object_id, data.size(), metadata.size(), device_num, client, ); - // Reply to the client. - HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd); // If the object was successfully created, fill out the object data and seal it. if (error_code == PlasmaError::OK) { @@ -976,6 +974,9 @@ Status PlasmaStore::ProcessMessage(Client* client) { // Release call that happens in the client's Seal method. ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); } + + // Reply to the client. 
+ HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd); } break; case fb::MessageType::PlasmaCreateAndSealBatchRequest: { std::vector object_ids; @@ -999,8 +1000,6 @@ Status PlasmaStore::ProcessMessage(Client* client) { } } - HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code), client->fd); - // if OK, seal all the objects, // if error, abort the previous i objects immediately if (error_code == PlasmaError::OK) { @@ -1027,6 +1026,8 @@ Status PlasmaStore::ProcessMessage(Client* client) { AbortObject(object_ids[j], client); } } + + HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code), client->fd); } break; case fb::MessageType::PlasmaAbortRequest: { RETURN_NOT_OK(ReadAbortRequest(input, input_size, _id)); @@ -1071,6 +1072,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { std::string digest; RETURN_NOT_OK(ReadSealRequest(input, input_size, _id, )); SealObjects({object_id}, {digest}); + HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd); } break; case fb::MessageType::PlasmaEvictRequest: { // This code path should only be used for testing.
[arrow] branch master updated (6f62a2c -> 86f34aa)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 6f62a2c ARROW-7003: [Rust] Generate flatbuffers files in docker build image add 86f34aa ARROW-7004: [Plasma] Make it possible to bump up object in LRU cache No new revisions were added by this update. Summary of changes: cpp/src/plasma/client.cc | 16 ++ cpp/src/plasma/client.h | 7 + cpp/src/plasma/common_generated.h| 16 +- cpp/src/plasma/eviction_policy.cc| 19 +- cpp/src/plasma/eviction_policy.h | 4 +- cpp/src/plasma/plasma.fbs| 17 +- cpp/src/plasma/plasma_generated.h| 539 +++ cpp/src/plasma/protocol.cc | 36 +++ cpp/src/plasma/protocol.h| 11 + cpp/src/plasma/quota_aware_policy.cc | 10 + cpp/src/plasma/quota_aware_policy.h | 1 + cpp/src/plasma/store.cc | 6 + cpp/src/plasma/test/client_tests.cc | 47 ++- 13 files changed, 527 insertions(+), 202 deletions(-)
[arrow] branch master updated (2bf344a -> 61a1c12)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 2bf344a ARROW-6963: [Packaging][Wheel][OSX] Use crossbow's command to deploy artifacts from travis builds add 61a1c12 ARROW-6907: [Plasma] Allow Plasma to send batched notifications. No new revisions were added by this update. Summary of changes: cpp/src/plasma/client.cc | 85 +- cpp/src/plasma/client.h| 5 +- cpp/src/plasma/plasma.cc | 18 ++ cpp/src/plasma/plasma.fbs | 4 ++ cpp/src/plasma/plasma.h| 4 ++ cpp/src/plasma/plasma_generated.h | 93 cpp/src/plasma/protocol.cc | 20 +++--- cpp/src/plasma/protocol.h | 17 +- cpp/src/plasma/store.cc| 97 ++ cpp/src/plasma/store.h | 12 ++-- cpp/src/plasma/test/client_tests.cc| 38 cpp/src/plasma/test/serialization_tests.cc | 11 ++-- python/pyarrow/_plasma.pyx | 40 ++-- python/pyarrow/tests/test_plasma.py| 10 +-- 14 files changed, 344 insertions(+), 110 deletions(-)
[arrow] branch master updated (4db8f7b -> 0aad5a0)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 4db8f7b ARROW-6812: [Java] Fix License header add 0aad5a0 ARROW-6824: [Plasma] Allow creation of multiple objects through a single IPC in Plasma Store No new revisions were added by this update. Summary of changes: cpp/src/arrow/vendored/xxhash/xxhash.c | 2 +- cpp/src/plasma/client.cc | 41 +++ cpp/src/plasma/client.h| 11 + cpp/src/plasma/common_generated.h | 16 +- cpp/src/plasma/plasma.fbs | 14 + cpp/src/plasma/plasma_generated.h | 555 +++-- cpp/src/plasma/protocol.cc | 75 + cpp/src/plasma/protocol.h | 15 + cpp/src/plasma/store.cc| 48 +++ cpp/src/plasma/test/client_tests.cc| 24 ++ 10 files changed, 634 insertions(+), 167 deletions(-)
[arrow] branch master updated: ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new f976629 ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error f976629 is described below commit f976629a54f5518f6285a311c45c5957281b1ee7 Author: Richard Liaw AuthorDate: Tue Jul 16 00:30:50 2019 -0700 ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error If the client tries to call `CreateObject` and there is not enough memory left in the object store to create it, an `OutOfMemory` error will be returned. However, the plasma store also creates an entry for the object, even though it failed to be created. This means that later on, if the client tries to create the object again, it will receive an error that the object already exists. Also, if the client tries to get the object, it will hang because the entry appears to be unsealed. We fix this by only creating the object entry if the `CreateObject` operation succeeds. Author: Richard Liaw Author: Philipp Moritz Closes #4850 from richardliaw/fix_all_plasma_bugs_once_and_for_all and squashes the following commits: 82c4701ca Update test_plasma.py 5ac61b8f3 updatetest c6b97b345 Merge branch 'fix_all_plasma_bugs_once_and_for_all' of https://github.com/richardliaw/arrow into fix_all_plasma_bugs_once_and_for_all 475d12d5c fix up tests f5b892ff0 Update cpp/src/plasma/store.cc ff1c826c4 store fix --- cpp/src/plasma/store.cc | 38 + python/pyarrow/tests/test_plasma.py | 10 +++--- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c574d09..2c3361e 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -215,10 +215,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // ignore this requst. 
return PlasmaError::ObjectExists; } - auto ptr = std::unique_ptr(new ObjectTableEntry()); - entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); - entry->data_size = data_size; - entry->metadata_size = metadata_size; int fd = -1; int64_t map_size = 0; @@ -226,29 +222,35 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si uint8_t* pointer = nullptr; auto total_size = data_size + metadata_size; - if (device_num != 0) { + if (device_num == 0) { +pointer = AllocateMemory(total_size, , _size, ); +if (!pointer) { + ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() + << ", data_size=" << data_size + << ", metadata_size=" << metadata_size + << ", will send a reply of PlasmaError::OutOfMemory"; + return PlasmaError::OutOfMemory; +} + } else { #ifdef PLASMA_CUDA -auto st = AllocateCudaMemory(device_num, total_size, , >ipc_handle); +/// IPC GPU handle to share with clients. +std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; +auto st = AllocateCudaMemory(device_num, total_size, , _handle); if (!st.ok()) { ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); return PlasmaError::OutOfMemory; } -result->ipc_handle = entry->ipc_handle; +result->ipc_handle = ipc_handle; #else ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; return PlasmaError::OutOfMemory; #endif - } else { -pointer = AllocateMemory(total_size, , _size, ); -if (!pointer) { - ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() - << ", data_size=" << data_size - << ", metadata_size=" << metadata_size - << ", will send a reply of PlasmaError::OutOfMemory"; - return PlasmaError::OutOfMemory; -} } + auto ptr = std::unique_ptr(new ObjectTableEntry()); + entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); + entry->data_size = data_size; + entry->metadata_size = metadata_size; entry->pointer = pointer; // TODO(pcm): Set the other 
fields. entry->fd = fd; @@ -259,6 +261,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si entry->create_time = std::time(nullptr); entry->construct_duration = -1; +#ifdef PLASMA_CUDA + entry->ipc_handle = result->ipc_handle; +#endif + result-&g
[arrow] branch master updated: ARROW-5904: [Java] [Plasma] Fix compilation of Plasma Java client
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new d0d9ece ARROW-5904: [Java] [Plasma] Fix compilation of Plasma Java client d0d9ece is described below commit d0d9ecec33413f7ef6c7f91448a802666ad5f871 Author: Philipp Moritz AuthorDate: Thu Jul 11 11:43:32 2019 -0700 ARROW-5904: [Java] [Plasma] Fix compilation of Plasma Java client Author: Philipp Moritz Closes #4849 from pcmoritz/plasma-status-fix and squashes the following commits: c596c12b3 fix plasma status message --- cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index 248c268..0964df4 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -111,13 +111,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( std::shared_ptr data; Status s = client->Create(oid, size, md, md_size, ); - if (s.IsPlasmaObjectExists()) { + if (plasma::IsPlasmaObjectExists(s)) { jclass exceptionClass = env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException"); env->ThrowNew(exceptionClass, oid.hex().c_str()); return nullptr; } - if (s.IsPlasmaStoreFull()) { + if (plasma::IsPlasmaStoreFull(s)) { jclass exceptionClass = env->FindClass("org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException"); env->ThrowNew(exceptionClass, "");
[arrow] branch master updated: ARROW-5283: [C++][Plasma] Erase object id in client when abort object
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 49714fb ARROW-5283: [C++][Plasma] Erase object id in client when abort object 49714fb is described below commit 49714fb5a329f77072381ff6827b7296f688dd05 Author: shengjun.li AuthorDate: Tue May 28 19:06:03 2019 -0700 ARROW-5283: [C++][Plasma] Erase object id in client when abort object Author: shengjun.li Closes #4272 from shengjun1985/master and squashes the following commits: 126a60e35 Add a comment. f7113c2cc ARROW-5283: erase object id in client when abort object --- cpp/src/plasma/store.cc | 4 +++- cpp/src/plasma/test/client_tests.cc | 32 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 680714f..c574d09 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -604,6 +604,7 @@ int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) { } else { // The client requesting the abort is the creator. Free the object. EraseFromObjectTable(object_id); +client->object_ids.erase(it); return 1; } } @@ -727,7 +728,8 @@ void PlasmaStore::DisconnectClient(int client_fd) { sealed_objects[it->first] = it->second.get(); } else { // Abort unsealed object. - AbortObject(it->first, client); + // Don't call AbortObject() because client->object_ids would be modified. 
+ EraseFromObjectTable(object_id); } } diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 92eadea..dbe3b9f 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -387,6 +387,38 @@ TEST_F(TestPlasmaStore, AbortTest) { AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5}); } +TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { + const int64_t loop_times = 5; + + ObjectID object_id = random_object_id(); + std::vector object_buffers; + + // Test for object non-existence. + ARROW_CHECK_OK(client_.Get({object_id}, 0, _buffers)); + ASSERT_FALSE(object_buffers[0].data); + + int64_t data_size = 20; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + + // Test the sequence: create -> release -> abort -> ... + for (int64_t i = 0; i < loop_times; i++) { +std::shared_ptr data; +ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, )); +ARROW_CHECK_OK(client_.Release(object_id)); +ARROW_CHECK_OK(client_.Abort(object_id)); + } + + // Test the sequence: create -> seal -> release -> delete -> ... + for (int64_t i = 0; i < loop_times; i++) { +std::shared_ptr data; +ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, )); +ARROW_CHECK_OK(client_.Seal(object_id)); +ARROW_CHECK_OK(client_.Release(object_id)); +ARROW_CHECK_OK(client_.Delete(object_id)); + } +} + TEST_F(TestPlasmaStore, MultipleClientTest) { ObjectID object_id = random_object_id(); std::vector object_buffers;
[arrow] branch master updated: ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 15c86e6 ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory 15c86e6 is described below commit 15c86e6cfecef0aa742b446bd7cdcc1ed1e0b180 Author: shengjun.li AuthorDate: Mon Apr 29 14:58:31 2019 -0700 ARROW-5186 [Plasma] Fix crash caused by improper free on CUDA memory Author: shengjun.li Closes #4177 from shengjun1985/master and squashes the following commits: f384a9419 ARROW-5186 Fix crash caused by improper free on CUDA memory 48e0fda9e ARROW-5186 fix carsh on delete gpu memory --- cpp/src/arrow/gpu/cuda_context.h| 7 +- cpp/src/plasma/store.cc | 16 - cpp/src/plasma/store.h | 2 ++ cpp/src/plasma/test/client_tests.cc | 46 + 4 files changed, 69 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 682cbd8..99c3fc2 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -133,6 +133,12 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this& dst_ctx, void* dst, const void* src, int64_t nbytes); - Status Free(void* device_ptr, int64_t nbytes); class CudaContextImpl; std::unique_ptr impl_; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 4d86653..680714f 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -194,6 +194,13 @@ Status PlasmaStore::AllocateCudaMemory( // The IPC handle will keep the buffer memory alive return cuda_buffer->ExportForIpc(out_ipc_handle); } + +Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) { + std::shared_ptr context_; + RETURN_NOT_OK(manager_->GetContext(device_num - 1, _)); + RETURN_NOT_OK(context_->Free(pointer, size)); + return Status::OK(); +} #endif // Create a new object buffer in the hash table. 
@@ -532,7 +539,14 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) { auto& object = store_info_.objects[object_id]; - PlasmaAllocator::Free(object->pointer, object->data_size + object->metadata_size); + auto buff_size = object->data_size + object->metadata_size; + if (object->device_num == 0) { +PlasmaAllocator::Free(object->pointer, buff_size); + } else { +#ifdef PLASMA_CUDA +ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer)); +#endif + } store_info_.objects.erase(object_id); } diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 53464ab..26b49f0 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -218,6 +218,8 @@ class PlasmaStore { #ifdef PLASMA_CUDA Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer, std::shared_ptr* out_ipc_handle); + + Status FreeCudaMemory(int device_num, int64_t size, uint8_t* out_pointer); #endif /// Event loop of the plasma store. diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 4dd0c06..fd91b32 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -538,6 +538,52 @@ TEST_F(TestPlasmaStore, GetGPUTest) { AssertCudaRead(object_buffers[0].metadata, {42}); } +TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) { + ObjectID object_id1 = random_object_id(); + ObjectID object_id2 = random_object_id(); + + // Test for deleting non-existance object. + Status result = client_.Delete(std::vector{object_id1, object_id2}); + ARROW_CHECK_OK(result); + // Test for the object being in local Plasma store. + // First create object. 
+ int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + std::shared_ptr data; + ARROW_CHECK_OK( + client_.Create(object_id1, data_size, metadata, metadata_size, , 1)); + ARROW_CHECK_OK(client_.Seal(object_id1)); + ARROW_CHECK_OK( + client_.Create(object_id2, data_size, metadata, metadata_size, , 1)); + ARROW_CHECK_OK(client_.Seal(object_id2)); + // Release the ref count of Create function. + ARROW_CHECK_OK(client_.Release(object_id1)); + ARROW_CHECK_OK(client_.Release(object_id2)); + // Increase the ref count by calling Get using client2_. + std::vector object_buffers; + ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, _buffers)); + // Objects are still used by client2_. + result = client_.Delete(std::vector{object_id1, object_id2}); + ARROW_CHECK_OK(result); + // The object is used and it should not be deleted right now. + bool has_object =
[arrow] branch master updated: ARROW-4939: [Python] Add wrapper for "sum" kernel
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new c1926f7 ARROW-4939: [Python] Add wrapper for "sum" kernel c1926f7 is described below commit c1926f7be8b08026f9df426396b98c11662abd95 Author: Philipp Moritz AuthorDate: Tue Mar 26 23:13:22 2019 -0700 ARROW-4939: [Python] Add wrapper for "sum" kernel Author: Philipp Moritz Closes #3954 from pcmoritz/python-sum and squashes the following commits: c177c9e47 update f967acad6 use pytest.mark.parametrize 6c5d2df44 cleanup b3658bbaf update dedb324b6 update 21049a8ce update 0e868c42a update ef6decd23 fix b414dc45f add doc and test b77589d04 add more scalar types 0d4a9ede5 update c2a468d3f update 30df4e955 update 9b8f4ae29 add debugging 429923e7f add scalar wrappers 94a17cc3a fix 293e6c1e1 add python wrapper for sum kernel --- python/pyarrow/array.pxi | 13 +++ python/pyarrow/includes/libarrow.pxd | 36 +++ python/pyarrow/lib.pxd | 5 + python/pyarrow/public-api.pxi| 14 +++ python/pyarrow/scalar.pxi| 182 ++- python/pyarrow/tests/test_compute.py | 36 +++ 6 files changed, 284 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 4824db6..1966245 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -253,6 +253,8 @@ cdef wrap_datum(const CDatum& datum): return pyarrow_wrap_array(MakeArray(datum.array())) elif datum.kind() == DatumType_CHUNKED_ARRAY: return pyarrow_wrap_chunked_array(datum.chunked_array()) +elif datum.kind() == DatumType_SCALAR: +return pyarrow_wrap_scalar(datum.scalar()) else: raise ValueError("Unable to wrap Datum in a Python object") @@ -470,6 +472,17 @@ cdef class Array(_PandasConvertible): return pyarrow_wrap_array(result) +def sum(self): +""" +Sum the values in a numerical array. 
+""" +cdef CDatum out + +with nogil: +check_status(Sum(_context(), CDatum(self.sp_array), )) + +return wrap_datum(out) + def unique(self): """ Compute distinct elements in array diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 8382221..06e7d43 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -568,6 +568,39 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: Type type_id() c_bool Equals(const CTensor& other) +cdef cppclass CScalar" arrow::Scalar": +shared_ptr[CDataType] type + +cdef cppclass CInt8Scalar" arrow::Int8Scalar"(CScalar): +int8_t value + +cdef cppclass CUInt8Scalar" arrow::UInt8Scalar"(CScalar): +uint8_t value + +cdef cppclass CInt16Scalar" arrow::Int16Scalar"(CScalar): +int16_t value + +cdef cppclass CUInt16Scalar" arrow::UInt16Scalar"(CScalar): +uint16_t value + +cdef cppclass CInt32Scalar" arrow::Int32Scalar"(CScalar): +int32_t value + +cdef cppclass CUInt32Scalar" arrow::UInt32Scalar"(CScalar): +uint32_t value + +cdef cppclass CInt64Scalar" arrow::Int64Scalar"(CScalar): +int64_t value + +cdef cppclass CUInt64Scalar" arrow::UInt64Scalar"(CScalar): +uint64_t value + +cdef cppclass CFloatScalar" arrow::FloatScalar"(CScalar): +float value + +cdef cppclass CDoubleScalar" arrow::DoubleScalar"(CScalar): +double value + CStatus ConcatenateTables(const vector[shared_ptr[CTable]]& tables, shared_ptr[CTable]* result) @@ -1030,6 +1063,7 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: shared_ptr[CArrayData] array() shared_ptr[CChunkedArray] chunked_array() +shared_ptr[CScalar] scalar() CStatus Cast(CFunctionContext* context, const CArray& array, const shared_ptr[CDataType]& to_type, @@ -1046,6 +1080,8 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: CStatus DictionaryEncode(CFunctionContext* context, const CDatum& value, CDatum* out) +CStatus Sum(CFunctionContext* context, const CDatum& value, CDatum* out) + 
cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: shared_ptr[CDataType] GetPrimitiveType(Ty
[arrow] branch master updated: ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 0c55b25 ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store 0c55b25 is described below commit 0c55b25c84119af59320eab0b0625da9ce987294 Author: Anurag Khandelwal AuthorDate: Mon Feb 4 18:24:13 2019 -0800 ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store https://issues.apache.org/jira/browse/ARROW-4294 Note: this PR was previously at https://github.com/apache/arrow/pull/3432, which was closed since its commit history was broken. Currently, when Plasma needs storage space for additional objects, it evicts objects by deleting them from the Plasma store. This is a problem when it isn't possible to reconstruct the object or reconstructing it is expensive. This patch adds support for a pluggable external store that Plasma can evict objects to when it runs out of memory. 
Author: Anurag Khandelwal Author: Philipp Moritz Closes #3482 from anuragkh/plasma_evict_to_external_store and squashes the following commits: 631671561 remove external store worker, simplify interface 6fbc55b08 Revert "Add an eviction buffer to allow asynchronous evictions" 4f2c02ce3 Revert "Minor fix" 1bc1dbed4 Revert "format fix" 7b662bee4 Revert "Remove timeout for external store test tearDown" 25663df30 Remove timeout for external store test tearDown 7945cc951 format fix 0d7263936 Minor fix 957efb5f0 Add an eviction buffer to allow asynchronous evictions 896d895bd Fixes 7ae486794 Merge branch 'master' into plasma_evict_to_external_store 1af2f8bce Fix cpplint issues 04e173085 Merge branch 'master' into plasma_evict_to_external_store 301e575ea Fix uses of ARROW_CHECK_OK/ARROW_CHECK 69a56abcc Fix documentation errrors c19c5767d Add documentation for notify flag f3fad8086 Fix external store worker intialization 9081596c4 Clean up formatting issues f5cc95c72 Add lint exclusion for external_store_worker, since it uses mutex ffd1f0e6c Extend plasma eviction changes to python module 8afc9fb2f Kill only the plasma_store_server that we started be315677b Add test for testing evictions/unevictions a43445aee Update serialization test 58a995318 Add support for evicting/un-evicting Plasma objects to/from external store --- cpp/src/plasma/CMakeLists.txt | 7 +- cpp/src/plasma/common.h | 4 +- cpp/src/plasma/external_store.cc| 63 + cpp/src/plasma/external_store.h | 123 cpp/src/plasma/hash_table_store.cc | 58 cpp/src/plasma/hash_table_store.h | 53 +++ cpp/src/plasma/store.cc | 210 +--- cpp/src/plasma/store.h | 23 ++- cpp/src/plasma/test/client_tests.cc | 15 +- cpp/src/plasma/test/external_store_tests.cc | 139 ++ python/pyarrow/_plasma.pyx | 1 + python/pyarrow/plasma.py| 6 +- python/pyarrow/tests/test_plasma.py | 58 13 files changed, 692 insertions(+), 68 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 53af8c5..fd25aef 100644 --- 
a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -125,9 +125,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "gcc") " -Wno-conversion") endif() +list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" "hash_table_store.cc") + # We use static libraries for the plasma_store_server executable so that it can # be copied around and used in different locations. -add_executable(plasma_store_server store.cc) +add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc) target_link_libraries(plasma_store_server plasma_static ${PLASMA_STATIC_LINK_LIBS}) add_dependencies(plasma plasma_store_server) @@ -214,3 +216,6 @@ ADD_PLASMA_TEST(test/serialization_tests ADD_PLASMA_TEST(test/client_tests EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS} EXTRA_DEPENDENCIES plasma_store_server) +ADD_PLASMA_TEST(test/external_store_tests + EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS} + EXTRA_DEPENDENCIES plasma_store_server) diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index dfbd90c..6f4cef5 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -69,7 +69,9 @@ enum class ObjectState : int { /// Object was created but not sealed in the local Plasma Store. PLASMA_CREATED = 1, /// Object is sealed and stored in the local Plasm
[arrow] branch master updated: ARROW-4455: [Plasma] Suppress class-memaccess warnings
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new f08e109 ARROW-4455: [Plasma] Suppress class-memaccess warnings f08e109 is described below commit f08e109ff009e911bb772e5b0490c7cacf02140e Author: Kouhei Sutou AuthorDate: Fri Feb 1 16:12:12 2019 -0800 ARROW-4455: [Plasma] Suppress class-memaccess warnings cpp/src/plasma/store.cc: In member function 'arrow::Status plasma::PlasmaStore::ProcessMessage(plasma::Client*)': cpp/src/plasma/store.cc:767:36: error: 'void* memset(void*, int, size_t)' clearing an object of type 'struct plasma::PlasmaObject' with no trivial copy-assignment; use assignment or value-initialization instead [-Werror=class-memaccess] memset(, 0, sizeof(object)); ^ In file included from cpp/src/plasma/eviction_policy.h:27, from cpp/src/plasma/store.h:30, from cpp/src/plasma/store.cc:29: cpp/src/plasma/plasma.h:75:8: note: 'struct plasma::PlasmaObject' declared here struct PlasmaObject { ^~~~ cpp/src/plasma/test/serialization_tests.cc: In function 'plasma::PlasmaObject plasma::random_plasma_object()': cpp/src/plasma/test/serialization_tests.cc:68:36: error: 'void* memset(void*, int, size_t)' clearing an object of type 'struct plasma::PlasmaObject' with no trivial copy-assignment; use assignment or value-initialization instead [-Werror=class-memaccess] memset(, 0, sizeof(object)); ^ In file included from cpp/src/plasma/test/serialization_tests.cc:25: cpp/src/plasma/plasma.h:75:8: note: 'struct plasma::PlasmaObject' declared here struct PlasmaObject { ^~~~ cpp/src/plasma/test/serialization_tests.cc: In member function 'virtual void plasma::PlasmaSerialization_CreateReply_Test::TestBody()': cpp/src/plasma/test/serialization_tests.cc:110:38: error: 'void* memset(void*, int, size_t)' clearing an object of type 'struct plasma::PlasmaObject' with no trivial 
copy-assignment; use assignment or value-initialization instead [-Werror=class-memaccess] memset(, 0, sizeof(object2)); ^ In file included from cpp/src/plasma/test/serialization_tests.cc:25: cpp/src/plasma/plasma.h:75:8: note: 'struct plasma::PlasmaObject' declared here struct PlasmaObject { ^~~~ Author: Kouhei Sutou Closes #3543 from kou/plasma-suppress-class-memaccess-warning and squashes the following commits: 34ce66c0c Suppress class-memaccess warnings --- cpp/src/plasma/store.cc| 4 +--- cpp/src/plasma/test/serialization_tests.cc | 6 ++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index f84e890..745e336 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -762,9 +762,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { uint8_t* input = input_buffer_.data(); size_t input_size = input_buffer_.size(); ObjectID object_id; - PlasmaObject object; - // TODO(pcm): Get rid of the following. - memset(, 0, sizeof(object)); + PlasmaObject object = {}; // Process the different types of requests. 
switch (type) { diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 66d651d..4fb3f9a 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -64,8 +64,7 @@ std::vector read_message_from_file(int fd, MessageType message_type) { PlasmaObject random_plasma_object(void) { unsigned int seed = static_cast(time(NULL)); int random = rand_r(); - PlasmaObject object; - memset(, 0, sizeof(object)); + PlasmaObject object = {}; object.store_fd = random + 7; object.data_offset = random + 1; object.metadata_offset = random + 2; @@ -106,8 +105,7 @@ TEST(PlasmaSerialization, CreateReply) { ARROW_CHECK_OK(SendCreateReply(fd, object_id1, , PlasmaError::OK, mmap_size1)); std::vector data = read_message_from_file(fd, MessageType::PlasmaCreateReply); ObjectID object_id2; - PlasmaObject object2; - memset(, 0, sizeof(object2)); + PlasmaObject object2 = {}; int store_fd; int64_t mmap_size2; ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), _id2, ,
[arrow] branch master updated: ARROW-4422: [Plasma] Enforce memory limit in plasma, rather than relying on dlmalloc_set_footprint_limit
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 7379281 ARROW-4422: [Plasma] Enforce memory limit in plasma, rather than relying on dlmalloc_set_footprint_limit 7379281 is described below commit 73792817a039a16c179cbb2716ab2cf0c918f77e Author: Anurag Khandelwal AuthorDate: Wed Jan 30 17:44:57 2019 -0800 ARROW-4422: [Plasma] Enforce memory limit in plasma, rather than relying on dlmalloc_set_footprint_limit https://issues.apache.org/jira/browse/ARROW-4422 This implementation imposes memory limit at Plasma by tracking the number of bytes allocated and freed using malloc and free calls. Whenever the allocation reaches the set limit, we fail any subsequent allocations (by returning NULL from malloc). This allows Plasma to not be tied to dlmalloc, and also provides more accurate tracking of memory allocation/capacity. 
cc @pcmoritz Author: Anurag Khandelwal Closes #3526 from anuragkh/plasma_track_memory_use and squashes the following commits: 182e8a401 Replace dlmalloc calls with PlasmaAllocator, which internally calls dlmalloc --- cpp/src/plasma/CMakeLists.txt | 1 + cpp/src/plasma/eviction_policy.cc | 23 +- cpp/src/plasma/eviction_policy.h | 2 -- cpp/src/plasma/plasma.cc | 7 ++--- cpp/src/plasma/plasma.h| 3 -- cpp/src/plasma/plasma_allocator.cc | 56 + cpp/src/plasma/plasma_allocator.h | 64 ++ cpp/src/plasma/store.cc| 45 ++- cpp/src/plasma/store.h | 3 +- 9 files changed, 148 insertions(+), 56 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 5660664..53af8c5 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -76,6 +76,7 @@ set(PLASMA_SRCS io.cc malloc.cc plasma.cc + plasma_allocator.cc protocol.cc thirdparty/ae/ae.c) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index 4fb0cce..da5df5a 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -16,6 +16,7 @@ // under the License. #include "plasma/eviction_policy.h" +#include "plasma/plasma_allocator.h" #include @@ -48,8 +49,7 @@ int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required, return bytes_evicted; } -EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info) -: memory_used_(0), store_info_(store_info) {} +EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info) : store_info_(store_info) {} int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required, std::vector* objects_to_evict) { @@ -59,33 +59,29 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required, for (auto& object_id : *objects_to_evict) { cache_.Remove(object_id); } - // Update the number of bytes used. 
- memory_used_ -= bytes_evicted; - ARROW_CHECK(memory_used_ >= 0); return bytes_evicted; } void EvictionPolicy::ObjectCreated(const ObjectID& object_id) { auto entry = store_info_->objects[object_id].get(); cache_.Add(object_id, entry->data_size + entry->metadata_size); - int64_t size = entry->data_size + entry->metadata_size; - memory_used_ += size; - ARROW_CHECK(memory_used_ <= store_info_->memory_capacity); } bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_to_evict) { // Check if there is enough space to create the object. - int64_t required_space = memory_used_ + size - store_info_->memory_capacity; + int64_t required_space = + PlasmaAllocator::Allocated() + size - PlasmaAllocator::GetFootprintLimit(); // Try to free up at least as much space as we need right now but ideally // up to 20% of the total capacity. - int64_t space_to_free = std::max(required_space, store_info_->memory_capacity / 5); + int64_t space_to_free = + std::max(required_space, PlasmaAllocator::GetFootprintLimit() / 5); ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects"; // Choose some objects to evict, and update the return pointers. int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict); ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " << objects_to_evict->size() << " objects to free up " << num_bytes_evicted << " bytes. The number of bytes in use (before " - << "this eviction) is " << (memory_used_
[arrow] branch master updated: ARROW-4379: [Python] Register serializers for collections.Counter and collections.deque.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 74fb9da ARROW-4379: [Python] Register serializers for collections.Counter and collections.deque. 74fb9da is described below commit 74fb9da2737a8f0c670a014213b8fec4366e88ca Author: Robert Nishihara AuthorDate: Sun Jan 27 17:19:27 2019 -0800 ARROW-4379: [Python] Register serializers for collections.Counter and collections.deque. Author: Robert Nishihara Closes #3489 from robertnishihara/serializationcollections and squashes the following commits: f08a21756 Register serializers for collections.Counter and collections.deque. --- python/pyarrow/serialization.py| 65 +- python/pyarrow/tests/test_serialization.py | 14 --- 2 files changed, 53 insertions(+), 26 deletions(-) diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py index 9b261c1..96785de 100644 --- a/python/pyarrow/serialization.py +++ b/python/pyarrow/serialization.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from collections import OrderedDict, defaultdict +import collections import six import sys @@ -227,33 +227,26 @@ def register_torch_serialization_handlers(serialization_context): pass -def register_default_serialization_handlers(serialization_context): +def _register_collections_serialization_handlers(serialization_context): +def _serialize_deque(obj): +return list(obj) -# -- -# Set up serialization for primitive datatypes +def _deserialize_deque(data): +return collections.deque(data) -# TODO(pcm): This is currently a workaround until arrow supports -# arbitrary precision integers. 
This is only called on long integers, -# see the associated case in the append method in python_to_arrow.cc serialization_context.register_type( -int, "int", -custom_serializer=lambda obj: str(obj), -custom_deserializer=lambda data: int(data)) - -if (sys.version_info < (3, 0)): -serialization_context.register_type( -long, "long", # noqa: F821 -custom_serializer=lambda obj: str(obj), -custom_deserializer=lambda data: long(data)) # noqa: F821 +collections.deque, "collections.deque", +custom_serializer=_serialize_deque, +custom_deserializer=_deserialize_deque) def _serialize_ordered_dict(obj): return list(obj.keys()), list(obj.values()) def _deserialize_ordered_dict(data): -return OrderedDict(zip(data[0], data[1])) +return collections.OrderedDict(zip(data[0], data[1])) serialization_context.register_type( -OrderedDict, "OrderedDict", +collections.OrderedDict, "collections.OrderedDict", custom_serializer=_serialize_ordered_dict, custom_deserializer=_deserialize_ordered_dict) @@ -261,13 +254,44 @@ def register_default_serialization_handlers(serialization_context): return list(obj.keys()), list(obj.values()), obj.default_factory def _deserialize_default_dict(data): -return defaultdict(data[2], zip(data[0], data[1])) +return collections.defaultdict(data[2], zip(data[0], data[1])) serialization_context.register_type( -defaultdict, "defaultdict", +collections.defaultdict, "collections.defaultdict", custom_serializer=_serialize_default_dict, custom_deserializer=_deserialize_default_dict) +def _serialize_counter(obj): +return list(obj.keys()), list(obj.values()) + +def _deserialize_counter(data): +return collections.Counter(dict(zip(data[0], data[1]))) + +serialization_context.register_type( +collections.Counter, "collections.Counter", +custom_serializer=_serialize_counter, +custom_deserializer=_deserialize_counter) + + +def register_default_serialization_handlers(serialization_context): + +# -- +# Set up serialization for primitive datatypes + +# TODO(pcm): This is currently 
a workaround until arrow supports +# arbitrary precision integers. This is only called on long integers, +# see the associated case in the append method in python_to_arrow.cc +serialization_context.register_type( +int, "int", +custom_serializer=lambda obj: str(obj), +custom_deserializer=lambda data: int(data)) + +if (sys.version_info < (3, 0)): +serialization_context.register_type( +long, "long", # noqa: F8
[arrow] branch master updated: ARROW-4236: [Java] Distinct plasma client create exceptions
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 3405cd4 ARROW-4236: [java] Distinct plasma client create exceptions 3405cd4 is described below commit 3405cd43477f6724c3784cf97237862ec09409e0 Author: yl187661 AuthorDate: Wed Jan 23 22:07:49 2019 -0800 ARROW-4236: [java] Distinct plasma client create exceptions when ray puts an object in plasma store, there are 2 exceptions may be thrown, one is "An object with this ID already exists in the plasma store" and the other is "The plasma store ran out of memory and could not create this object", distinct them rather than let them both be the same java class @raulchen please help review Author: yl187661 Author: lynn Closes #3306 from bibabolynn/dev_plasmaClientException and squashes the following commits: 3e512b63c add assert 88e1702ba cpp lint 5586036ac cpp lint c4fe54fd3 cpp lint 10ff110f4 plasmaClientTest catch duplicate object exception acc7c0669 indentation 5699eff38 blank line 4710d5940 fix f3d12a6e5 distinct plasma client create exception --- .../org_apache_arrow_plasma_PlasmaClientJNI.cc | 12 - .../org/apache/arrow/plasma/ObjectStoreLink.java | 6 - .../java/org/apache/arrow/plasma/PlasmaClient.java | 18 -- .../org/apache/arrow/plasma/PlasmaClientJNI.java | 6 - .../exceptions/DuplicateObjectException.java | 29 ++ .../exceptions/PlasmaOutOfMemoryException.java | 29 ++ .../org/apache/arrow/plasma/PlasmaClientTest.java | 11 ++-- 7 files changed, 88 insertions(+), 23 deletions(-) diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index d552994..1988742 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -104,15 +104,15 @@ 
JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( std::shared_ptr data; Status s = client->Create(oid, size, md, md_size, ); if (s.IsPlasmaObjectExists()) { -jclass Exception = env->FindClass("java/lang/Exception"); -env->ThrowNew(Exception, - "An object with this ID already exists in the plasma store."); +jclass exceptionClass = + env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException"); +env->ThrowNew(exceptionClass, oid.hex().c_str()); return nullptr; } if (s.IsPlasmaStoreFull()) { -jclass Exception = env->FindClass("java/lang/Exception"); -env->ThrowNew(Exception, - "The plasma store ran out of memory and could not create this object."); +jclass exceptionClass = + env->FindClass("org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException"); +env->ThrowNew(exceptionClass, ""); return nullptr; } ARROW_CHECK(s.ok()); diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java index 8d6eec0..f933c85 100644 --- a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java @@ -19,6 +19,9 @@ package org.apache.arrow.plasma; import java.util.List; +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException; + /** * Object store interface, which provides the capabilities to put and get raw byte array, and serves. */ @@ -42,7 +45,8 @@ public interface ObjectStoreLink { * @param value The value to put in the object store. * @param metadata encodes whatever metadata the user wishes to encode. */ - void put(byte[] objectId, byte[] value, byte[] metadata); + void put(byte[] objectId, byte[] value, byte[] metadata) + throws DuplicateObjectException, PlasmaOutOfMemoryException; /** * Get a buffer from the PlasmaStore based on the objectId. 
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java index d69b54d..a708f41 100644 --- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java @@ -19,9 +19,10 @@ package org.apache.arrow.plasma; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import org.apache.ar
[arrow] branch master updated: ARROW-4295: [C++] [Plasma] Fix incorrect log message
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new cc1ce61 ARROW-4295: [C++] [Plasma] Fix incorrect log message cc1ce61 is described below commit cc1ce6194b905768b1a6d9f0e209270f62dc558a Author: Anurag Khandelwal AuthorDate: Sat Jan 19 13:35:10 2019 -0800 ARROW-4295: [C++] [Plasma] Fix incorrect log message https://issues.apache.org/jira/browse/ARROW-4295 Fixes incorrect message printed when evicting objects in plasma. Author: Anurag Khandelwal Closes #3433 from anuragkh/ARROW-4295 and squashes the following commits: 8c52725e8 Fix incorrect log message --- cpp/src/plasma/eviction_policy.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index e5beb5a..4fb0cce 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -85,7 +85,7 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_t ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " << objects_to_evict->size() << " objects to free up " << num_bytes_evicted << " bytes. The number of bytes in use (before " - << "this eviction) is " << memory_used_ << "."; + << "this eviction) is " << (memory_used_ + num_bytes_evicted) << "."; return num_bytes_evicted >= required_space && num_bytes_evicted > 0; }
[arrow] branch master updated: ARROW-4015: [Plasma] remove unused interfaces for plasma manager
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 8c41303 ARROW-4015: [Plasma] remove unused interfaces for plasma manager 8c41303 is described below commit 8c413036775796d9bcc52be56373bbb45de8c0ae Author: Philipp Moritz AuthorDate: Fri Dec 14 07:27:08 2018 -0800 ARROW-4015: [Plasma] remove unused interfaces for plasma manager https://github.com/apache/arrow/issues/3154 This removes unused plasma interfaces Fetch(), Wait(), Transfer() and Info(), which depend on plasma manager which has already been removed from ray. Author: Philipp Moritz Author: Zhijun Fu Author: Robert Nishihara Closes #3167 from zhijunfu/remove-legacy-interfaces and squashes the following commits: 0efb5005f fix tensorflow op be92e9085 fix java client 9da2cd38b Update _plasma.pyx 16ec63e9a More updates e7413f739 Update _plasma.pyx 21398b5e7 merge bcb320400 address comments 7967aea09 Merge branch 'master' into remove-legacy-interfaces 583cd97c4 ARROW-4015: remove unused interfaces for plasma manager --- c_glib/plasma-glib/client.cpp | 3 +- cpp/apidoc/tutorials/plasma.md | 8 +- cpp/apidoc/tutorials/tensor_to_py.md | 2 +- cpp/src/plasma/client.cc | 111 +--- cpp/src/plasma/client.h| 100 +- cpp/src/plasma/common.cc | 3 - cpp/src/plasma/common.h| 24 cpp/src/plasma/format/plasma.fbs | 74 --- .../org_apache_arrow_plasma_PlasmaClientJNI.cc | 73 --- cpp/src/plasma/plasma.h| 3 - cpp/src/plasma/protocol.cc | 143 - cpp/src/plasma/protocol.h | 35 - cpp/src/plasma/test/client_tests.cc| 2 - cpp/src/plasma/test/serialization_tests.cc | 116 - docs/source/python/plasma.rst | 10 +- .../org/apache/arrow/plasma/ObjectStoreLink.java | 27 .../java/org/apache/arrow/plasma/PlasmaClient.java | 23 python/benchmarks/plasma.py| 4 +- python/examples/plasma/sorting/sort_df.py | 2 +- python/pyarrow/_plasma.pyx | 130 +-- 
python/pyarrow/tensorflow/plasma_op.cc | 18 +-- python/pyarrow/tests/test_plasma.py| 16 +-- python/pyarrow/tests/test_plasma_tf_op.py | 8 +- 23 files changed, 41 insertions(+), 894 deletions(-) diff --git a/c_glib/plasma-glib/client.cpp b/c_glib/plasma-glib/client.cpp index c05a710..9591a0a 100644 --- a/c_glib/plasma-glib/client.cpp +++ b/c_glib/plasma-glib/client.cpp @@ -41,8 +41,7 @@ G_BEGIN_DECLS * * #GPlasmaClientCreateOptions is a class for customizing object creation. * - * #GPlasmaClient is a class for an interface with a plasma store - * and a plasma manager. + * #GPlasmaClient is a class for an interface with a plasma store. * * Since: 0.12.0 */ diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md index 472d479..b9046d5 100644 --- a/cpp/apidoc/tutorials/plasma.md +++ b/cpp/apidoc/tutorials/plasma.md @@ -80,7 +80,7 @@ using namespace plasma; int main(int argc, char** argv) { // Start up and connect a Plasma client. PlasmaClient client; - ARROW_CHECK_OK(client.Connect("/tmp/plasma", "")); + ARROW_CHECK_OK(client.Connect("/tmp/plasma")); // Disconnect the Plasma client. ARROW_CHECK_OK(client.Disconnect()); } @@ -226,7 +226,7 @@ using namespace plasma; int main(int argc, char** argv) { // Start up and connect a Plasma client. PlasmaClient client; - ARROW_CHECK_OK(client.Connect("/tmp/plasma", "")); + ARROW_CHECK_OK(client.Connect("/tmp/plasma")); // Create an object with a fixed ObjectID. ObjectID object_id = ObjectID::from_binary(""); int64_t data_size = 1000; @@ -332,7 +332,7 @@ using namespace plasma; int main(int argc, char** argv) { // Start up and connect a Plasma client. 
PlasmaClient client; - ARROW_CHECK_OK(client.Connect("/tmp/plasma", "")); + ARROW_CHECK_OK(client.Connect("/tmp/plasma")); ObjectID object_id = ObjectID::from_binary(""); ObjectBuffer object_buffer; ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer)); @@ -421,7 +421,7 @@ using namespace plasma; int main(int argc, char** argv) { // Start up and connect a Plasma client. PlasmaClient client; - ARROW_CHECK_OK(client.Connect("/tmp/plasma", "")); + AR
[arrow] branch master updated: ARROW-3950: [Plasma] Make loading the TensorFlow op optional
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 4d810b4 ARROW-3950: [Plasma] Make loading the TensorFlow op optional 4d810b4 is described below commit 4d810b4a9c37e79fde6b134ac90ee0c5f7f6c9bf Author: Philipp Moritz AuthorDate: Fri Dec 7 18:27:27 2018 -0800 ARROW-3950: [Plasma] Make loading the TensorFlow op optional Author: Philipp Moritz Closes #3117 from pcmoritz/tf-optional-loading and squashes the following commits: 0404e7ede fix 2b0d25432 make loading the tensorflow op optional --- python/pyarrow/plasma.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py index fbca9d5..056172c 100644 --- a/python/pyarrow/plasma.py +++ b/python/pyarrow/plasma.py @@ -39,7 +39,9 @@ TF_PLASMA_OP_PATH = os.path.join(pa.__path__[0], "tensorflow", "plasma_op.so") tf_plasma_op = None -if os.path.exists(TF_PLASMA_OP_PATH): + +def load_plasma_tensorflow_op(): +global tf_plasma_op import tensorflow as tf tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)
[arrow] branch master updated: ARROW-2759: [Plasma] Export plasma notification socket
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 6045dd2 ARROW-2759: [Plasma] Export plasma notification socket 6045dd2 is described below commit 6045dd2a66507b0858cf7679745da5e785548f88 Author: suquark AuthorDate: Mon Dec 3 14:34:04 2018 -0800 ARROW-2759: [Plasma] Export plasma notification socket https://issues.apache.org/jira/browse/ARROW-2759 Author: suquark Author: Philipp Moritz Closes #3008 from suquark/plasma and squashes the following commits: 0bc89eba5 Update _plasma.pyx a6d598159 fix a bug. style f037c319e fix imports 19dae23d0 use compat 942a62f84 fix test 37b656013 fix test 19f6b9852 py2 compatibility 8e636ebaf py2 compatibility bbf07b94d lint 6005b34a5 lint 2719d0dc1 lint 048f0c2ab Export plasma notification socket. --- cpp/src/plasma/client.cc| 28 --- cpp/src/plasma/client.h | 3 +++ python/pyarrow/_plasma.pyx | 38 + python/pyarrow/compat.py| 10 ++ python/pyarrow/tests/test_plasma.py | 29 5 files changed, 101 insertions(+), 7 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 0c96be0..20dc421 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -202,6 +202,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this(notification.get()); +Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, + int64_t* data_size, + int64_t* metadata_size) { + auto object_info = flatbuffers::GetRoot(buffer); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID)); if (object_info->is_deletion()) { @@ -962,6 +962,15 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, return Status::OK(); } +Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, + int64_t* data_size, 
int64_t* metadata_size) { + auto notification = ReadMessageAsync(fd); + if (notification == NULL) { +return Status::IOError("Failed to read object notification from Plasma socket"); + } + return DecodeNotification(notification.get(), object_id, data_size, metadata_size); +} + Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, const std::string& manager_socket_name, int release_delay, int num_retries) { @@ -1138,6 +1147,11 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ return impl_->GetNotification(fd, object_id, data_size, metadata_size); } +Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, +int64_t* data_size, int64_t* metadata_size) { + return impl_->DecodeNotification(buffer, object_id, data_size, metadata_size); +} + Status PlasmaClient::Disconnect() { return impl_->Disconnect(); } Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) { diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 1ad09f5..9e080b7 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -246,6 +246,9 @@ class ARROW_EXPORT PlasmaClient { Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id, +int64_t* data_size, int64_t* metadata_size); + /// Disconnect from the local plasma instance, including the local store and /// manager. 
/// diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 677e768..2fad09c 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -32,11 +32,13 @@ from cpython.pycapsule cimport * import collections import pyarrow import random +import socket from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, CFixedSizeBufferWriter, CStatus) +from pyarrow import compat PLASMA_WAIT_TIMEOUT = 2 ** 30 @@ -131,6 +133,10 @@ cdef extern from "plasma/client.h" nogil: CStatus Subscribe(int* fd) +CStatus DecodeNotification(const uint8_t* buffer, + CUniqueID* object_id, int64_t* data_size, +
[arrow] branch master updated: ARROW-3199: [Plasma] File descriptor send and receive retries
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 67d9264 ARROW-3199: [Plasma] File descriptor send and receive retries 67d9264 is described below commit 67d92642900ec29a41fa67b720e5b77bfa755e2d Author: Philipp Moritz AuthorDate: Mon Dec 3 14:28:49 2018 -0800 ARROW-3199: [Plasma] File descriptor send and receive retries An additional piece of eyes would be appreciated for this. It seems to solve our issue reported in the JIRA, but I'm not sure what the semantics of partial reads/writes is here (in particular, how are partial read/writes handled for ancillary data like file descriptors?). found by cc @stephanie-wang Author: Philipp Moritz Closes #2551 from pcmoritz/plasma-retries and squashes the following commits: c7ca3b700 fix 91061adc5 linting fda4dd27d move retry code 5dbabd79a fix travis bc876dc29 fix compile errors e1580fe81 plasma file descriptor send and receive retries --- cpp/src/plasma/fling.cc | 47 +-- cpp/src/plasma/store.cc | 17 + 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc index 26afd87..f0960aa 100644 --- a/cpp/src/plasma/fling.cc +++ b/cpp/src/plasma/fling.cc @@ -16,6 +16,8 @@ #include +#include "arrow/util/logging.h" + void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) { iov->iov_base = buf; iov->iov_len = 1; @@ -46,11 +48,32 @@ int send_fd(int conn, int fd) { memcpy(CMSG_DATA(header), reinterpret_cast(), sizeof(int)); // Send file descriptor. 
- ssize_t r = sendmsg(conn, , 0); - if (r >= 0) { -return 0; - } else { -return static_cast(r); + while (true) { +ssize_t r = sendmsg(conn, , 0); +if (r < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { +continue; + } else if (errno == EMSGSIZE) { +ARROW_LOG(WARNING) << "Failed to send file descriptor" + << " (errno = EMSGSIZE), retrying."; +// If we failed to send the file descriptor, loop until we have sent it +// successfully. TODO(rkn): This is problematic for two reasons. First +// of all, sending the file descriptor should just succeed without any +// errors, but sometimes I see a "Message too long" error number. +// Second, looping like this allows a client to potentially block the +// plasma store event loop which should never happen. +continue; + } else { +ARROW_LOG(INFO) << "Error in send_fd (errno = " << errno << ")"; +return static_cast(r); + } +} else if (r == 0) { + ARROW_LOG(INFO) << "Encountered unexpected EOF"; + return 0; +} else { + ARROW_CHECK(r > 0); + return static_cast(r); +} } } @@ -60,7 +83,19 @@ int recv_fd(int conn) { char buf[CMSG_SPACE(sizeof(int))]; init_msg(, , buf, sizeof(buf)); - if (recvmsg(conn, , 0) == -1) return -1; + while (true) { +ssize_t r = recvmsg(conn, , 0); +if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { +continue; + } else { +ARROW_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")"; +return -1; + } +} else { + break; +} + } int found_fd = -1; int oh_noes = 0; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 28624d0..bb99f59 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -327,22 +327,7 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) { if (s.ok()) { // Send all of the file descriptors for the present objects. for (int store_fd : store_fds) { - int error_code = send_fd(get_req->client->fd, store_fd); - // If we failed to send the file descriptor, loop until we have sent it - // successfully. 
TODO(rkn): This is problematic for two reasons. First - // of all, sending the file descriptor should just succeed without any - // errors, but sometimes I see a "Message too long" error number. - // Second, looping like this allows a client to potentially block the - // plasma store event loop which should never happen. - while (error_code < 0) { -if (errno == EMSGSIZE) { - ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying."; - error_code = send_fd(get_req->client->fd, store_fd); - continue; -} -WarnIfSigpipe(error_code, get_req->client->fd); -break; - } + WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); } }
[arrow] branch master updated: ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new a667fca ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator. a667fca is described below commit a667fca3b71772886bb2595986266d2039823dcc Author: Robert Nishihara AuthorDate: Sat Dec 1 00:26:51 2018 -0800 ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator. There is an issue here where `Release` was never being called in the plasma TF operator. Note that I also changed the release delay in the plasma operator to 0. Author: Robert Nishihara Author: Philipp Moritz Closes #3061 from robertnishihara/extrareleaseinplasmaop and squashes the following commits: c10956692 add include guards f89d5df8c lint 4836342e0 unregister memory e3b3864ef Linting b948ce0f4 Add test. 75f2bd99c Remove logging statement. f04a7d26d Fix 574c03532 Fix ndarray/tensor confusion in plasma op. 06985cd1c Have plasma op deserialize as numpy array. a2a9c36b3 Add release call into wrapped_callback. 0db9154bd Change release delay to 0. f4340946c Add Release call in plasma op. 
--- cpp/src/arrow/python/deserialize.cc | 17 +-- cpp/src/arrow/python/deserialize.h| 6 ++-- cpp/src/arrow/python/serialize.cc | 14 - cpp/src/arrow/python/serialize.h | 6 ++-- python/pyarrow/tensorflow/plasma_op.cc| 48 --- python/pyarrow/tests/test_plasma_tf_op.py | 5 +++- 6 files changed, 58 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index 452d8dd..f1a7eab 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu ipc::Message message(metadata, body); -RETURN_NOT_OK(ReadTensor(message, )); +RETURN_NOT_OK(ipc::ReadTensor(message, )); out->tensors.emplace_back(std::move(tensor)); } @@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu ipc::Message message(metadata, body); -RETURN_NOT_OK(ReadTensor(message, )); +RETURN_NOT_OK(ipc::ReadTensor(message, )); out->ndarrays.emplace_back(std::move(tensor)); } @@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu return Status::OK(); } -Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr* out) { - if (object.tensors.size() != 1) { -return Status::Invalid("Object is not a Tensor"); +Status DeserializeNdarray(const SerializedPyObject& object, + std::shared_ptr* out) { + if (object.ndarrays.size() != 1) { +return Status::Invalid("Object is not an Ndarray"); } - *out = object.tensors[0]; + *out = object.ndarrays[0]; return Status::OK(); } -Status ReadTensor(std::shared_ptr src, std::shared_ptr* out) { +Status NdarrayFromBuffer(std::shared_ptr src, std::shared_ptr* out) { io::BufferReader reader(src); SerializedPyObject object; RETURN_NOT_OK(ReadSerializedObject(, )); - return DeserializeTensor(object, out); + return DeserializeNdarray(object, out); } } // namespace py diff --git 
a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h index a0286b1..754765a 100644 --- a/cpp/src/arrow/python/deserialize.h +++ b/cpp/src/arrow/python/deserialize.h @@ -76,15 +76,15 @@ ARROW_EXPORT Status DeserializeObject(PyObject* context, const SerializedPyObject& object, PyObject* base, PyObject** out); -/// \brief Reconstruct Tensor from Arrow-serialized representation +/// \brief Reconstruct Ndarray from Arrow-serialized representation /// \param[in] object Object to deserialize /// \param[out] out The deserialized tensor /// \return Status ARROW_EXPORT -Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr* out); +Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr* out); ARROW_EXPORT -Status ReadTensor(std::shared_ptr src, std::shared_ptr* out); +Status NdarrayFromBuffer(std::shared_ptr src, std::shared_ptr* out); } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index 2655280..7911557 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject return Status::OK(); } -Status SerializeTensor(std::shared_ptr tensor, SerializedPyObject* o
[arrow] branch master updated: ARROW-3765: [Gandiva] Segfault when the validity bitmap has not been allocated
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 5874af5 ARROW-3765: [Gandiva] Segfault when the validity bitmap has not been allocated 5874af5 is described below commit 5874af553f035244713b687b50e57dce81204433 Author: suquark AuthorDate: Thu Nov 15 22:10:27 2018 -0800 ARROW-3765: [Gandiva] Segfault when the validity bitmap has not been allocated https://issues.apache.org/jira/browse/ARROW-3765 Author: suquark Closes #2967 from suquark/gandiva and squashes the following commits: 6d09068d0 lint 4b3ea9d32 lint 76b7e7f1e fix bug efff64a4c combine tests to reduce build time 5e4dda518 lint b509b0573 rename test 4e2528bdb lint bdf08f9ff fix bugs & add new tests de2061330 Gandiva null validity buffer support. --- cpp/src/gandiva/annotator.cc | 12 ++ cpp/src/gandiva/bitmap_accumulator.h | 4 +++- cpp/src/gandiva/tests/filter_test.cc | 46 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/cpp/src/gandiva/annotator.cc b/cpp/src/gandiva/annotator.cc index 0fe9fc8..3c8585c 100644 --- a/cpp/src/gandiva/annotator.cc +++ b/cpp/src/gandiva/annotator.cc @@ -59,11 +59,13 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc, EvalBatch* eval_batch) { int buffer_idx = 0; - // TODO: - // - validity is optional - - uint8_t* validity_buf = const_cast(array_data.buffers[buffer_idx]->data()); - eval_batch->SetBuffer(desc.validity_idx(), validity_buf); + // The validity buffer is optional. Use nullptr if it does not have one. 
+ if (array_data.buffers[buffer_idx]) { +uint8_t* validity_buf = const_cast(array_data.buffers[buffer_idx]->data()); +eval_batch->SetBuffer(desc.validity_idx(), validity_buf); + } else { +eval_batch->SetBuffer(desc.validity_idx(), nullptr); + } ++buffer_idx; if (desc.HasOffsetsIdx()) { diff --git a/cpp/src/gandiva/bitmap_accumulator.h b/cpp/src/gandiva/bitmap_accumulator.h index 31b6609..157405d 100644 --- a/cpp/src/gandiva/bitmap_accumulator.h +++ b/cpp/src/gandiva/bitmap_accumulator.h @@ -20,6 +20,7 @@ #include +#include "arrow/util/macros.h" #include "gandiva/dex.h" #include "gandiva/dex_visitor.h" #include "gandiva/eval_batch.h" @@ -36,7 +37,8 @@ class BitMapAccumulator : public DexDefaultVisitor { void Visit(const VectorReadValidityDex& dex) { int idx = dex.ValidityIdx(); auto bitmap = eval_batch_.GetBuffer(idx); -src_maps_.push_back(bitmap); +// The bitmap could be null. Ignore it in this case. +if (bitmap != NULLPTR) src_maps_.push_back(bitmap); } void Visit(const LocalBitMapValidityDex& dex) { diff --git a/cpp/src/gandiva/tests/filter_test.cc b/cpp/src/gandiva/tests/filter_test.cc index f95cdcc..f63899a 100644 --- a/cpp/src/gandiva/tests/filter_test.cc +++ b/cpp/src/gandiva/tests/filter_test.cc @@ -290,4 +290,50 @@ TEST_F(TestFilter, TestSimpleSVInt32) { EXPECT_ARROW_ARRAY_EQUALS(exp, selection_vector->ToArray()); } +TEST_F(TestFilter, TestNullValidityBuffer) { + // schema for input fields + auto field0 = field("f0", int32()); + auto field1 = field("f1", int32()); + auto schema = arrow::schema({field0, field1}); + + // Build condition f0 + f1 < 10 + auto node_f0 = TreeExprBuilder::MakeField(field0); + auto node_f1 = TreeExprBuilder::MakeField(field1); + auto sum_func = + TreeExprBuilder::MakeFunction("add", {node_f0, node_f1}, arrow::int32()); + auto literal_10 = TreeExprBuilder::MakeLiteral((int32_t)10); + auto less_than_10 = TreeExprBuilder::MakeFunction("less_than", {sum_func, literal_10}, +arrow::boolean()); + auto condition = 
TreeExprBuilder::MakeCondition(less_than_10); + + std::shared_ptr filter; + Status status = Filter::Make(schema, condition, ); + EXPECT_TRUE(status.ok()); + + // Create a row-batch with some sample data + int num_records = 5; + + auto array_ = MakeArrowArrayInt32({1, 2, 3, 4, 6}, {true, true, true, false, true}); + // Create an array without a validity buffer. + auto array0 = + std::make_shared(5, array_->data()->buffers[1], nullptr, 0); + auto array1 = MakeArrowArrayInt32({5, 9, 6, 17, 3}, {true, true, false, true, true}); + // expected output (indices for which condition matches) + auto exp = MakeArrowArrayUint16({0, 4}); + + // prepare input record batch + auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1}); + + std::shared_ptr selection_vector; + status = SelectionVector::MakeInt16(num_records, pool_, _vector); + EXPECT_TRUE(st
[arrow] branch master updated: ARROW-3751: [Gandiva][Python] Add more cython bindings for gandiva
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 217c28a ARROW-3751: [Gandiva][Python] Add more cython bindings for gandiva 217c28a is described below commit 217c28a30f25d4f80089d35a49b7a1698b296a9b Author: suquark AuthorDate: Wed Nov 14 22:38:01 2018 -0800 ARROW-3751: [Gandiva][Python] Add more cython bindings for gandiva https://issues.apache.org/jira/projects/ARROW/issues/ARROW-3751 Author: suquark Author: Philipp Moritz Closes #2936 from suquark/gandiva-more and squashes the following commits: 3b7477334 small fix 19bbf21ae Fix cython bindings. 05dfd0619 lint e3790322c rename gandiva operations 547eba672 lint c04d4e5e6 fix test ee36a6221 lint 72b9dde5c more fix, more test 8be6339fa fix test 9efc58b8c lint 2e00cd2e2 Fix make_in_expression and add tests. 4577e88fb lint 0ea3a0ef1 Add test for make_or and make_and 9e5e8089b Implement make_in_expression 1e90fe114 Create 'make_add' & 'make_or' --- cpp/src/gandiva/tree_expr_builder.cc | 6 +- cpp/src/gandiva/tree_expr_builder.h| 26 +++-- python/pyarrow/gandiva.pyx | 192 - python/pyarrow/includes/libgandiva.pxd | 46 python/pyarrow/tests/test_gandiva.py | 138 5 files changed, 371 insertions(+), 37 deletions(-) diff --git a/cpp/src/gandiva/tree_expr_builder.cc b/cpp/src/gandiva/tree_expr_builder.cc index 91e97ab..86a2824 100644 --- a/cpp/src/gandiva/tree_expr_builder.cc +++ b/cpp/src/gandiva/tree_expr_builder.cc @@ -185,9 +185,11 @@ ConditionPtr TreeExprBuilder::MakeCondition(const std::string& function, MAKE_IN(Int32, int32_t); MAKE_IN(Int64, int64_t); -MAKE_IN(Date, int64_t); +MAKE_IN(Date32, int32_t); +MAKE_IN(Date64, int64_t); MAKE_IN(TimeStamp, int64_t); -MAKE_IN(Time, int32_t); +MAKE_IN(Time32, int32_t); +MAKE_IN(Time64, int64_t); MAKE_IN(String, std::string); MAKE_IN(Binary, std::string); diff --git 
a/cpp/src/gandiva/tree_expr_builder.h b/cpp/src/gandiva/tree_expr_builder.h index 5d4946e..cd261c8 100644 --- a/cpp/src/gandiva/tree_expr_builder.h +++ b/cpp/src/gandiva/tree_expr_builder.h @@ -90,18 +90,32 @@ class TreeExprBuilder { /// \brief creates an in expression static NodePtr MakeInExpressionInt32(NodePtr node, const std::unordered_set& constants); + static NodePtr MakeInExpressionInt64(NodePtr node, const std::unordered_set& constants); + static NodePtr MakeInExpressionString(NodePtr node, const std::unordered_set& constants); + static NodePtr MakeInExpressionBinary(NodePtr node, const std::unordered_set& constants); - /// \brief Date as millis since epoch. - static NodePtr MakeInExpressionDate(NodePtr node, - const std::unordered_set& constants); - /// \brief Time as millis of day - static NodePtr MakeInExpressionTime(NodePtr node, - const std::unordered_set& constants); + + /// \brief Date as s/millis since epoch. + static NodePtr MakeInExpressionDate32(NodePtr node, +const std::unordered_set& constants); + + /// \brief Date as millis/us/ns since epoch. + static NodePtr MakeInExpressionDate64(NodePtr node, +const std::unordered_set& constants); + + /// \brief Time as s/millis of day + static NodePtr MakeInExpressionTime32(NodePtr node, +const std::unordered_set& constants); + + /// \brief Time as millis/us/ns of day + static NodePtr MakeInExpressionTime64(NodePtr node, +const std::unordered_set& constants); + /// \brief Timestamp as millis since epoch. 
static NodePtr MakeInExpressionTimeStamp(NodePtr node, const std::unordered_set& constants); diff --git a/python/pyarrow/gandiva.pyx b/python/pyarrow/gandiva.pyx index 162517a..418d0d6 100644 --- a/python/pyarrow/gandiva.pyx +++ b/python/pyarrow/gandiva.pyx @@ -23,7 +23,8 @@ from libcpp cimport bool as c_bool, nullptr from libcpp.memory cimport shared_ptr, unique_ptr, make_shared from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector -from libc.stdint cimport int64_t, uint8_t, uintptr_t +from libcpp.unordered_set cimport unordered_set as c_unordered_set +from libc.stdint cimport int64_t, int32_t, uint8_t, uintptr_t from pyarrow.includes.libarrow cimport * from pyarrow.compat import frombytes @@ -32,34 +33,46
[arrow] branch master updated: ARROW-3746: [Gandiva] [Python] Print list of functions registered with gandiva
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 1ef6c26 ARROW-3746: [Gandiva] [Python] Print list of functions registered with gandiva 1ef6c26 is described below commit 1ef6c2644b654fa77c49cc20bb9d8fc66d3f0c4f Author: Philipp Moritz AuthorDate: Sun Nov 11 13:30:38 2018 -0800 ARROW-3746: [Gandiva] [Python] Print list of functions registered with gandiva I'm also making the iterators of the Function registry static, can you check if that's ok @praveenbingo and @pravindra Author: Philipp Moritz Closes #2933 from pcmoritz/gandiva-print-functions and squashes the following commits: 58fb14bfd linting 09fe76b2d documentation fix and cleanup 1bc904fe8 add test and simplify 3b7d57960 lint 947dd64b5 lint c02dc7c0b fix cd520f834 fix 4a23c50de update 079070552 python linting cf7fa35b0 fix lint eacceec55 add documentation 3a1b78cfc print list of functions registered with gandiva --- cpp/src/gandiva/expression_registry.cc | 11 ++ cpp/src/gandiva/expression_registry.h | 3 ++ python/pyarrow/gandiva.pyx | 64 -- python/pyarrow/includes/libgandiva.pxd | 21 +++ python/pyarrow/tests/test_gandiva.py | 10 ++ 5 files changed, 107 insertions(+), 2 deletions(-) diff --git a/cpp/src/gandiva/expression_registry.cc b/cpp/src/gandiva/expression_registry.cc index c17c5b3..fb5a45e 100644 --- a/cpp/src/gandiva/expression_registry.cc +++ b/cpp/src/gandiva/expression_registry.cc @@ -150,4 +150,15 @@ void ExpressionRegistry::AddArrowTypesToVector(arrow::Type::type& type, } } +std::vector> GetRegisteredFunctionSignatures() { + ExpressionRegistry registry; + std::vector> signatures; + for (auto iter = registry.function_signature_begin(); + iter != registry.function_signature_end(); iter++) { +signatures.push_back(std::make_shared( +(*iter).base_name(), (*iter).param_types(), (*iter).ret_type())); + } + return 
signatures; +} + } // namespace gandiva diff --git a/cpp/src/gandiva/expression_registry.h b/cpp/src/gandiva/expression_registry.h index fde0449..a03deab 100644 --- a/cpp/src/gandiva/expression_registry.h +++ b/cpp/src/gandiva/expression_registry.h @@ -61,5 +61,8 @@ class ExpressionRegistry { static void AddArrowTypesToVector(arrow::Type::type& type, DataTypeVector& vector); std::unique_ptr function_registry_; }; + +std::vector> GetRegisteredFunctionSignatures(); + } // namespace gandiva #endif // GANDIVA_TYPES_H diff --git a/python/pyarrow/gandiva.pyx b/python/pyarrow/gandiva.pyx index 84fc5fa..162517a 100644 --- a/python/pyarrow/gandiva.pyx +++ b/python/pyarrow/gandiva.pyx @@ -29,7 +29,8 @@ from pyarrow.includes.libarrow cimport * from pyarrow.compat import frombytes from pyarrow.types import _as_type from pyarrow.lib cimport (Array, DataType, Field, MemoryPool, RecordBatch, - Schema, check_status, pyarrow_wrap_array) + Schema, check_status, pyarrow_wrap_array, + pyarrow_wrap_data_type) from pyarrow.includes.libgandiva cimport (CCondition, CExpression, CNode, CProjector, CFilter, @@ -56,7 +57,9 @@ from pyarrow.includes.libgandiva cimport (CCondition, CExpression, SelectionVector_MakeInt32, SelectionVector_MakeInt64, Projector_Make, - Filter_Make) + Filter_Make, + CFunctionSignature, + GetRegisteredFunctionSignatures) cdef class Node: @@ -257,3 +260,60 @@ cpdef make_filter(Schema schema, Condition condition): cdef shared_ptr[CFilter] result check_status(Filter_Make(schema.sp_schema, condition.condition, )) return Filter.create(result) + +cdef class FunctionSignature: +""" +Signature of a Gandiva function including name, parameter types +and return type. +""" + +cdef: +shared_ptr[CFunctionSignature] signature + +def __init__(self): +raise TypeError("Do not call {}'s constructor directly." 
+.format(self.__class__.__name__)) + +@staticmethod +cdef create(shared_ptr[CFunctionSignature] signature): +cdef FunctionSignature self = FunctionSignature.__new__( +FunctionSignature) +self.signature = signature +return self + +def return_type(self): +return pyarrow_wrap_data_type(self.signature.get().ret_type()) + +def
[arrow] branch master updated: ARROW-3742: Fix pyarrow.types & gandiva cython bindings
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new d8d07dc ARROW-3742: Fix pyarrow.types & gandiva cython bindings d8d07dc is described below commit d8d07dcdbb6b51dc31c07f1e6b86d9e2fda4328f Author: suquark AuthorDate: Fri Nov 9 21:59:40 2018 -0800 ARROW-3742: Fix pyarrow.types & gandiva cython bindings https://issues.apache.org/jira/browse/ARROW-3742 Author: suquark Closes #2931 from suquark/gandiva-more and squashes the following commits: 91a4ac40e change parameter name 27d68d07c Fix bugs --- python/pyarrow/gandiva.pyx | 19 --- python/pyarrow/includes/libgandiva.pxd | 12 +++- python/pyarrow/types.py| 3 ++- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/gandiva.pyx b/python/pyarrow/gandiva.pyx index 7a6c09e..84fc5fa 100644 --- a/python/pyarrow/gandiva.pyx +++ b/python/pyarrow/gandiva.pyx @@ -52,7 +52,9 @@ from pyarrow.includes.libgandiva cimport (CCondition, CExpression, TreeExprBuilder_MakeField, TreeExprBuilder_MakeIf, TreeExprBuilder_MakeCondition, + SelectionVector_MakeInt16, SelectionVector_MakeInt32, + SelectionVector_MakeInt64, Projector_Make, Filter_Make) @@ -154,10 +156,21 @@ cdef class Filter: self.filter = filter return self -def evaluate(self, RecordBatch batch, MemoryPool pool): +def evaluate(self, RecordBatch batch, MemoryPool pool, dtype='int32'): cdef shared_ptr[CSelectionVector] selection -check_status(SelectionVector_MakeInt32( -batch.num_rows, pool.pool, )) +cdef DataType type = _as_type(dtype) +if type.id == _Type_INT16: +check_status(SelectionVector_MakeInt16( +batch.num_rows, pool.pool, )) +elif type.id == _Type_INT32: +check_status(SelectionVector_MakeInt32( +batch.num_rows, pool.pool, )) +elif type.id == _Type_INT64: +check_status(SelectionVector_MakeInt64( +batch.num_rows, pool.pool, )) +else: +raise ValueError("'dtype' 
of the selection vector should be " + "one of 'int16', 'int32' and 'int64'.") check_status(self.filter.get().Evaluate( batch.sp_batch.get()[0], selection)) return SelectionVector.create(selection) diff --git a/python/pyarrow/includes/libgandiva.pxd b/python/pyarrow/includes/libgandiva.pxd index f8106bc..a9f4a7e 100644 --- a/python/pyarrow/includes/libgandiva.pxd +++ b/python/pyarrow/includes/libgandiva.pxd @@ -39,9 +39,19 @@ cdef extern from "gandiva/selection_vector.h" namespace "gandiva" nogil: shared_ptr[CArray] ToArray() +cdef CStatus SelectionVector_MakeInt16\ +"gandiva::SelectionVector::MakeInt16"( +int64_t max_slots, CMemoryPool* pool, +shared_ptr[CSelectionVector]* selection_vector) + cdef CStatus SelectionVector_MakeInt32\ "gandiva::SelectionVector::MakeInt32"( -int max_slots, CMemoryPool* pool, +int64_t max_slots, CMemoryPool* pool, +shared_ptr[CSelectionVector]* selection_vector) + +cdef CStatus SelectionVector_MakeInt64\ +"gandiva::SelectionVector::MakeInt64"( +int64_t max_slots, CMemoryPool* pool, shared_ptr[CSelectionVector]* selection_vector) cdef extern from "gandiva/condition.h" namespace "gandiva" nogil: diff --git a/python/pyarrow/types.py b/python/pyarrow/types.py index 2bd7027..d07dcca 100644 --- a/python/pyarrow/types.py +++ b/python/pyarrow/types.py @@ -19,7 +19,8 @@ from pyarrow.lib import (is_boolean_value, # noqa is_integer_value, - is_float_value) + is_float_value, + _as_type) import pyarrow.lib as lib
[arrow] branch master updated: ARROW-3718: [Gandiva] Remove spurious gtest include
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new df4eb30 ARROW-3718: [Gandiva] Remove spurious gtest include df4eb30 is described below commit df4eb302f3bf03585c7ac2f7af07b145b5441d0f Author: Philipp Moritz AuthorDate: Wed Nov 7 22:32:49 2018 -0800 ARROW-3718: [Gandiva] Remove spurious gtest include Author: Philipp Moritz Closes #2917 from pcmoritz/gandiva-remove-gtest and squashes the following commits: 210362f29 remove gtest include from gandiva --- cpp/src/gandiva/expr_decomposer.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/gandiva/expr_decomposer.h b/cpp/src/gandiva/expr_decomposer.h index 28fa616..bc21ed0 100644 --- a/cpp/src/gandiva/expr_decomposer.h +++ b/cpp/src/gandiva/expr_decomposer.h @@ -23,7 +23,6 @@ #include #include -#include #include "gandiva/arrow.h" #include "gandiva/expression.h" #include "gandiva/node.h"
[arrow] branch master updated: ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc)
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new d290538 ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc) d290538 is described below commit d2905383f569310128e29fec47f81458d2824f14 Author: suquark AuthorDate: Wed Nov 7 21:55:40 2018 -0800 ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc) This PR enables efficient serialization for Arrow Objects (array, table, tensor, record batch). Author: suquark Closes #2832 from suquark/serialization and squashes the following commits: 5a2d2c628 Fix the outdated test. 4824c574f Add serialization hooks for pyarrow object --- cpp/src/arrow/python/deserialize.cc| 64 +--- cpp/src/arrow/python/deserialize.h | 5 ++- cpp/src/arrow/python/serialize.cc | 42 ++- cpp/src/arrow/python/serialize.h | 1 + python/pyarrow/includes/libarrow.pxd | 3 +- python/pyarrow/serialization.pxi | 4 +- python/pyarrow/serialization.py| 67 ++ python/pyarrow/tests/test_plasma_tf_op.py | 1 + python/pyarrow/tests/test_serialization.py | 37 - 9 files changed, 200 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index f44b3f5..91f7723 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -96,10 +96,9 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx, return Status::OK(); } -Status DeserializeArray(const Array& array, int64_t offset, PyObject* base, -const SerializedPyObject& blobs, PyObject** out) { - int32_t index = checked_cast(array).Value(offset); - RETURN_NOT_OK(py::TensorToNdarray(blobs.tensors[index], base, out)); +Status DeserializeArray(int32_t index, PyObject* base, const SerializedPyObject& blobs, +PyObject** out) { + 
RETURN_NOT_OK(py::TensorToNdarray(blobs.ndarrays[index], base, out)); // Mark the array as immutable OwnedRef flags(PyObject_GetAttrString(*out, "flags")); DCHECK(flags.obj() != NULL) << "Could not mark Numpy array immutable"; @@ -178,11 +177,16 @@ Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr, default: { const std::string& child_name = parent.type()->child(type)->name(); if (child_name == "tensor") { -return DeserializeArray(arr, index, base, blobs, result); +int32_t ref = checked_cast(arr).Value(index); +*result = wrap_tensor(blobs.tensors[ref]); +return Status::OK(); } else if (child_name == "buffer") { int32_t ref = checked_cast(arr).Value(index); *result = wrap_buffer(blobs.buffers[ref]); return Status::OK(); + } else if (child_name == "ndarray") { +int32_t ref = checked_cast(arr).Value(index); +return DeserializeArray(ref, base, blobs, result); } else { DCHECK(false) << "union tag " << type << " with child name '" << child_name << "' not recognized"; @@ -256,13 +260,19 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) { int64_t bytes_read; int32_t num_tensors; + int32_t num_ndarrays; int32_t num_buffers; + // Read number of tensors RETURN_NOT_OK( src->Read(sizeof(int32_t), _read, reinterpret_cast(_tensors))); RETURN_NOT_OK( + src->Read(sizeof(int32_t), _read, reinterpret_cast(_ndarrays))); + RETURN_NOT_OK( src->Read(sizeof(int32_t), _read, reinterpret_cast(_buffers))); + // Align stream to 8-byte offset + RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment)); std::shared_ptr reader; RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, )); RETURN_NOT_OK(reader->ReadNext(>batch)); @@ -280,6 +290,13 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) out->tensors.push_back(tensor); } + for (int i = 0; i < num_ndarrays; ++i) { +std::shared_ptr ndarray; 
+RETURN_NOT_OK(ipc::ReadTensor(src, )); +RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment)); +out->ndarrays.push_back(ndarray); + } + int64_t offset = -1; RETURN_NOT_OK(src->Tell()); for (int i = 0; i < num_buffers; ++i) { @@ -305,21 +322,14 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject&
[arrow] branch master updated: ARROW-3602: [Gandiva] [Python] Initial Gandiva Cython bindings
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 8e9cb87 ARROW-3602: [Gandiva] [Python] Initial Gandiva Cython bindings 8e9cb87 is described below commit 8e9cb870de0ecb126a0d7403f34e3a1ce119d618 Author: Philipp Moritz AuthorDate: Wed Nov 7 21:06:45 2018 -0800 ARROW-3602: [Gandiva] [Python] Initial Gandiva Cython bindings This is an initial Cython wrapper for Gandiva. Feedback is appreciated (the API is experimental right now and will most likely change in the future). Author: Philipp Moritz Closes #2822 from pcmoritz/gandiva-cython and squashes the following commits: 6ff0d9402 Merge branch 'master' into gandiva-cython 6ea00628b use cython instantiation 4a366bb42 Merge branch 'master' into gandiva-cython 73bc203fd use strings instead of bytes for function names f589de37d fix travis 69b10cb3d don't build gandiva in mac os build 20b75a6bb update e1d074bc5 build gandiva tests 562b763e4 put back llvm 9b9d3ab22 don't build gandiva tests in python build 1ee601933 don't run gandiva tests on python build 646f36014 Merge branch 'gandiva-cython' of github.com:pcmoritz/arrow into gandiva-cython 40bb0c701 use gandiva files cd282a348 Merge branch 'master' into gandiva-cython 829f7a2cf fix f726d1745 remove compiler error 99f93f167 Merge branch 'master' into gandiva-cython cfec265e7 use clang 600888443 install llvm 6.0 (?) 
5abd24897 build gandiva 9ee2c5868 add gandiva flag 038084655 fix pytest include 27434d596 add ganvida pytest flags 8374cdb6f make gandiva optional for tests 5ceb22163 allow gandiva failure 021b301c0 lint 276536c8a Merge branch 'master' into gandiva-cython b41599496 remove gandiva cython bindings from wheels 7b75dec5e linting dc2a64870 add filter 0ff33c18d add test b4571eed2 build gandiva for wheels 92c8dff70 linting ae5305af1 linting 97568d90b whitespace 335be395d update FindGandiva.cmake 33786d260 memory pool handling 26be86632 fix array creation aab17705c fixes 7a8c9483e port gandiva cython wrappers to in-tree gandiva --- .travis.yml| 2 + ci/travis_script_python.sh | 8 ++ cpp/CMakeLists.txt | 4 + cpp/cmake_modules/FindGandiva.cmake| 96 cpp/src/gandiva/CMakeLists.txt | 52 + cpp/src/gandiva/tests/generate_data.h | 1 - python/CMakeLists.txt | 24 python/pyarrow/gandiva.pyx | 204 + python/pyarrow/includes/libgandiva.pxd | 107 + python/pyarrow/tests/conftest.py | 8 ++ python/pyarrow/tests/test_gandiva.py | 100 python/setup.py| 9 ++ 12 files changed, 590 insertions(+), 25 deletions(-) diff --git a/.travis.yml b/.travis.yml index f6f499c..20bf4ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -78,6 +78,7 @@ matrix: - ARROW_BUILD_WARNING_LEVEL=CHECKIN - ARROW_TRAVIS_PYTHON_JVM=1 - ARROW_TRAVIS_JAVA_BUILD_ONLY=1 +- ARROW_TRAVIS_PYTHON_GANDIVA=1 # ARROW-2999 Benchmarks are disabled in Travis CI for the time being # - ARROW_TRAVIS_PYTHON_BENCHMARKS=1 - MATRIX_EVAL="CC=gcc-4.9 && CXX=g++-4.9" @@ -85,6 +86,7 @@ matrix: # (ARROW_CI_CPP_AFFECTED implies ARROW_CI_PYTHON_AFFECTED) - if [ $ARROW_CI_PYTHON_AFFECTED != "1" ]; then exit; fi - $TRAVIS_BUILD_DIR/ci/travis_install_linux.sh +- $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh # If either C++ or Python changed, we must install the C++ libraries - git submodule update --init - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index 4d48adb..608e1ce 100755 
--- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -96,6 +96,10 @@ if [ $ARROW_TRAVIS_COVERAGE == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GENERATE_COVERAGE=ON" fi +if [ $ARROW_TRAVIS_PYTHON_GANDIVA == "1" ]; then + CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GANDIVA=ON -DARROW_GANDIVA_BUILD_TESTS=OFF" +fi + cmake -GNinja \ $CMAKE_COMMON_FLAGS \ -DARROW_BUILD_TESTS=on \ @@ -136,6 +140,9 @@ export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE export PYARROW_WITH_PARQUET=1 export PYARROW_WITH_PLASMA=1 export PYARROW_WITH_ORC=1 +if [ $ARROW_TRAVIS_PYTHON_GANDIVA == "1" ]; then + export PYARROW_WITH_GANDIVA=1 +fi python setup.py develop @@ -201,6 +208,7 @@ if [ "$ARROW_TRAVIS_PYTHON_BENCHMARKS" == "1" ] &&
[arrow] branch master updated: ARROW-3605: [Plasma] Remove dependence of plasma/events.h on ae.h.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new b4f7ed6 ARROW-3605: [Plasma] Remove dependence of plasma/events.h on ae.h. b4f7ed6 is described below commit b4f7ed6d6ed5cdb6dd136bac3181a438f35c8ea0 Author: Robert Nishihara AuthorDate: Wed Oct 24 23:20:55 2018 -0700 ARROW-3605: [Plasma] Remove dependence of plasma/events.h on ae.h. Author: Robert Nishihara Closes #2826 from robertnishihara/aeinclude and squashes the following commits: aa7177bfb Address comments. d6ba51f7a Remove dependence of plasma/events.h on ae.h. --- cpp/src/plasma/events.cc | 10 ++ cpp/src/plasma/events.h | 38 +- cpp/src/plasma/store.cc | 2 +- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc index 44637da..7c977b0 100644 --- a/cpp/src/plasma/events.cc +++ b/cpp/src/plasma/events.cc @@ -21,8 +21,18 @@ #include +extern "C" { +#include "ae/ae.h" +} + namespace plasma { +// Verify that the constants defined in events.h are defined correctly. 
+static_assert(kEventLoopTimerDone == AE_NOMORE, "constant defined incorrectly"); +static_assert(kEventLoopOk == AE_OK, "constant defined incorrectly"); +static_assert(kEventLoopRead == AE_READABLE, "constant defined incorrectly"); +static_assert(kEventLoopWrite == AE_WRITABLE, "constant defined incorrectly"); + void EventLoop::FileEventCallback(aeEventLoop* loop, int fd, void* context, int events) { FileCallback* callback = reinterpret_cast(context); (*callback)(events); diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h index 5b69743..109540e 100644 --- a/cpp/src/plasma/events.h +++ b/cpp/src/plasma/events.h @@ -22,20 +22,24 @@ #include #include -extern "C" { -#include "ae/ae.h" -} +struct aeEventLoop; namespace plasma { +// The constants below are defined using hardcoded values taken from ae.h so +// that ae.h does not need to be included in this file. + /// Constant specifying that the timer is done and it will be removed. -constexpr int kEventLoopTimerDone = AE_NOMORE; +constexpr int kEventLoopTimerDone = -1; // AE_NOMORE + +/// A successful status. +constexpr int kEventLoopOk = 0; // AE_OK /// Read event on the file descriptor. -constexpr int kEventLoopRead = AE_READABLE; +constexpr int kEventLoopRead = 1; // AE_READABLE /// Write event on the file descriptor. -constexpr int kEventLoopWrite = AE_WRITABLE; +constexpr int kEventLoopWrite = 2; // AE_WRITABLE typedef long long TimerID; // NOLINT @@ -57,29 +61,29 @@ class EventLoop { /// Add a new file event handler to the event loop. /// - /// @param fd The file descriptor we are listening to. - /// @param events The flags for events we are listening to (read or write). - /// @param callback The callback that will be called when the event happens. - /// @return Returns true if the event handler was added successfully. + /// \param fd The file descriptor we are listening to. + /// \param events The flags for events we are listening to (read or write). 
+ /// \param callback The callback that will be called when the event happens. + /// \return Returns true if the event handler was added successfully. bool AddFileEvent(int fd, int events, const FileCallback& callback); /// Remove a file event handler from the event loop. /// - /// @param fd The file descriptor of the event handler. + /// \param fd The file descriptor of the event handler. void RemoveFileEvent(int fd); /// Register a handler that will be called after a time slice of - /// "timeout" milliseconds. + /// "timeout" milliseconds. /// - /// @param timeout The timeout in milliseconds. - /// @param callback The callback for the timeout. - /// @return The ID of the newly created timer. + /// \param timeout The timeout in milliseconds. + /// \param callback The callback for the timeout. + /// \return The ID of the newly created timer. int64_t AddTimer(int64_t timeout, const TimerCallback& callback); /// Remove a timer handler from the event loop. /// - /// @param timer_id The ID of the timer that is to be removed. - /// @return The ae.c error code. TODO(pcm): needs to be standardized + /// \param timer_id The ID of the timer that is to be removed. + /// \return The ae.c error code. TODO(pcm): needs to be standardized int RemoveTimer(int64_t timer_id); /// \brief Run the event loop. diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 54792e9..28624d0 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -280,7 +280,7 @@ void PlasmaStore::RemoveGetRequest(GetRequest*
[arrow] branch master updated: ARROW-3574: [Plasma] Use static libraries in plasma library.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 2d0d3d0 ARROW-3574: [Plasma] Use static libraries in plasma library. 2d0d3d0 is described below commit 2d0d3d0dc51999fbaafb15d8b8362a1ef3de2ef7 Author: Robert Nishihara AuthorDate: Mon Oct 22 17:46:06 2018 -0700 ARROW-3574: [Plasma] Use static libraries in plasma library. I missed one location in https://github.com/apache/arrow/pull/2792. That previous PR fixed the issue on Linux but not on Mac. Without this PR, moving the `plasma_store_server` executable and then executing it leads to ``` dyld: Library not loaded: @rpath/libarrow.12.dylib Referenced from: /Users/rkn/Workspace/ray/./python/ray/core/src/plasma/plasma_store_server Reason: image not found Abort trap: 6 ``` Author: Robert Nishihara Closes #2804 from robertnishihara/staticupdate and squashes the following commits: ed3eca256 Fix 8fc4dfdfb Fix b872cce6d Use static libraries in plasma library. 
--- cpp/src/plasma/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index d697591..0928249 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -86,7 +86,7 @@ set(PLASMA_STATIC_LINK_LIBS arrow_static) if (ARROW_GPU) set(PLASMA_LINK_LIBS ${PLASMA_LINK_LIBS} arrow_gpu_shared) - set(PLASMA_STATIC_LINK_LIBS ${PLASMA_STATIC_LINK_LIBS} arrow_static) + set(PLASMA_STATIC_LINK_LIBS ${PLASMA_STATIC_LINK_LIBS} arrow_gpu_static) add_definitions(-DPLASMA_GPU) endif() @@ -95,7 +95,7 @@ ADD_ARROW_LIB(plasma OUTPUTS PLASMA_LIBRARIES DEPENDENCIES gen_plasma_fbs SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} ${PLASMA_LINK_LIBS} - STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} ${PLASMA_LINK_LIBS}) + STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} ${PLASMA_STATIC_LINK_LIBS}) foreach(LIB_TARGET ${PLASMA_LIBRARIES}) target_compile_definitions(${LIB_TARGET}
[arrow] branch master updated: ARROW-3558: [Plasma] Remove fatal error when calling get on unsealed object.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 0ecba4f ARROW-3558: [Plasma] Remove fatal error when calling get on unsealed object. 0ecba4f is described below commit 0ecba4f0479c159be86820096a6acac35088441b Author: Robert Nishihara AuthorDate: Fri Oct 19 13:04:04 2018 -0700 ARROW-3558: [Plasma] Remove fatal error when calling get on unsealed object. In addition to removing the fatal error in the case of a timeout, this PR also removes the deprecated version of `Get`. Author: Robert Nishihara Closes #2791 from robertnishihara/changeplasmaerror and squashes the following commits: 2e60a2b3d Fix 7f7a8b54e Undo removal of old API. 8770aefe7 Fix usage of get in plasma_op. 0ff8602dd Update java code. d11da6c9b Remove fatal error when calling get on unsealed object. --- cpp/src/plasma/client.cc| 13 + cpp/src/plasma/client.h | 10 +- python/pyarrow/tests/test_plasma.py | 10 ++ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 7e2ce0f..d37b033 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -475,11 +475,16 @@ Status PlasmaClient::Impl::GetBuffers( // This object is not currently in use by this client, so we need to send // a request to the store. all_present = false; -} else { - // NOTE: If the object is still unsealed, we will deadlock, since we must - // have been the one who created it. - ARROW_CHECK(object_entry->second->is_sealed) +} else if (!object_entry->second->is_sealed) { + // This client created the object but hasn't sealed it. If we call Get + // with no timeout, we will deadlock, because this client won't be able to + // call Seal. 
+ ARROW_CHECK(timeout_ms != -1) << "Plasma client called get on an unsealed object that it created"; + ARROW_LOG(WARNING) + << "Attempting to get an object that this client created but hasn't sealed."; + all_present = false; +} else { PlasmaObject* object = _entry->second->object; std::shared_ptr physical_buf; diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 59b001c..1ad09f5 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -110,16 +110,16 @@ class ARROW_EXPORT PlasmaClient { /// objects have all been created and sealed in the Plasma Store or the /// timeout expires. /// + /// If an object was not retrieved, the corresponding metadata and data + /// fields in the ObjectBuffer structure will evaluate to false. + /// Objects are automatically released by the client when their buffers + /// get out of scope. + /// /// \param object_ids The IDs of the objects to get. /// \param timeout_ms The amount of time in milliseconds to wait before this ///request times out. If this value is -1, then no timeout is set. /// \param[out] object_buffers The object results. /// \return The return status. - /// - /// If an object was not retrieved, the corresponding metadata and data - /// fields in the ObjectBuffer structure will evaluate to false. - /// Objects are automatically released by the client when their buffers - /// get out of scope. Status Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* object_buffers); diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 9229479..25a8aac 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -280,6 +280,16 @@ class TestPlasmaClient(object): else: assert results[i] is None +# Test trying to get an object that was created by the same client but +# not sealed. 
+object_id = random_object_id() +self.plasma_client.create(object_id, 10, b"metadata") +assert self.plasma_client.get_buffer(object_id, timeout_ms=0) is None +assert self.plasma_client.get_buffer(object_id, timeout_ms=1) is None +self.plasma_client.seal(object_id) +assert (self.plasma_client.get_buffer(object_id, timeout_ms=0) is +not None) + def test_buffer_lifetime(self): # ARROW-2195 arr = pa.array([1, 12, 23, 3, 34], pa.int32())
[arrow] branch master updated: ARROW-3548: [Plasma] Add CreateAndSeal object store method for faster puts for small objects.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new a8bd1c9 ARROW-3548: [Plasma] Add CreateAndSeal object store method for faster puts for small objects. a8bd1c9 is described below commit a8bd1c9ce4273d53eee2c67074b86410d1db4ce1 Author: Robert Nishihara AuthorDate: Thu Oct 18 18:15:32 2018 -0700 ARROW-3548: [Plasma] Add CreateAndSeal object store method for faster puts for small objects. To benchmark, start a store with `plasma_store_server -s /tmp/store -m 10` Then do ```python import pyarrow.plasma as plasma client = plasma.connect('/tmp/store', '', 0) ``` ```python def before(): object_id = plasma.ObjectID.from_random() client.create(object_id, 20, b'') client.seal(object_id) def after(): object_id = plasma.ObjectID.from_random() client.create_and_seal(object_id, 20 * b'a', b'') ``` ``` %timeit before() 63.4 µs ± 631 ns per loop (mean ± std. dev. of 7 runs, 1 loops each) %timeit after() 30.5 µs ± 669 ns per loop (mean ± std. dev. of 7 runs, 1 loops each) ``` There's actually more that could be done in the future in order to not have to wait for the return IPC if the client "reserves" a big chunk of memory up front. Author: Robert Nishihara Closes #2783 from robertnishihara/inlinesmallobjects and squashes the following commits: 98bb37b53 Improve test. a76e5052a Add comment explaining device_num = 0. 62fea3785 Linting. ca92be24a Fix linting. 711d5049f Add CreateAndSeal method for putting small objects in the object store more quickly. 
--- cpp/src/plasma/client.cc| 65 + cpp/src/plasma/client.h | 11 +++ cpp/src/plasma/format/plasma.fbs| 18 ++ cpp/src/plasma/protocol.cc | 39 ++ cpp/src/plasma/protocol.h | 13 cpp/src/plasma/store.cc | 29 + python/pyarrow/_plasma.pyx | 32 ++ python/pyarrow/tests/test_plasma.py | 32 ++ 8 files changed, 226 insertions(+), 13 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index d88e8b1..7e2ce0f 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -174,6 +174,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this* data, int device_num = 0); + Status CreateAndSeal(const ObjectID& object_id, const std::string& data, + const std::string& metadata); + Status Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* object_buffers); @@ -245,6 +248,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this(data.data()), data.size(), + reinterpret_cast(metadata.data()), metadata.size(), device_num); + memcpy([0], , sizeof(hash)); + + RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, metadata, digest)); + std::vector buffer; + RETURN_NOT_OK( + PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, )); + RETURN_NOT_OK(ReadCreateAndSealReply(buffer.data(), buffer.size())); + return Status::OK(); +} + Status PlasmaClient::Impl::GetBuffers( const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, const std::function( @@ -756,26 +786,30 @@ bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state, } uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) { - DCHECK(obj_buffer.metadata); - DCHECK(obj_buffer.data); + return ComputeObjectHash(obj_buffer.data->data(), obj_buffer.data->size(), + obj_buffer.metadata->data(), obj_buffer.metadata->size(), + obj_buffer.device_num); +} + +uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data_size, + const uint8_t* metadata, + int64_t metadata_size, int 
device_num) { + DCHECK(metadata); + DCHECK(data); XXH64_state_t hash_state; - if (obj_buffer.device_num != 0) { + if (device_num != 0) { // TODO(wap): Create cuda program to hash data on gpu. return 0; } XXH64_reset(_state, XXH64_DEFAULT_SEED); - if (obj_buffer.data->size() >= kBytesInMB) { -ComputeObjectHashParallel( -_state, reinterpret_cast(obj_buffer.data->data()), -obj_buffer.data->size()); + if (data_size >= kBytesInMB) { +ComputeObjectHashParallel(_state, reinterpret_cast(data), + data_size); } else { -XXH64_update(_
[arrow] branch master updated: ARROW-3127: [Doc] Add Tutorial for Sending Tensor from C++ to Python
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new f3247e8 ARROW-3127: [Doc] Add Tutorial for Sending Tensor from C++ to Python f3247e8 is described below commit f3247e8c8b6f3959b4c44a33ab98388b370f0daa Author: Philipp Moritz AuthorDate: Fri Sep 7 16:17:11 2018 -0700 ARROW-3127: [Doc] Add Tutorial for Sending Tensor from C++ to Python This PR adds a short tutorial showing how to 1. Serialize a floating-point array in C++ into Tensor 2. Save the Tensor to Plasma 3. Access the Tensor in Python cc @pcmoritz Author: Philipp Moritz Author: Simon Mo Closes #2481 from simon-mo/arrow_tensor_doc and squashes the following commits: 73cb8fe7 some small fixes 5ad5f3dc Add Initial Draft of the Tutorial --- cpp/apidoc/index.md | 1 + cpp/apidoc/tutorials/tensor_to_py.md | 130 +++ 2 files changed, 131 insertions(+) diff --git a/cpp/apidoc/index.md b/cpp/apidoc/index.md index 25be1f2..46ee500 100644 --- a/cpp/apidoc/index.md +++ b/cpp/apidoc/index.md @@ -40,6 +40,7 @@ Table of Contents * Tutorials * [Convert a vector of row-wise data into an Arrow table](tutorials/row_wise_conversion.md) * [Using the Plasma In-Memory Object Store](tutorials/plasma.md) + * [Use Plasma to Access Tensors from C++ in Python](tutorials/tensor_to_py.md) Getting Started --- diff --git a/cpp/apidoc/tutorials/tensor_to_py.md b/cpp/apidoc/tutorials/tensor_to_py.md new file mode 100644 index 000..e7a7416 --- /dev/null +++ b/cpp/apidoc/tutorials/tensor_to_py.md @@ -0,0 +1,130 @@ + + +Use Plasma to Access Tensors from C++ in Python +== + +This short tutorial shows how to use Arrow and the Plasma Store to send data +from C++ to Python. + +In detail, we will show how to: +1. Serialize a floating-point array in C++ into an Arrow tensor +2. Save the Arrow tensor to Plasma +3. 
Access the Tensor in a Python process + +This approach has the advantage that multiple python processes can all read +the tensor with zero-copy. Therefore, only one copy is necessary when we send +a tensor from one C++ process to many python processes. + + +Step 0: Set up +-- +We will include the following header files and construct a Plasma client. + +```cpp +#include +#include +#include +#include +#include +#include + +PlasmaClient client_; +ARROW_CHECK_OK(client_.Connect("/tmp/plasma", "", 0)); +``` + + +Step 1: Serialize a floating point array in C++ into an Arrow Tensor + +In this step, we will construct a floating-point array in C++. + +```cpp +// Generate an Object ID for Plasma +ObjectID object_id = ObjectID::from_binary(""); + +// Generate Float Array +int64_t input_length = 1000; +std::vector input(input_length); +for (int64_t i = 0; i < input_length; ++i) { + input[i] = 2.0; +} + +// Cast float array to bytes array +const uint8_t* bytes_array = reinterpret_cast(input.data()); + +// Create Arrow Tensor Object, no copy made! +// {input_length} is the shape of the tensor +auto value_buffer = std::make_shared(bytes_array, sizeof(float) * input_length); +Tensor t(float32(), value_buffer, {input_length}); +``` + +Step 2: Save the Arrow Tensor to Plasma In-Memory Object Store +-- +Continuing from Step 1, this step will save the tensor to Plasma Store. We +use `arrow::ipc::WriteTensor` to write the data. + +The variable `meta_len` will contain the length of the tensor metadata +after the call to `arrow::ipc::WriteTensor`. 
+ +```cpp +// Get the size of the tensor to be stored in Plasma +int64_t datasize; +ARROW_CHECK_OK(ipc::GetTensorSize(t, )); +int32_t meta_len = 0; + +// Create the Plasma Object +// Plasma is responsible for initializing and resizing the buffer +// This buffer will contain the _serialized_ tensor +std::shared_ptr buffer; +ARROW_CHECK_OK( +client_.Create(object_id, datasize, NULL, 0, )); + +// Writing Process, this will copy the tensor into Plasma +io::FixedSizeBufferWriter stream(buffer); +ARROW_CHECK_OK(arrow::ipc::WriteTensor(t, , _len, )); + +// Seal Plasma Object +// This computes a hash of the object data by default +ARROW_CHECK_OK(client_.Seal(object_id)); +``` + +Step 3: Access the Tensor in a Python Process +- +In Python, we will construct a Plasma client and point it to the store's socket. +The `inputs` variable will be a list of Object IDs in their raw byte string form. + +```python +import pyarrow as pa +import pyarrow.plasma as plasma + +plasma_client = plasma.connect('/tmp/plasma', '', 0) + +# inputs: a
[arrow] branch master updated: ARROW-3116: [Plasma] Add "ls" to object store
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 628b74b ARROW-3116: [Plasma] Add "ls" to object store 628b74b is described below commit 628b74b0de0a3d7ea6d7f424f605922fb8637b46 Author: Philipp Moritz AuthorDate: Sun Aug 26 13:24:03 2018 -0700 ARROW-3116: [Plasma] Add "ls" to object store This adds plasma_client.list to the plasma client API. It can be used like so: ```python import pyarrow.plasma as plasma import time client = plasma.connect("/tmp/plasma", "", 0) client.put("hello, world") # Sleep a little so we get different creation times time.sleep(2) client.put("another object") # Create an object that is not sealed yet object_id = plasma.ObjectID.from_random() client.create(object_id, 100) print(client.list()) >>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0, >>> 'create_time': 1535223642, >>> 'data_size': 460, >>> 'metadata_size': 0, >>> 'ref_count': 0, >>> 'state': 'sealed'}, >>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0, >>> 'create_time': 1535223644, >>> 'data_size': 460, >>> 'metadata_size': 0, >>> 'ref_count': 0, >>> 'state': 'sealed'}, >>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1, >>> 'create_time': 1535223644, >>> 'data_size': 100, >>> 'metadata_size': 0, >>> 'ref_count': 1, >>> 'state': 'created'}} ``` Author: Philipp Moritz Closes #2470 from pcmoritz/plasma-list and squashes the following commits: 5ff4e355 fix 32c36a75 minor fix a58db5bc add more documentation 1f0c91de fix 1f384ee9 add documentation 0958e4d1 merge ce122957 add test a6ba6f6f fix 62772a87 add timestamp bb8d52f0 fix 6572943d fix d343d4fb linting 96040ad4 cleanups 6f29cec0 fix test 79a2b549 make variables unique 93a9ca53 fix linting f7f36068 add test cases and fixes 189928be add more fields 
deb06b42 get list working ac1fae62 update a07db83b add list command to store 400d326f create object list --- cpp/src/plasma/client.cc| 11 ++ cpp/src/plasma/client.h | 15 +++ cpp/src/plasma/common.h | 52 + cpp/src/plasma/format/common.fbs| 5 ++- cpp/src/plasma/format/plasma.fbs| 12 ++ cpp/src/plasma/plasma.h | 43 +--- cpp/src/plasma/protocol.cc | 49 +++ cpp/src/plasma/protocol.h | 10 + cpp/src/plasma/store.cc | 9 + python/doc/source/plasma.rst| 43 python/pyarrow/_plasma.pyx | 78 + python/pyarrow/tests/test_plasma.py | 50 12 files changed, 334 insertions(+), 43 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 3c30f3e..c5f372f 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -184,6 +184,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this buffer; + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, )); + return ReadListReply(buffer.data(), buffer.size(), objects); +} + static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) { XXH64_state_t hash_state; XXH64_reset(_state, XXH64_DEFAULT_SEED); @@ -1057,6 +1066,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { return impl_->Contains(object_id, has_object); } +Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); } + Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); } Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); } diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index fe00193..a95b992 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -151,6 +151,21 @@ class ARROW_EXPORT PlasmaClient { /// \return The return status. Status Contains(const ObjectID& object_id, bool* has_object); + /// List all the objects in the object store. + /// + /// This API is experimental and might change in the future. 
+ /// + /// \param[out] objects ObjectTable of objects in the store. For each
[arrow] branch master updated: ARROW-3105: [Plasma] Improve flushing error message
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new fda4b3d ARROW-3105: [Plasma] Improve flushing error message fda4b3d is described below commit fda4b3dcfc773612b12973df5053193f236fc696 Author: Robert Nishihara AuthorDate: Thu Aug 23 10:44:29 2018 -0700 ARROW-3105: [Plasma] Improve flushing error message Author: Robert Nishihara Author: Philipp Moritz Closes #2458 from pcmoritz/plasma-flushing-message and squashes the following commits: f2d08296 Update eviction_policy.cc b3e19883 improve flushing error message --- cpp/src/plasma/eviction_policy.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index cace588..e5beb5a 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -84,7 +84,8 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_t int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict); ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " << objects_to_evict->size() << " objects to free up " - << num_bytes_evicted << " bytes."; + << num_bytes_evicted << " bytes. The number of bytes in use (before " + << "this eviction) is " << memory_used_ << "."; return num_bytes_evicted >= required_space && num_bytes_evicted > 0; }
[arrow] branch master updated: ARROW-2864: [Plasma] Add deletion cache to delete objects later when they are not in use.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 4bed3bc ARROW-2864: [Plasma] Add deletion cache to delete objects later when they are not in use. 4bed3bc is described below commit 4bed3bc34f1d274a0ec1e49ca9d04f78e1612795 Author: Yuhong Guo AuthorDate: Tue Jul 24 15:58:47 2018 -0700 ARROW-2864: [Plasma] Add deletion cache to delete objects later when they are not in use. 1. When we send the request of deleting objects, some objects may be in use. We will put these objects into a cache. 2. Delete call will flush the release history, so after this Delete call, there should not be release history of the to-be-deleted objects in the history cache. 3. When Release is called, we will first handle the objects in the deletion cache without waiting. (The remaining objects will wait until the handling condition is triggered.) Author: Yuhong Guo Author: Philipp Moritz Closes #2273 from guoyuhong/deleteObjectInUse and squashes the following commits: 10ee293f Update client_tests.cc af55bae8 Fix Lint for _plasma.pyx 59048e8d Fix building failure while integrating with ray e000a800 Add deletion cache in store server. 939c45bc Change comment 34a630a5 Add deletion cache to delete objects when they are not in use later. 
--- cpp/src/plasma/client.cc| 16 ++ cpp/src/plasma/store.cc | 19 + cpp/src/plasma/store.h | 2 ++ cpp/src/plasma/test/client_tests.cc | 42 ++--- python/pyarrow/_plasma.pyx | 17 +++ 5 files changed, 85 insertions(+), 11 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 2d977ec..3c30f3e 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -40,6 +40,7 @@ #include #include #include +#include #include #include "arrow/buffer.h" @@ -269,6 +270,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; #ifdef PLASMA_GPU /// Cuda Device Manager. @@ -630,11 +633,22 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) { // Tell the store that the client no longer needs the object. RETURN_NOT_OK(UnmapObject(object_id)); RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id)); +auto iter = deletion_cache_.find(object_id); +if (iter != deletion_cache_.end()) { + deletion_cache_.erase(object_id); + RETURN_NOT_OK(Delete({object_id})); +} } return Status::OK(); } Status PlasmaClient::Impl::Release(const ObjectID& object_id) { + // If an object is in the deletion cache, handle it directly without waiting. + auto iter = deletion_cache_.find(object_id); + if (iter != deletion_cache_.end()) { +RETURN_NOT_OK(PerformRelease(object_id)); +return Status::OK(); + } // If the client is already disconnected, ignore release requests. if (store_conn_ < 0) { return Status::OK(); @@ -820,6 +834,8 @@ Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { // If the object is in used, skip it. 
if (objects_in_use_.count(object_id) == 0) { not_in_use_ids.push_back(object_id); +} else { + deletion_cache_.emplace(object_id); } } if (not_in_use_ids.size() > 0) { diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 58ea7a4..f55f3c9 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -400,10 +400,17 @@ int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* clie // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. if (entry->ref_count == 0) { - // Tell the eviction policy that this object is no longer being used. - std::vector objects_to_evict; - eviction_policy_.EndObjectAccess(entry->object_id, _to_evict); - DeleteObjects(objects_to_evict); + if (deletion_cache_.count(entry->object_id) == 0) { +// Tell the eviction policy that this object is no longer being used. +std::vector objects_to_evict; +eviction_policy_.EndObjectAccess(entry->object_id, _to_evict); +DeleteObjects(objects_to_evict); + } else { +// Above code does not really delete an object. Instead, it just put an +// object to LRU cache which will be cleaned when the memory is not enough. +deletion_cache_.erase(entry->object_id); +DeleteObjects({entry->object_id}); + } } // Return 1 to indicate that the client was removed. return 1; @@ -474,11 +481,15 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { if (entry->state != Obje
[arrow] branch master updated: ARROW-2872: [Python] Add tensorflow mark to opt-in to TF-related unit tests
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 4ba8769 ARROW-2872: [Python] Add tensorflow mark to opt-in to TF-related unit tests 4ba8769 is described below commit 4ba8769b4858dcd46a7ea7e40bd6c10102327a0d Author: Wes McKinney AuthorDate: Tue Jul 17 16:16:45 2018 -0700 ARROW-2872: [Python] Add tensorflow mark to opt-in to TF-related unit tests Author: Wes McKinney Closes #2279 from wesm/ARROW-2872 and squashes the following commits: 61394f0d Add tensorflow mark to opt-in to tensorflow unit test --- ci/travis_script_python.sh| 7 +-- python/pyarrow/tests/conftest.py | 4 +++- python/pyarrow/tests/test_plasma_tf_op.py | 7 ++- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index 453dfbd..4513fcd 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -27,6 +27,8 @@ export PARQUET_HOME=$ARROW_PYTHON_PARQUET_HOME export LD_LIBRARY_PATH=$ARROW_HOME/lib:$PARQUET_HOME/lib:$LD_LIBRARY_PATH export PYARROW_CXXFLAGS="-Werror" +PYARROW_PYTEST_FLAGS=" -r sxX --durations=15 --parquet" + PYTHON_VERSION=$1 CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION @@ -59,6 +61,7 @@ conda install -y -q pip \ if [ $TRAVIS_OS_NAME != "osx" ]; then conda install -y -c conda-forge tensorflow + PYARROW_PYTEST_FLAGS="$PYARROW_PYTEST_FLAGS --tensorflow" fi # Re-build C++ libraries with the right Python setup @@ -138,9 +141,9 @@ fi if [ "$ARROW_TRAVIS_COVERAGE" == "1" ]; then # Output Python coverage data in a persistent place export COVERAGE_FILE=$ARROW_PYTHON_COVERAGE_FILE -coverage run --append -m pytest -r sxX --durations=15 --parquet pyarrow/tests +coverage run --append -m pytest $PYARROW_PYTEST_FLAGS pyarrow/tests else -python -m pytest -r sxX --durations=15 --parquet pyarrow/tests +python -m pytest 
$PYARROW_PYTEST_FLAGS pyarrow/tests fi if [ "$ARROW_TRAVIS_COVERAGE" == "1" ]; then diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index e276822..b0eff1e 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -20,10 +20,11 @@ from pytest import skip, mark groups = [ 'hdfs', +'large_memory', 'parquet', 'plasma', -'large_memory', 's3', +'tensorflow' ] @@ -34,6 +35,7 @@ defaults = { 'plasma': False, 'large_memory': False, 's3': False, +'tensorflow': False } try: diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py index 3b41f00..39f8548 100644 --- a/python/pyarrow/tests/test_plasma_tf_op.py +++ b/python/pyarrow/tests/test_plasma_tf_op.py @@ -83,19 +83,16 @@ def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name, @pytest.mark.plasma +@pytest.mark.tensorflow def test_plasma_tf_op(use_gpu=False): import pyarrow.plasma as plasma +import tensorflow as tf plasma.build_plasma_tensorflow_op() if plasma.tf_plasma_op is None: pytest.skip("TensorFlow Op not found") -try: -import tensorflow as tf -except ImportError: -pytest.skip("TensorFlow not installed") - with plasma.start_plasma_store(10**8) as (plasma_store_name, p): client = plasma.connect(plasma_store_name, "", 0) for dtype in [np.float32, np.float64,
[arrow] branch master updated: ARROW-2690: [Plasma] Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 5063b33 ARROW-2690: [Plasma] Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers 5063b33 is described below commit 5063b333cf130d51a79f229d6b987c192773ab07 Author: Wes McKinney AuthorDate: Tue Jul 17 15:44:18 2018 -0700 ARROW-2690: [Plasma] Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers I made a pass over Plasma to make the function names more uniform in the style; let me know your comments. It was a little bit painful. I also wanted to be more explicit about what parts of the Flatbuffers protocol are leaking into the public API. Still `flatbuffers/flatbuffers.h` is leaking, but this is a step in containing things a bit. Author: Wes McKinney Closes #2242 from wesm/ARROW-2690 and squashes the following commits: 6d694ca0 Rename some more internal functions fc66f533 Use more distinguishable namespace alias than flatbuf 1bb90c32 Use uniform function names in public APIs in Plasma. 
Add namespace around Flatbuffers --- cpp/src/plasma/client.cc | 67 cpp/src/plasma/common.cc | 16 +- cpp/src/plasma/common.h| 14 +- cpp/src/plasma/eviction_policy.cc | 41 +++-- cpp/src/plasma/eviction_policy.h | 26 ++-- cpp/src/plasma/format/common.fbs | 2 + cpp/src/plasma/format/plasma.fbs | 1 + cpp/src/plasma/io.cc | 12 +- cpp/src/plasma/io.h| 16 +- cpp/src/plasma/malloc.cc | 6 +- cpp/src/plasma/malloc.h| 6 +- cpp/src/plasma/plasma.cc | 12 +- cpp/src/plasma/plasma.h| 16 +- cpp/src/plasma/protocol.cc | 220 +- cpp/src/plasma/protocol.h | 5 +- cpp/src/plasma/store.cc| 240 +++-- cpp/src/plasma/store.h | 49 +++--- cpp/src/plasma/test/serialization_tests.cc | 12 +- 18 files changed, 403 insertions(+), 358 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 95da089..f2b0b97 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -66,8 +66,13 @@ using arrow::gpu::CudaDeviceManager; #define XXH64_DEFAULT_SEED 0 +namespace fb = plasma::flatbuf; + namespace plasma { +using fb::MessageType; +using fb::PlasmaError; + using arrow::MutableBuffer; typedef struct XXH64_state_s XXH64_state_t; @@ -225,17 +230,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this&)>& wrap_buffer, ObjectBuffer* object_buffers); - uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size); + uint8_t* LookupOrMmap(int fd, int store_fd_val, int64_t map_size); - uint8_t* lookup_mmapped_file(int store_fd_val); + uint8_t* LookupMmappedFile(int store_fd_val); - void increment_object_count(const ObjectID& object_id, PlasmaObject* object, - bool is_sealed); + void IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object, +bool is_sealed); - bool compute_object_hash_parallel(XXH64_state_t* hash_state, const unsigned char* data, -int64_t nbytes); + bool ComputeObjectHashParallel(XXH64_state_t* hash_state, const unsigned char* data, + int64_t nbytes); - uint64_t compute_object_hash(const ObjectBuffer& obj_buffer); + uint64_t 
ComputeObjectHash(const ObjectBuffer& obj_buffer); /// File descriptor of the Unix domain socket that connects to the store. int store_conn_; @@ -284,7 +289,7 @@ PlasmaClient::Impl::~Impl() {} // If the file descriptor fd has been mmapped in this client process before, // return the pointer that was returned by mmap, otherwise mmap it and store the // pointer in a hash table. -uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) { +uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) { auto entry = mmap_table_.find(store_fd_val); if (entry != mmap_table_.end()) { close(fd); @@ -310,7 +315,7 @@ uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t ma // Get a pointer to a file that we know has been memory mapped in this client // process before. -uint8_t* PlasmaClient::Impl::lookup_mmapped_file(int store_fd_val) { +uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { auto entry = mmap_table_.find(store_fd_val); ARROW_CHECK(entry != mmap_table_.end()); r
[arrow] branch master updated: ARROW-2790: [C++] Buffers can contain uninitialized memory
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 1a419fe ARROW-2790: [C++] Buffers can contain uninitialized memory 1a419fe is described below commit 1a419fe8c3deabc0f16bb1089ed523b9ce57ffa7 Author: Dimitri Vorona AuthorDate: Mon Jul 9 00:46:45 2018 -0700 ARROW-2790: [C++] Buffers can contain uninitialized memory Author: Dimitri Vorona Author: Wes McKinney Author: Philipp Moritz Closes #2216 from alendit/fix-uninitialized-reads and squashes the following commits: 7280074f call Finish on builders in SequenceBuilder c2452f0b Add comment about mysterious non-determinism 168631c6 Fix Python unit test, though not sure why yet aeac2284 Do not zero null bitmaps twice e465ab02 More aggressively trim and zero padding in buffers, reduce code duplication efa8e853 Clean up code duplication in array-test a little bit 00e03488 test serialization for determinism af87b732 Fixes for revision 437580f6 Format tests 0a1a82ef Fix uninitialized data warnings 2f0b2820 Add more uninitialized data and padding tests 21be782d Remove valgrind suppression since it isn't needed anymore 4ff873f8 Add checks for data_ before zeroing padding 0d5d07fd Stricter int to bool conversion 7ed1b246 Consume statuses 9a9d5ddb Formatting 797e568b Add memory initialization on null and zero padding to buffers b8f3b27e Add zero-padding tests 4fb8ace8 Add tests for uninitialized values when adding nulls --- cpp/src/arrow/array-test.cc| 249 + cpp/src/arrow/buffer.h | 7 +- cpp/src/arrow/builder.cc | 63 +--- cpp/src/arrow/builder.h| 19 +++ cpp/src/arrow/python/python_to_arrow.cc| 12 +- cpp/src/arrow/test-util.h | 2 + cpp/src/arrow/util/bit-util.cc | 2 +- cpp/valgrind.supp | 8 +- python/pyarrow/tests/test_serialization.py | 7 + 9 files changed, 298 insertions(+), 71 deletions(-) diff --git a/cpp/src/arrow/array-test.cc 
b/cpp/src/arrow/array-test.cc index cd32976..1bef773 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -41,6 +41,42 @@ namespace arrow { using std::string; using std::vector; +namespace { +// used to prevent compiler optimizing away side-effect-less statements +volatile int throw_away = 0; + +// checks if the padding of the buffers of the array is zero +// also causes valgrind warnings if the padding bytes are uninitialized +void AssertZeroPadded(const Array& array) { + for (const auto& buffer : array.data()->buffers) { +if (buffer) { + const int64_t padding = buffer->capacity() - buffer->size(); + std::vector zeros(padding); + ASSERT_EQ(0, memcmp(buffer->data() + buffer->size(), zeros.data(), padding)); +} + } +} + +// Check if the valid buffer bytes are initialized by +// calling memcmp on them which will cause valgrind warnings otherwise +void TestInitialized(const Array& array) { + for (const auto& buffer : array.data()->buffers) { +if (buffer) { + std::vector zeros(buffer->capacity()); + throw_away = memcmp(buffer->data(), zeros.data(), buffer->size()); +} + } +} + +template +void FinishAndCheckPadding(BuilderType* builder, std::shared_ptr* out) { + ASSERT_OK(builder->Finish(out)); + AssertZeroPadded(**out); + TestInitialized(**out); +} + +} // namespace + class TestArray : public ::testing::Test { public: void SetUp() { pool_ = default_memory_pool(); } @@ -209,10 +245,13 @@ TEST_F(TestArray, BuildLargeInMemoryArray) { BooleanBuilder builder; ASSERT_OK(builder.Reserve(length)); + + // Advance does not write to data, see docstring ASSERT_OK(builder.Advance(length)); + memset(builder.data()->mutable_data(), 0, BitUtil::BytesForBits(length)); std::shared_ptr result; - ASSERT_OK(builder.Finish()); + FinishAndCheckPadding(, ); ASSERT_EQ(length, result->length()); } @@ -281,7 +320,7 @@ class TestPrimitiveBuilder : public TestBuilder { std::make_shared(size, ex_data, ex_null_bitmap, ex_null_count); std::shared_ptr out; 
-ASSERT_OK(builder->Finish()); +FinishAndCheckPadding(builder.get(), ); std::shared_ptr result = std::dynamic_pointer_cast(out); @@ -405,7 +444,8 @@ void TestPrimitiveBuilder::Check(const std::unique_ptr // Finish builder and check result array std::shared_ptr out; - ASSERT_OK(builder->Finish()); + FinishAndCheckPadding(builder.get(), ); + std::shared_ptr result = std::dynamic_pointer_cast(out); ASSERT_EQ(ex_null_count, result->null_count()); @@ -456,14 +496,29 @@ TYPE
[arrow] branch master updated: ARROW-2798: [Plasma] Use hashing function that takes into account all UniqueID bytes
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 9d1432e ARROW-2798: [Plasma] Use hashing function that takes into account all UniqueID bytes 9d1432e is described below commit 9d1432eda73ea7cfac8ccd96207fd0191b159c2f Author: songqing AuthorDate: Fri Jul 6 13:51:03 2018 -0700 ARROW-2798: [Plasma] Use hashing function that takes into account all UniqueID bytes Now, the hashing of UniqueID in plasma is too simple which has caused a problem. In some cases (for example, in github/ray, UniqueID is composed of a taskID and an index), the UniqueID may be like "00", "ff01", "fff02" ... . The current hashing method is only to copy the first few bytes of a UniqueID and the result is that most of the hashed ids are the same, so when the hashed ids are put into the plasma store, it will become very slow when se [...] In fact, the same PR has been merged into ray, see https://github.com/ray-project/ray/pull/2174. and I have tested the perf between the new hashing method and the original one with putting lots of objects continuously, it seems the new hashing method doesn't cost more time.
Author: songqing Closes #2220 from songqing/oid-hashing and squashes the following commits: 5c803aa0 modify murmurhash LICENSE 8b8aa3e1 add murmurhash LICENSE d8d5f93f lint fix 426cd1e2 lint fix 4767751d Use hashing function that takes into account all UniqueID bytes --- LICENSE.txt | 13 cpp/src/plasma/common.cc | 53 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/LICENSE.txt b/LICENSE.txt index 30966d3..57299c4 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -297,6 +297,19 @@ You can contact the author at : +src/plasma/common.cc (some portions) + +Copyright (c) Austin Appleby (aappleby (AT) gmail) + +Some portions of this file are derived from code in the MurmurHash project + +All code is released to the public domain. For business purposes, Murmurhash is +under the MIT license. + +https://sites.google.com/site/murmurhash/ + + + src/arrow/util (some portions): Apache 2.0, and 3-clause BSD Some portions of this module are derived from code in the Chromium project, diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index 2e3899b..ae55fb9 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -68,12 +68,57 @@ std::string UniqueID::hex() const { return result; } -size_t UniqueID::hash() const { - size_t result; - std::memcpy(, id_, sizeof(size_t)); - return result; +// This code is from https://sites.google.com/site/murmurhash/ +// and is public domain. 
+uint64_t MurmurHash64A(const void* key, int len, unsigned int seed) { + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + + uint64_t h = seed ^ (len * m); + + const uint64_t* data = reinterpret_cast(key); + const uint64_t* end = data + (len / 8); + + while (data != end) { +uint64_t k = *data++; + +k *= m; +k ^= k >> r; +k *= m; + +h ^= k; +h *= m; + } + + const unsigned char* data2 = reinterpret_cast(data); + + switch (len & 7) { +case 7: + h ^= uint64_t(data2[6]) << 48; +case 6: + h ^= uint64_t(data2[5]) << 40; +case 5: + h ^= uint64_t(data2[4]) << 32; +case 4: + h ^= uint64_t(data2[3]) << 24; +case 3: + h ^= uint64_t(data2[2]) << 16; +case 2: + h ^= uint64_t(data2[1]) << 8; +case 1: + h ^= uint64_t(data2[0]); + h *= m; + } + + h ^= h >> r; + h *= m; + h ^= h >> r; + + return h; } +size_t UniqueID::hash() const { return MurmurHash64A(_[0], kUniqueIDSize, 0); } + bool UniqueID::operator==(const UniqueID& rhs) const { return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; }
[arrow] branch master updated: ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 037c156 ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma 037c156 is described below commit 037c156f16332904d4cac8974195aea2861f6c29 Author: Yuhong Guo AuthorDate: Wed Jul 4 16:26:05 2018 -0700 ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma This pull request includes following changes: 1. Add a RPC to delete a list of objects, which could be used in Garbage Collection in Ray according to the [Garbage Collection Discussion](https://github.com/ray-project/ray/issues/2242#issuecomment-398450187).. 2. Fix a bug in ReadDeleteRequest, change the wrong message type of PlasmaReleaseReply to PlasmaDeleteRequest. Author: Yuhong Guo Closes #2174 from guoyuhong/addDeleteObjs and squashes the following commits: 793bc5ba Change according to comment. c8562f64 Trigger build. 5f3aafe4 Change back Delete call. 7db64131 Add back Delete function for single obj for client d67c830e Fix warning failure. a1655fb1 Delete a comma. bc2efc28 Change according to comment 3a3b7862 Fix warning error. 
c67fb4de Fix a bug in ReadDeleteRequest d3188fb9 Add Delete Object list RPC to support garbage colletion in Ray --- cpp/src/plasma/client.cc | 33 --- cpp/src/plasma/client.h| 8 + cpp/src/plasma/format/plasma.fbs | 10 -- cpp/src/plasma/protocol.cc | 53 +++--- cpp/src/plasma/protocol.h | 12 --- cpp/src/plasma/store.cc| 11 +-- cpp/src/plasma/test/client_tests.cc| 33 +-- cpp/src/plasma/test/serialization_tests.cc | 24 +- 8 files changed, 137 insertions(+), 47 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 8e66cf4..95da089 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -182,7 +182,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this& object_ids); Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted); @@ -808,21 +808,26 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { return ReadAbortReply(buffer.data(), buffer.size(), ); } -Status PlasmaClient::Impl::Delete(const ObjectID& object_id) { +Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { RETURN_NOT_OK(FlushReleaseHistory()); - // If the object is in used, client can't send the remove message. - if (objects_in_use_.count(object_id) > 0) { -return Status::UnknownError("PlasmaClient::Object is in use."); - } else { -// If we don't already have a reference to the object, we can try to remove the object -RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id)); + std::vector not_in_use_ids; + for (auto& object_id : object_ids) { +// If the object is in used, skip it. 
+if (objects_in_use_.count(object_id) == 0) { + not_in_use_ids.push_back(object_id); +} + } + if (not_in_use_ids.size() > 0) { +RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, )); -ObjectID object_id2; DCHECK_GT(buffer.size(), 0); -RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), _id2)); -return Status::OK(); +std::vector error_codes; +not_in_use_ids.clear(); +RETURN_NOT_OK( +ReadDeleteReply(buffer.data(), buffer.size(), _in_use_ids, _codes)); } + return Status::OK(); } Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { @@ -1036,7 +1041,11 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(obje Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); } Status PlasmaClient::Delete(const ObjectID& object_id) { - return impl_->Delete(object_id); + return impl_->Delete(std::vector{object_id}); +} + +Status PlasmaClient::Delete(const std::vector& object_ids) { + return impl_->Delete(object_ids); } Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 5501488..fe00193 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -179,6 +179,14 @@ class ARROW_EXPORT PlasmaClient { /// \return The return status. Status Delete(const ObjectID& object_id); + /// Delete a list of objects from the object store. This currently assumes that the + /// object is present, has been sealed and not used by another client. Othe
[arrow] branch master updated: ARROW-2657: [Python] Import TensorFlow python extension before pyarrow to avoid segfault
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new e7aaf7b ARROW-2657: [Python] Import TensorFlow python extension before pyarrow to avoid segfault e7aaf7b is described below commit e7aaf7bf3d3e326b5fe58d20f8fc45b5cec01cac Author: Philipp Moritz AuthorDate: Wed Jul 4 13:08:34 2018 -0700 ARROW-2657: [Python] Import TensorFlow python extension before pyarrow to avoid segfault Author: Philipp Moritz Author: Wes McKinney Closes #2210 from pcmoritz/try-fixing-tf-crash and squashes the following commits: 92aef7a9 Use compat namespace to avoid adding import_tensorflow_extension to pyarrow.* namespace 2ca3de9f clarify comment 70f3bcaa workaround for virtualenv bbf6cfc3 load TensorFlow for sure if it exists c18cccb6 address comments ac38837d add clarification comment 1135b51b silence tensorflow installation 57ca5fc2 install conda to test wheels outside of docker 02cb5005 tests if the wheels work with tensorflow 7835fba1 check for linux 1c9628f1 try fixing tensorflow crash --- ci/travis_script_manylinux.sh | 18 ++ python/pyarrow/__init__.py| 7 +++ python/pyarrow/compat.py | 42 ++ 3 files changed, 67 insertions(+) diff --git a/ci/travis_script_manylinux.sh b/ci/travis_script_manylinux.sh index 14e6404..9ea15e7 100755 --- a/ci/travis_script_manylinux.sh +++ b/ci/travis_script_manylinux.sh @@ -24,3 +24,21 @@ pushd python/manylinux1 git clone ../../ arrow docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 . 
docker run --shm-size=2g --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh + +# Testing for https://issues.apache.org/jira/browse/ARROW-2657 +# These tests cannot be run inside of the docker container, since TensorFlow +# does not run on manylinux1 + +source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh + +source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh + +PYTHON_VERSION=3.6 +CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION + +conda create -y -q -p $CONDA_ENV_DIR python=$PYTHON_VERSION +source activate $CONDA_ENV_DIR + +pip install -q tensorflow +pip install "dist/`ls dist/ | grep cp36`" +python -c "import pyarrow; import tensorflow" diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 20254c2..dc045e6 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -44,6 +44,13 @@ except DistributionNotFound: __version__ = None +import pyarrow.compat as compat + + +# Workaround for https://issues.apache.org/jira/browse/ARROW-2657 +compat.import_tensorflow_extension() + + from pyarrow.lib import cpu_count, set_cpu_count from pyarrow.lib import (null, bool_, int8, int16, int32, int64, diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py index 1b19ca0..1fcaf4c 100644 --- a/python/pyarrow/compat.py +++ b/python/pyarrow/compat.py @@ -160,6 +160,48 @@ def encode_file_path(path): # will convert utf8 to utf16 return encoded_path +def import_tensorflow_extension(): +""" +Load the TensorFlow extension if it exists. + +This is used to load the TensorFlow extension before +pyarrow.lib. If we don't do this there are symbol clashes +between TensorFlow's use of threading and our global +thread pool, see also +https://issues.apache.org/jira/browse/ARROW-2657 and +https://github.com/apache/arrow/pull/2096. 
+""" +import os +import site +tensorflow_loaded = False + +# Try to load the tensorflow extension directly +# This is a performance optimization, tensorflow will always be +# loaded via the "import tensorflow" statement below if this +# doesn't succeed. +try: +site_paths = site.getsitepackages() + [site.getusersitepackages()] +except AttributeError: +# Workaround for https://github.com/pypa/virtualenv/issues/228, +# this happends in some configurations of virtualenv +site_paths = [os.path.dirname(site.__file__) + '/site-packages'] +for site_path in site_paths: +ext = os.path.join(site_path, "tensorflow", + "libtensorflow_framework.so") +if os.path.exists(ext): +import ctypes +ctypes.CDLL(ext) +tensorflow_loaded = True +break + +# If the above failed, try to load tensorflow the normal way +# (this is more expensive) +if not tensorflow_loaded: +try: +import tensorflow +except ImportError: +pass + integer_types = six.integer_types + (np.integer,)
[arrow] branch master updated: ARROW-2782: [Plasma] xfail plasma hugepage test
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 18f31e2 ARROW-2782: [Plasma] xfail plasma hugepage test 18f31e2 is described below commit 18f31e20b411bab6155107ba65bf67db33af1ce8 Author: Philipp Moritz AuthorDate: Mon Jul 2 13:54:24 2018 -0700 ARROW-2782: [Plasma] xfail plasma hugepage test This should be merged as a stop gap for ARROW-2782 so we can make sure to not disrupt arrow development till we have a real fix here (it's super annoying for others to have failing tests). Author: Philipp Moritz Closes #2207 from pcmoritz/xfail-hugepage-test and squashes the following commits: b4907ee3 fix linting cb86b85d xfail plasma hugepage test --- python/pyarrow/tests/test_plasma.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 8a7f4ca..99a5d5b 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -761,6 +761,7 @@ def test_object_id_equality_operators(): assert oid1 != 'foo' +@pytest.mark.xfail(reason="often fails on travis") @pytest.mark.skipif(not os.path.exists("/mnt/hugepages"), reason="requires hugepage support") def test_use_huge_pages():
[arrow] branch master updated: ARROW-2758: [Plasma] Use Scope enum in Plasma
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new d38e862 ARROW-2758: [Plasma] Use Scope enum in Plasma d38e862 is described below commit d38e86253668773005511b91e35b87aa7d36c29f Author: Yuhong Guo AuthorDate: Wed Jun 27 13:34:46 2018 -0700 ARROW-2758: [Plasma] Use Scope enum in Plasma In this PR there are following changes: 1. add option "--scoped-enum" to Flat Buffer Compiler. 2. change the old-styled c++ enum to c++11 style. This change will unify the enum style in both Plasma and Ray. See Ray [Issue2121](https://github.com/ray-project/ray/issues/2121) and Ray [PR2194](https://github.com/ray-project/ray/pull/2194). Author: Yuhong Guo Closes #2179 from guoyuhong/scopeEnum and squashes the following commits: fbe70105 Fix Python and Java Building failure. 397e8b1b Fix flatc failure. e96c99b3 Fix the comment mismatch. 
8ab8716d Change old c++ enum style to c++11 style and fix lint 87cc5918 Add --scoped-enums to FlatBuffer Compiler --- cpp/src/plasma/CMakeLists.txt | 2 +- cpp/src/plasma/client.cc | 36 - cpp/src/plasma/client.h| 6 +- cpp/src/plasma/common.cc | 16 ++-- cpp/src/plasma/common.h| 34 cpp/src/plasma/format/plasma.fbs | 4 +- cpp/src/plasma/io.cc | 12 +-- cpp/src/plasma/io.h| 7 +- .../org_apache_arrow_plasma_PlasmaClientJNI.cc | 2 +- cpp/src/plasma/plasma.h| 4 +- cpp/src/plasma/protocol.cc | 90 +++--- cpp/src/plasma/protocol.h | 12 +-- cpp/src/plasma/store.cc| 82 ++-- cpp/src/plasma/store.h | 21 ++--- cpp/src/plasma/test/serialization_tests.cc | 65 +--- python/pyarrow/_plasma.pyx | 4 +- 16 files changed, 208 insertions(+), 189 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 8797d96..744f9ad 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -57,7 +57,7 @@ add_custom_command( # flatbuffers message Message, which can be used to store deserialized # messages in data structures. This is currently used for ObjectInfo for # example. 
- COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${PLASMA_FBS_SRC} --gen-object-api + COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${PLASMA_FBS_SRC} --gen-object-api --scoped-enums DEPENDS ${PLASMA_FBS_SRC} COMMENT "Running flatc compiler on ${PLASMA_FBS_SRC}" VERBATIM) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 30d2e43..8e66cf4 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -366,7 +366,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, RETURN_NOT_OK( SendCreateRequest(store_conn_, object_id, data_size, metadata_size, device_num)); std::vector buffer; - RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, )); + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, )); ObjectID id; PlasmaObject object; int store_fd; @@ -475,7 +475,7 @@ Status PlasmaClient::Impl::GetBuffers( // client, so we need to send a request to the plasma store. RETURN_NOT_OK(SendGetRequest(store_conn_, _ids[0], num_objects, timeout_ms)); std::vector buffer; - RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, )); + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, )); std::vector received_object_ids(num_objects); std::vector object_data(num_objects); PlasmaObject* object; @@ -677,7 +677,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) // to see if we have the object. 
RETURN_NOT_OK(SendContainsRequest(store_conn_, object_id)); std::vector buffer; -RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, )); +RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaContainsReply, )); ObjectID object_id2; DCHECK_GT(buffer.size(), 0); RETURN_NOT_OK( @@ -803,7 +803,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { std::vector buffer; ObjectID id; - int64_t type; + MessageType type; RETURN_NOT_OK(ReadMessage(store_conn_, , )); return ReadAbortReply(buffer.data(), buffer.size(), ); } @@ -817,7 +817,7 @@ Status PlasmaClient::Impl::Delete(const ObjectID& object_id) { // If we don't alread
[arrow] branch master updated: ARROW-1163: [Java] Java client support for plasma
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new ce23c06 ARROW-1163: [Java] Java client support for plasma ce23c06 is described below commit ce23c06469de9cf0c3e38e35cdb8d135f341b964 Author: salah-man <salah.li...@antfin.com> AuthorDate: Thu May 24 23:57:24 2018 -0700 ARROW-1163: [Java] Java client support for plasma -- This commit includes the support of java client for plasma, which is part of the java worker support of Ray. In addition to some minor changes in build system, it consists of the following modules: - java/plasma: java client support for plasma - cpp/src/plasma/lib/java: JNI support for plasma client - java/plasma/Readme.md: Readme.md for help document - java/plasma/test.sh: test.sh for test Author: salah-man <salah.li...@antfin.com> Author: Philipp Moritz <pcmor...@gmail.com> Author: salah <salah.li...@antfin.com> Closes #2065 from salah-man/java_lib and squashes the following commits: be067e83 add the test case of contains and hash of plasma java client b7f5e94f fix ci lint error 80e9580f small change for feedback b745465c fix the problems after the feedback 52b26e74 Google C++ convention 2f3f129b cleanups 1c327a79 add ci of plasma java client 14393f5f fix the ci problem 3e9ba0db Changes for the feedback from pr -- It consists of the following changes: add java/plasma/README.md for help document add java/plasma/test.sh for plasma java client test case fix some ci error delete the useless parameters of connect api from jni 9da40d02 Delete the ObjectId.java and ObjectBuffer.java, change the input and output of plasma java client api from custem type to byte 87ba3b98 Java client support for plasma -- This commit includes the support of java client for plasma, which is part of the java worker support of Ray. 
In addition to some minor changes in build system, it consists of the following modules: - java/plasma: java client support for plasma - cpp/src/plasma/lib/java: JNI support for plasma client --- .travis.yml| 3 + ci/travis_before_script_cpp.sh | 4 + ci/travis_script_plasma_java_client.sh | 30 +++ cpp/CMakeLists.txt | 4 + cpp/src/plasma/CMakeLists.txt | 35 +++ .../org_apache_arrow_plasma_PlasmaClientJNI.cc | 295 + .../java/org_apache_arrow_plasma_PlasmaClientJNI.h | 123 + java/plasma/README.md | 39 +++ java/plasma/pom.xml| 33 +++ .../org/apache/arrow/plasma/ObjectStoreLink.java | 119 + .../java/org/apache/arrow/plasma/PlasmaClient.java | 148 +++ .../org/apache/arrow/plasma/PlasmaClientJNI.java | 51 .../org/apache/arrow/plasma/PlasmaClientTest.java | 173 java/plasma/test.sh| 56 java/pom.xml | 1 + 15 files changed, 1114 insertions(+) diff --git a/.travis.yml b/.travis.yml index 7918eb8..c5b71e4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -129,6 +129,8 @@ matrix: env: ARROW_TEST_GROUP=integration jdk: openjdk8 env: +- ARROW_TRAVIS_PLASMA=1 +- ARROW_TRAVIS_PLASMA_JAVA_CLIENT=1 - CC="clang-6.0" - CXX="clang++-6.0" before_script: @@ -140,6 +142,7 @@ matrix: - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh +- $TRAVIS_BUILD_DIR/ci/travis_script_plasma_java_client.sh # NodeJS - language: node_js os: linux diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 379007d..541d5fd 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -74,6 +74,10 @@ if [ $ARROW_TRAVIS_PLASMA == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA=ON" fi +if [ $ARROW_TRAVIS_PLASMA_JAVA_CLIENT == "1" ]; then + CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA_JAVA_CLIENT=ON" +fi + if [ $ARROW_TRAVIS_ORC == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_ORC=ON" fi diff --git a/ci/travis_script_plasma_java_client.sh 
b/ci/travis_script_plasma_java_client.sh new file mode 100755 index 000..628796d --- /dev/null +++ b/ci/travis_script_plasma_java_client.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE
[arrow] branch master updated: ARROW-2629: [Plasma] Iterator invalidation for pending_notifications_
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 1d9d893 ARROW-2629: [Plasma] Iterator invalidation for pending_notifications_ 1d9d893 is described below commit 1d9d8939efe54a640b46fcbe8e14e5b347c7a422 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Tue May 22 20:45:40 2018 -0700 ARROW-2629: [Plasma] Iterator invalidation for pending_notifications_ Author: Philipp Moritz <pcmor...@gmail.com> Closes #2073 from pcmoritz/fix-iterator and squashes the following commits: bec37f0d linting 820f9d6d update 6b80ddcb docs 66a927c8 fix iterator invalidation --- cpp/src/plasma/store.cc | 30 +++--- cpp/src/plasma/store.h | 6 -- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 061b7ad..171f062 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -568,12 +568,17 @@ void PlasmaStore::disconnect_client(int client_fd) { /// Send notifications about sealed objects to the subscribers. This is called /// in seal_object. If the socket's send buffer is full, the notification will -/// be -/// buffered, and this will be called again when the send buffer has room. +/// be buffered, and this will be called again when the send buffer has room. +/// Since we call erase on pending_notifications_, all iterators get +/// invalidated, which is why we return a valid iterator to the next client to +/// be used in push_notification. /// -/// @param client_fd The client to send the notification to. -void PlasmaStore::send_notifications(int client_fd) { - auto& notifications = pending_notifications_[client_fd].object_notifications; +/// @param it Iterator that points to the client to send the notification to. +/// @return Iterator pointing to the next client. 
+PlasmaStore::NotificationMap::iterator PlasmaStore::send_notifications( +PlasmaStore::NotificationMap::iterator it) { + int client_fd = it->first; + auto& notifications = it->second.object_notifications; int num_processed = 0; bool closed = false; @@ -599,7 +604,7 @@ void PlasmaStore::send_notifications(int client_fd) { // TODO(pcm): Introduce status codes and check in case the file descriptor // is added twice. loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) { -send_notifications(client_fd); +send_notifications(pending_notifications_.find(client_fd)); }); break; } else { @@ -622,15 +627,18 @@ void PlasmaStore::send_notifications(int client_fd) { // Stop sending notifications if the pipe was broken. if (closed) { close(client_fd); -pending_notifications_.erase(client_fd); +return pending_notifications_.erase(it); + } else { +return ++it; } } void PlasmaStore::push_notification(ObjectInfoT* object_info) { - for (auto& element : pending_notifications_) { + auto it = pending_notifications_.begin(); + while (it != pending_notifications_.end()) { auto notification = create_object_info_buffer(object_info); -element.second.object_notifications.emplace_back(std::move(notification)); -send_notifications(element.first); +it->second.object_notifications.emplace_back(std::move(notification)); +it = send_notifications(it); } } @@ -654,7 +662,7 @@ void PlasmaStore::subscribe_to_updates(Client* client) { for (const auto& entry : store_info_.objects) { push_notification(>info); } - send_notifications(fd); + send_notifications(pending_notifications_.find(fd)); } Status PlasmaStore::process_message(Client* client) { diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 64c5249..9b3850b 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -54,6 +54,8 @@ struct Client { class PlasmaStore { public: + using NotificationMap = std::unordered_map<int, NotificationQueue>; + // TODO: PascalCase PlasmaStore methods. 
PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, bool hugetlbfs_enabled); @@ -161,7 +163,7 @@ class PlasmaStore { /// @param client_fd The client file descriptor that is disconnected. void disconnect_client(int client_fd); - void send_notifications(int client_fd); + NotificationMap::iterator send_notifications(NotificationMap::iterator it); Status process_message(Client* client); @@ -194,7 +196,7 @@ class PlasmaStore { /// descriptor to an array of object_ids to send to that client. /// TODO(pcm): Consider putting this into the Client data structure and /// reorganize the code slightly. - std::unordered_map<int, NotificationQueue> pending_notifications_; + NotificationMap p
[arrow] branch master updated: ARROW-2597: [Plasma] remove UniqueIDHasher
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new f319bca ARROW-2597: [Plasma] remove UniqueIDHasher f319bca is described below commit f319bcad7b3e5ed45b92f2489a6bb2e31ef44005 Author: Zhijun Fu <pingfu@antfin.com> AuthorDate: Sat May 19 12:10:56 2018 -0700 ARROW-2597: [Plasma] remove UniqueIDHasher Replace UniqueIDHasher with std::hash so that STL containers with ObjectID doesn't need to specify the compare function. This has already been done for Ray, this change applies it to Plasma. Author: Zhijun Fu <pingfu@antfin.com> Author: Zhijun Fu <zhijun...@outlook.com> Closes #2059 from zhijunfu/remove-UniqueIDHasher and squashes the following commits: 2498635a resolve review comments: remove const version of hash() d5b51690 remove UniqueIDHasher --- cpp/src/plasma/client.cc | 5 ++--- cpp/src/plasma/common.cc | 6 ++ cpp/src/plasma/common.h| 17 - cpp/src/plasma/eviction_policy.h | 2 +- cpp/src/plasma/plasma.h| 4 ++-- cpp/src/plasma/protocol.cc | 9 - cpp/src/plasma/protocol.h | 9 - cpp/src/plasma/store.cc| 5 ++--- cpp/src/plasma/store.h | 5 ++--- cpp/src/plasma/test/serialization_tests.cc | 2 +- 10 files changed, 32 insertions(+), 32 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index bfb291c..43e27e0 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -93,7 +93,7 @@ struct GpuProcessHandle { // This is necessary as IPC handles can only be mapped once per process. // Thus if multiple clients in the same process get the same gpu object, // they need to access the same mapped CudaBuffer. 
-static std::unordered_map<ObjectID, GpuProcessHandle*, UniqueIDHasher> gpu_object_map; +static std::unordered_map<ObjectID, GpuProcessHandle*> gpu_object_map; static std::mutex gpu_mutex; #endif @@ -247,8 +247,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this mmap_table_; /// A hash table of the object IDs that are currently being used by this /// client. - std::unordered_map<ObjectID, std::unique_ptr, UniqueIDHasher> - objects_in_use_; + std::unordered_map<ObjectID, std::unique_ptr> objects_in_use_; /// Object IDs of the last few release calls. This is a deque and /// is used to delay releasing objects to see if they can be reused by /// subsequent tasks so we do not unneccessarily invalidate cpu caches. diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index be3fc74..7ac5413 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -68,6 +68,12 @@ std::string UniqueID::hex() const { return result; } +size_t UniqueID::hash() const { + size_t result; + std::memcpy(, id_, sizeof(size_t)); + return result; +} + bool UniqueID::operator==(const UniqueID& rhs) const { return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; } diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index cc67ffe..7dbcf80 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -44,6 +44,7 @@ class ARROW_EXPORT UniqueID { uint8_t* mutable_data(); std::string binary() const; std::string hex() const; + size_t hash() const; private: uint8_t id_[kUniqueIDSize]; @@ -51,15 +52,6 @@ class ARROW_EXPORT UniqueID { static_assert(std::is_pod::value, "UniqueID must be plain old data"); -struct UniqueIDHasher { - // ObjectID hashing function. 
- size_t operator()(const UniqueID& id) const { -size_t result; -std::memcpy(, id.data(), sizeof(size_t)); -return result; - } -}; - typedef UniqueID ObjectID; arrow::Status plasma_error_status(int plasma_error); @@ -104,4 +96,11 @@ struct PlasmaStoreInfo; extern const PlasmaStoreInfo* plasma_config; } // namespace plasma +namespace std { +template <> +struct hash<::plasma::UniqueID> { + size_t operator()(const ::plasma::UniqueID& id) const { return id.hash(); } +}; +} // namespace std + #endif // PLASMA_COMMON_H diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index b076309..d13933e 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -52,7 +52,7 @@ class LRUCache { ItemList item_list_; /// A hash table mapping the object ID of an object in the cache to its /// location in the doubly linked list item_list_. - std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_; + std::unordered_map<ObjectID, ItemList::iterator> item_map_; }; /// The eviction polic
[arrow] branch master updated: ARROW-2612: [Plasma] Fix deprecated PLASMA_DEFAULT_RELEASE_DELAY
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new bd25a65 ARROW-2612: [Plasma] Fix deprecated PLASMA_DEFAULT_RELEASE_DELAY bd25a65 is described below commit bd25a655b2a3759db02def9ca5f8afb20f59 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Fri May 18 19:25:10 2018 -0700 ARROW-2612: [Plasma] Fix deprecated PLASMA_DEFAULT_RELEASE_DELAY Author: Philipp Moritz <pcmor...@gmail.com> Closes #2063 from pcmoritz/fix-plasma-deprecated-const and squashes the following commits: b6e92f67 fix test 5e1d82b7 add test 469b59a6 fix deprecated PLASMA_DEFAULT_RELEASE_DELAY --- cpp/src/plasma/client.h | 2 +- cpp/src/plasma/test/client_tests.cc | 7 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 4e1ff4a..c3b5548 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -36,7 +36,7 @@ namespace plasma { ARROW_DEPRECATED("PLASMA_DEFAULT_RELEASE_DELAY is deprecated") constexpr int64_t kDeprecatedPlasmaDefaultReleaseDelay = 64; -#define PLASMA_DEFAULT_RELEASE_DELAY kDeprecatedPlasmaDefaultReleaseDelay; +#define PLASMA_DEFAULT_RELEASE_DELAY plasma::kDeprecatedPlasmaDefaultReleaseDelay /// We keep a queue of unreleased objects cached in the client until we start /// sending release requests to the store. 
This is to avoid frequently mapping diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 23d2c2b..fa7de04 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -430,6 +430,13 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { } } +#ifndef ARROW_NO_DEPRECATED_API +TEST_F(TestPlasmaStore, DeprecatedApiTest) { + int64_t default_delay = PLASMA_DEFAULT_RELEASE_DELAY; + ARROW_CHECK(default_delay == plasma::kPlasmaDefaultReleaseDelay); +} +#endif // ARROW_NO_DEPRECATED_API + #ifdef PLASMA_GPU using arrow::gpu::CudaBuffer; using arrow::gpu::CudaBufferReader; -- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2595: [Plasma] Use map.find instead of operator[] to avoid producing garbage data
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new df20683 ARROW-2595: [Plasma] Use map.find instead of operator[] to avoid producing garbage data df20683 is described below commit df2068381c13a4c805ed058b5e704168a31b00c8 Author: senlin.zsl <senlin@antfin.com> AuthorDate: Wed May 16 21:07:15 2018 -0700 ARROW-2595: [Plasma] Use map.find instead of operator[] to avoid producing garbage data - Problem * Using object_get_requests_[object_id] will produce a lot of garbage data in PlasmaStore::return_from_get. During the measurement process, we found that there was a lot of memory growth in this point. - Solution * Use iterator instead of operator [] Author: senlin.zsl <senlin@antfin.com> Closes #2056 from wumuzi520/dev_slz and squashes the following commits: ccaab502 Use map.find instead of operator to avoid producing garbage data --- cpp/src/plasma/store.cc | 13 - 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 95fbf49..310f0cb 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -300,11 +300,14 @@ void PlasmaStore::return_from_get(GetRequest* get_req) { // tables if it is present there. It should only be present there if the get // request timed out. for (ObjectID& object_id : get_req->object_ids) { -auto& get_requests = object_get_requests_[object_id]; -// Erase get_req from the vector. -auto it = std::find(get_requests.begin(), get_requests.end(), get_req); -if (it != get_requests.end()) { - get_requests.erase(it); +auto object_request_iter = object_get_requests_.find(object_id); +if (object_request_iter != object_get_requests_.end()) { + auto& get_requests = object_request_iter->second; + // Erase get_req from the vector. 
+ auto it = std::find(get_requests.begin(), get_requests.end(), get_req); + if (it != get_requests.end()) { +get_requests.erase(it); + } } } // Remove the get request. -- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2577: [Plasma] Add asv benchmarks for plasma
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 75acaba ARROW-2577: [Plasma] Add asv benchmarks for plasma 75acaba is described below commit 75acaba67d7e2ead5ccc51e42396e916b44db3f9 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Mon May 14 16:56:58 2018 -0700 ARROW-2577: [Plasma] Add asv benchmarks for plasma This adds some initial ASV benchmarks for plasma: - Put latency - Get latency - Put throughput for 1KB, 10KB, 100KB, 1MB, 10MB, 100MB It also includes some minor code restructuring to expose the start_plasma_store method. Author: Philipp Moritz <pcmor...@gmail.com> Closes #2038 from pcmoritz/plasma-asv and squashes the following commits: 34a06845 measure wallclock time instead of process cpu time c89256f7 parametrize tests 3567ddc7 fix windows build eca17675 build plasma in asv 47671b34 fix 1261177e fix linting errors 7d4d6854 Add asv benchmarks for plasma --- python/CMakeLists.txt | 2 +- python/asv-build.sh| 2 + python/benchmarks/plasma.py| 68 + python/pyarrow/{plasma.pyx => _plasma.pyx} | 0 python/pyarrow/plasma.py | 97 ++ python/pyarrow/tests/test_plasma.py| 88 +++ python/setup.py| 4 +- 7 files changed, 179 insertions(+), 82 deletions(-) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index fcc1d3c..99194c2 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -404,7 +404,7 @@ if (PYARROW_BUILD_PLASMA) ${LINK_LIBS} libplasma_shared) - set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma) + set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _plasma) file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY}) endif() diff --git a/python/asv-build.sh b/python/asv-build.sh index 31e56ed..2bbc94b 100755 --- a/python/asv-build.sh +++ b/python/asv-build.sh @@ -37,6 +37,7 @@ cmake -GNinja \ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ 
-DARROW_CXXFLAGS=$CXXFLAGS \ -DARROW_PYTHON=ON \ + -DARROW_PLASMA=ON \ -DARROW_BUILD_TESTS=OFF \ .. cmake --build . --target install @@ -47,6 +48,7 @@ popd # Build pyarrow wrappers export SETUPTOOLS_SCM_PRETEND_VERSION=0.0.1 export PYARROW_BUILD_TYPE=release +export PYARROW_WITH_PLASMA=1 python setup.py clean find pyarrow -name "*.so" -delete diff --git a/python/benchmarks/plasma.py b/python/benchmarks/plasma.py new file mode 100644 index 000..8a607a3 --- /dev/null +++ b/python/benchmarks/plasma.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np +import timeit + +import pyarrow as pa +import pyarrow.plasma as plasma + +from . 
import common + + +class SimplePlasmaThroughput(object): +"""Benchmark plasma store throughput with a single client.""" + +params = [1000, 10, 1000] + +timer = timeit.default_timer + +def setup(self, size): +self.plasma_store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9) +plasma_store_name, p = self.plasma_store_ctx.__enter__() +self.plasma_client = plasma.connect(plasma_store_name, "", 64) + +self.data = np.random.randn(size // 8) + +def teardown(self, size): +self.plasma_store_ctx.__exit__(None, None, None) + +def time_plasma_put_data(self, size): +self.plasma_client.put(self.data) + + +class SimplePlasmaLatency(object): +"""Benchmark plasma store latency with a single client.""" + +timer = timeit.default_timer + +def setup(self): +self.plasma_store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9) +plasma_store_name, p = self.plasma_store_ctx.__enter__() +self.plasma_client = plasma.connect(plasma_store_name, "&qu
[arrow] branch master updated: ARROW-2578: [Plasma] Use mersenne twister to generate random number
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 4b8511f ARROW-2578: [Plasma] Use mersenne twister to generate random number 4b8511f is described below commit 4b8511f92d24dc9d5ace150218a2902c8e32dc76 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Mon May 14 01:02:35 2018 -0700 ARROW-2578: [Plasma] Use mersenne twister to generate random number This gets rid of the std::random_device, which is slow and causes errors in valgrind. Instead we use the std::mt19937 Mersenne Twister. Author: Philipp Moritz <pcmor...@gmail.com> Closes #2039 from pcmoritz/new-rng and squashes the following commits: 21d0e3f7 fixes be4bb84d fix beb5bab8 update 83740b5c update f60bd99c more valgrind fixes 62d412f3 fix on older versions of macOS 841a67f0 fix linting cd95cf15 use mersenne twister to generate random number --- cpp/src/plasma/common.cc| 12 ++-- cpp/src/plasma/test/client_tests.cc | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index 2de06d5..be3fc74 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -17,6 +17,8 @@ #include "plasma/common.h" +#include +#include #include #include "plasma/plasma_generated.h" @@ -28,9 +30,15 @@ using arrow::Status; UniqueID UniqueID::from_random() { UniqueID id; uint8_t* data = id.mutable_data(); - std::random_device engine; + // NOTE(pcm): The right way to do this is to have one std::mt19937 per + // thread (using the thread_local keyword), but that's not supported on + // older versions of macOS (see https://stackoverflow.com/a/29929949) + static std::mutex mutex; + std::lock_guard lock(mutex); + static std::mt19937 generator; + std::uniform_int_distribution dist(0, std::numeric_limits::max()); for (int i = 0; i < kUniqueIDSize; i++) { -data[i] = 
static_cast(engine()); +data[i] = static_cast(dist(generator)); } return id; } diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index b80862d..23d2c2b 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -51,8 +51,8 @@ class TestPlasmaStore : public ::testing::Test { // TODO(pcm): At the moment, stdout of the test gets mixed up with // stdout of the object store. Consider changing that. void SetUp() { -std::mt19937 rng; -rng.seed(std::random_device()()); +uint64_t seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); +std::mt19937 rng(static_cast(seed)); std::string store_index = std::to_string(rng()); store_socket_name_ = "/tmp/store" + store_index; -- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2565: [Plasma] new subscriber cannot receive notifications about existing objects
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new bb47c36 ARROW-2565: [Plasma] new subscriber cannot receive notifications about existing objects bb47c36 is described below commit bb47c36571ab528ae582fda2e0c1037c01f37ffe Author: Zhijun Fu <zhijun...@outlook.com> AuthorDate: Thu May 10 12:17:54 2018 -0700 ARROW-2565: [Plasma] new subscriber cannot receive notifications about existing objects When a client subscribes to plasma store, we need to add its file descriptor to pending_notifications_ map, so that push_notifications() can find the new client and push notifications about existing objects to it. Also added an unit test case to cover this. @pcmoritz may you kindly help to take a look please? thanks:) Author: Zhijun Fu <zhijun...@outlook.com> Closes #2022 from zhijunfu/refactor-code and squashes the following commits: 398354ef Fix issue that new subscriber can't receive notifications about existing objects, and add unit test --- cpp/src/plasma/store.cc | 3 ++ cpp/src/plasma/test/client_tests.cc | 59 + 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 342d228..3c1d5c8 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -636,6 +636,9 @@ void PlasmaStore::subscribe_to_updates(Client* client) { return; } + // Add this fd to global map, which is needed for this client to receive notifications. + pending_notifications_[fd]; + // Push notifications to the new subscriber about existing objects. 
for (const auto& entry : store_info_.objects) { push_notification(>info); diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 547a749..b80862d 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -54,15 +54,15 @@ class TestPlasmaStore : public ::testing::Test { std::mt19937 rng; rng.seed(std::random_device()()); std::string store_index = std::to_string(rng()); +store_socket_name_ = "/tmp/store" + store_index; std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); -std::string plasma_command = plasma_directory + - "/plasma_store -m 10 -s /tmp/store" + - store_index + " 1> /dev/null 2> /dev/null &"; +std::string plasma_command = plasma_directory + "/plasma_store -m 10 -s " + + store_socket_name_ + " 1> /dev/null 2> /dev/null &"; system(plasma_command.c_str()); -ARROW_CHECK_OK(client_.Connect("/tmp/store" + store_index, "")); -ARROW_CHECK_OK(client2_.Connect("/tmp/store" + store_index, "")); +ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); +ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); } virtual void TearDown() { ARROW_CHECK_OK(client_.Disconnect()); @@ -91,11 +91,60 @@ class TestPlasmaStore : public ::testing::Test { ARROW_CHECK_OK(client.Release(object_id)); } + const std::string& GetStoreSocketName() const { return store_socket_name_; } + protected: PlasmaClient client_; PlasmaClient client2_; + std::string store_socket_name_; }; +TEST_F(TestPlasmaStore, NewSubscriberTest) { + PlasmaClient local_client, local_client2; + + ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); + ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, "")); + + ObjectID object_id = ObjectID::from_random(); + + // Test for the object being in local Plasma store. + // First create object. 
+ int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + std::shared_ptr data; + ARROW_CHECK_OK( + local_client.Create(object_id, data_size, metadata, metadata_size, )); + ARROW_CHECK_OK(local_client.Seal(object_id)); + + // Test that new subscriber client2 can receive notifications about existing objects. + int fd = -1; + ARROW_CHECK_OK(local_client2.Subscribe()); + ASSERT_GT(fd, 0); + + ObjectID object_id2 = ObjectID::from_random(); + int64_t data_size2 = 0; + int64_t metadata_size2 = 0; + ARROW_CHECK_OK( + local_client2.GetNotification(fd, _id2, _size2, _size2)); + ASSERT_EQ(object_id, object_id2); + ASSERT_EQ(data_size, data_size2); + ASSERT_EQ(metadata_size, metadata_size2); + + // Delete the object. + ARROW_CHECK_OK(local_client.Release(object_id)); + ARROW_CHECK_OK(local_client.Delete(object_id)); +
[arrow] branch master updated: ARROW-2552: [Plasma] Fix memory error
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new ac91d72 ARROW-2552: [Plasma] Fix memory error ac91d72 is described below commit ac91d72c02927bfa6cc4eb5a05eae19013b61e8a Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Wed May 9 21:03:36 2018 -0700 ARROW-2552: [Plasma] Fix memory error I reran Travis 12 times and the test failure didn't happen (the fix is in https://github.com/apache/arrow/pull/2019/commits/7e03ac833215263f65c9a64b14f229ed72132b4f). Author: Philipp Moritz <pcmor...@gmail.com> Closes #2019 from pcmoritz/fix-memcheck and squashes the following commits: 3540d139 bring back zero initialization 8dce7721 initialize struct 51d2588f fix 84b6ca75 cleanups 7e03ac83 fix memory error --- cpp/src/plasma/store.cc | 28 +--- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 6253afb..342d228 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -562,14 +562,14 @@ void PlasmaStore::disconnect_client(int client_fd) { /// /// @param client_fd The client to send the notification to. void PlasmaStore::send_notifications(int client_fd) { - auto it = pending_notifications_.find(client_fd); + auto& notifications = pending_notifications_[client_fd].object_notifications; int num_processed = 0; bool closed = false; // Loop over the array of pending notifications and send as many of them as // possible. - for (size_t i = 0; i < it->second.object_notifications.size(); ++i) { -auto& notification = it->second.object_notifications.at(i); + for (size_t i = 0; i < notifications.size(); ++i) { +auto& notification = notifications.at(i); // Decode the length, which is the first bytes of the message. 
int64_t size = *(reinterpret_cast<int64_t*>(notification.get())); @@ -601,20 +601,18 @@ void PlasmaStore::send_notifications(int client_fd) { num_processed += 1; } // Remove the sent notifications from the array. - it->second.object_notifications.erase( - it->second.object_notifications.begin(), - it->second.object_notifications.begin() + num_processed); + notifications.erase(notifications.begin(), notifications.begin() + num_processed); + + // If we have sent all notifications, remove the fd from the event loop. + if (notifications.empty()) { +loop_->RemoveFileEvent(client_fd); + } // Stop sending notifications if the pipe was broken. if (closed) { close(client_fd); pending_notifications_.erase(client_fd); } - - // If we have sent all notifications, remove the fd from the event loop. - if (it->second.object_notifications.empty()) { -loop_->RemoveFileEvent(client_fd); - } } void PlasmaStore::push_notification(ObjectInfoT* object_info) { @@ -622,8 +620,6 @@ void PlasmaStore::push_notification(ObjectInfoT* object_info) { auto notification = create_object_info_buffer(object_info); element.second.object_notifications.emplace_back(std::move(notification)); send_notifications(element.first); -// The notification gets freed in send_notifications when the notification -// is sent over the socket. } } @@ -640,12 +636,6 @@ void PlasmaStore::subscribe_to_updates(Client* client) { return; } - // Create a new array to buffer notifications that can't be sent to the - // subscriber yet because the socket send buffer is full. TODO(rkn): the queue - // never gets freed. - // TODO(pcm): Is the following neccessary? - pending_notifications_[fd]; - // Push notifications to the new subscriber about existing objects. for (const auto& entry : store_info_.objects) { push_notification(>info); -- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2540: [Plasma] Create constructors & destructors for ObjectTableEntry
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 0f07171 ARROW-2540: [Plasma] Create constructors & destructors for ObjectTableEntry 0f07171 is described below commit 0f0717148ec2a216b88f667f68d936d7f0810df8 Author: Zhijun Fu <zhijun...@outlook.com> AuthorDate: Mon May 7 12:30:47 2018 -0700 ARROW-2540: [Plasma] Create constructors & destructors for ObjectTableEntry This makes sure dlfree() is called for pointer field automatically Author: Zhijun Fu <zhijun...@outlook.com> Closes #1996 from zhijunfu/dlfree and squashes the following commits: 9363b4c5 Trigger travis build 9f56a850 re-trigger travis build 3246b843 fix format check a8c67b84 Create constructors & destructors for ObjectTableEntry --- cpp/src/plasma/plasma.cc | 11 +++ cpp/src/plasma/plasma.h | 4 cpp/src/plasma/store.cc | 3 --- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index e57049d..60b7c3f 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -26,6 +26,17 @@ namespace plasma { +extern "C" { +void dlfree(void* mem); +} + +ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {} + +ObjectTableEntry::~ObjectTableEntry() { + dlfree(pointer); + pointer = nullptr; +} + int warn_if_sigpipe(int status, int client_sock) { if (status >= 0) { return 0; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index a1d6e99..7a513ea 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -112,6 +112,10 @@ enum object_status { /// This type is used by the Plasma store. It is here because it is exposed to /// the eviction policy. struct ObjectTableEntry { + ObjectTableEntry(); + + ~ObjectTableEntry(); + /// Object id of this object. ObjectID object_id; /// Object info like size, creation time and owner. 
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c06ad6a..6253afb 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -446,7 +446,6 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) { return 0; } else { // The client requesting the abort is the creator. Free the object. -dlfree(entry->pointer); store_info_.objects.erase(object_id); return 1; } @@ -474,7 +473,6 @@ int PlasmaStore::delete_object(ObjectID& object_id) { eviction_policy_.remove_object(object_id); - dlfree(entry->pointer); store_info_.objects.erase(object_id); // Inform all subscribers that the object has been deleted. ObjectInfoT notification; @@ -497,7 +495,6 @@ void PlasmaStore::delete_objects(const std::vector& object_ids) { << "To delete an object it must have been sealed."; ARROW_CHECK(entry->clients.size() == 0) << "To delete an object, there must be no clients currently using it."; -dlfree(entry->pointer); store_info_.objects.erase(object_id); // Inform all subscribers that the object has been deleted. ObjectInfoT notification; -- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new b916c79 ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer b916c79 is described below commit b916c797833541299f9c3adc6e24d109359b7a6c Author: Zhijun Fu <zhijun...@outlook.com> AuthorDate: Fri May 4 12:26:29 2018 -0700 ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer use unique_ptr to replace raw pointer, so that allocated memory can be freed automatically Author: Zhijun Fu <zhijun...@outlook.com> Closes #1993 from zhijunfu/improve-code and squashes the following commits: 3c69ada9 fix format check 6bfebc2d fix lint b5b2fac2 fix build on travis-ci d4d64b02 Merge branch 'master' of https://github.com/zhijunfu/arrow into improve-code 84b7e371 Use unique_ptr instead of raw pointer --- cpp/src/plasma/client.cc | 5 ++--- cpp/src/plasma/io.cc | 8 cpp/src/plasma/io.h | 3 ++- cpp/src/plasma/plasma.cc | 9 + cpp/src/plasma/plasma.h | 2 +- cpp/src/plasma/store.cc | 32 +--- cpp/src/plasma/store.h | 2 +- 7 files changed, 24 insertions(+), 37 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 2a6f183..b4ee098 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -874,11 +874,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { - uint8_t* notification = read_message_async(fd); + auto notification = read_message_async(fd); if (notification == NULL) { return Status::IOError("Failed to read object notification from Plasma socket"); } - auto object_info = flatbuffers::GetRoot(notification); + auto object_info = flatbuffers::GetRoot(notification.get()); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); memcpy(object_id, 
object_info->object_id()->data(), sizeof(ObjectID)); if (object_info->is_deletion()) { @@ -888,7 +888,6 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, *data_size = object_info->data_size(); *metadata_size = object_info->metadata_size(); } - delete[] notification; return Status::OK(); } diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index 2cba897..4142bf9 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -18,6 +18,7 @@ #include "plasma/io.h" #include +#include #include #include "arrow/status.h" @@ -210,7 +211,7 @@ int AcceptClient(int socket_fd) { return client_fd; } -uint8_t* read_message_async(int sock) { +std::unique_ptr<uint8_t[]> read_message_async(int sock) { int64_t size; Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(), sizeof(int64_t)); if (!s.ok()) { @@ -219,10 +220,9 @@ uint8_t* read_message_async(int sock) { close(sock); return NULL; } - uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size)); - s = ReadBytes(sock, message, size); + auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]); + s = ReadBytes(sock, message.get(), size); if (!s.ok()) { -free(message); /* The other side has closed the socket. 
*/ ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; close(sock); diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h index 4beb134..8869c9b 100644 --- a/cpp/src/plasma/io.h +++ b/cpp/src/plasma/io.h @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -56,7 +57,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, int AcceptClient(int socket_fd); -uint8_t* read_message_async(int sock); +std::unique_ptr<uint8_t[]> read_message_async(int sock); } // namespace plasma diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index 0a019dd..e57049d 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -51,13 +51,14 @@ int warn_if_sigpipe(int status, int client_sock) { * @return The object info buffer. It is the caller's responsibility to free * this buffer with "delete" after it has been used. */ -uint8_t* create_object_info_buffer(ObjectInfoT* object_info) { +std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) { flatbuffers::FlatBufferBuilder fbb; auto message = CreateObjectInfo(fbb, object_info); fbb.Finish(message); - uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()]; - *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize(); - memcpy(notification + sizeof
[arrow] branch master updated: ARROW-2448: [Plasma] Reference counting for PlasmaClient::Impl
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 5f9cf9c ARROW-2448: [Plasma] Reference counting for PlasmaClient::Impl 5f9cf9c is described below commit 5f9cf9c96709f92e9ac4828cf3e106a165576ce7 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Wed Apr 25 11:41:25 2018 -0700 ARROW-2448: [Plasma] Reference counting for PlasmaClient::Impl This is a followup to https://github.com/apache/arrow/pull/1933 which does reference counting of the PlasmaClient held by PlasmaBuffers to avoid the segfault in ARROW-2448. Author: Philipp Moritz <pcmor...@gmail.com> Closes #1939 from pcmoritz/autoget-sharedptr and squashes the following commits: f1e6e8b7 fix test 2da395f9 update 13b12049 fixes b68b15e4 fix ObjectStatus 6d560db4 remove headers 94cdfd7c add test 6798ed01 Give shared_ptr of PlasmaClient::Impl to PlasmaBuffer --- cpp/src/plasma/client.cc| 10 -- cpp/src/plasma/format/plasma.fbs| 2 +- python/pyarrow/tests/test_plasma.py | 16 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 733217d..2a6f183 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -23,10 +23,8 @@ #include #endif -#include #include #include -#include #include #include #include @@ -107,7 +105,7 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { public: ~PlasmaBuffer(); - PlasmaBuffer(PlasmaClient::Impl* client, const ObjectID& object_id, + PlasmaBuffer(std::shared_ptr client, const ObjectID& object_id, const std::shared_ptr& buffer) : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) { if (buffer->is_mutable()) { @@ -116,7 +114,7 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { } private: - PlasmaClient::Impl* client_; + std::shared_ptr client_; ObjectID object_id_; }; @@ -155,7 +153,7 @@ 
struct ClientMmapTableEntry { int count; }; -class PlasmaClient::Impl { +class PlasmaClient::Impl : public std::enable_shared_from_this { public: Impl(); ~Impl(); @@ -558,7 +556,7 @@ Status PlasmaClient::Impl::Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* out) { const auto wrap_buffer = [=](const ObjectID& object_id, const std::shared_ptr& buffer) { -return std::make_shared(this, object_id, buffer); +return std::make_shared(shared_from_this(), object_id, buffer); }; const size_t num_objects = object_ids.size(); *out = std::vector(num_objects); diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs index 71e6f5c..0258cdf 100644 --- a/cpp/src/plasma/format/plasma.fbs +++ b/cpp/src/plasma/format/plasma.fbs @@ -216,7 +216,7 @@ table PlasmaStatusRequest { enum ObjectStatus:int { // Object is stored in the local Plasma Store. - Local = 1, + Local, // Object is stored on a remote Plasma store, and it is not stored on the // local Plasma Store. Remote, diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 32a4ed3..1589c1b 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -840,3 +840,19 @@ def test_use_huge_pages(): use_hugepages=True) as (plasma_store_name, p): plasma_client = plasma.connect(plasma_store_name, "", 64) create_object(plasma_client, 1) + + +# This is checking to make sure plasma_clients cannot be destroyed +# before all the PlasmaBuffers that have handles to them are +# destroyed, see ARROW-2448. +@pytest.mark.plasma +def test_plasma_client_sharing(): +import pyarrow.plasma as plasma + +with start_plasma_store() as (plasma_store_name, p): +plasma_client = plasma.connect(plasma_store_name, "", 64) +object_id = plasma_client.put(np.zeros(3)) +buf = plasma_client.get(object_id) +del plasma_client +assert (buf == np.zeros(3)).all() +del buf # This segfaulted pre ARROW-2448. 
-- To stop receiving notification emails like this one, please contact pcmor...@apache.org.
[arrow] branch master updated: ARROW-2489: [Plasma] Fix PlasmaClient ABI variation
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 2abc889 ARROW-2489: [Plasma] Fix PlasmaClient ABI variation 2abc889 is described below commit 2abc88986d1728beb016ffb9f35b6250cf0d9b7a Author: Antoine Pitrou <anto...@python.org> AuthorDate: Mon Apr 23 23:54:40 2018 -0700 ARROW-2489: [Plasma] Fix PlasmaClient ABI variation When compiled with GPU support, the PlasmaClient ABI would differ, leading to a crash in the Python bindings to Plasma. Author: Antoine Pitrou <anto...@python.org> Closes #1933 from pitrou/ARROW-2489-plasma-client-abi and squashes the following commits: 7d28354f ARROW-2489: Fix PlasmaClient ABI variation --- cpp/src/plasma/client.cc | 364 +++ cpp/src/plasma/client.h | 100 + 2 files changed, 309 insertions(+), 155 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 015c973..733217d 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -34,10 +35,14 @@ #include #include #include +#include #include #include +#include #include +#include +#include #include #include "arrow/buffer.h" @@ -66,17 +71,43 @@ namespace plasma { using arrow::MutableBuffer; +typedef struct XXH64_state_s XXH64_state_t; + // Number of threads used for memcopy and hash computations. constexpr int64_t kThreadPoolSize = 8; constexpr int64_t kBytesInMB = 1 << 20; +// Use 100MB as an overestimate of the L3 cache size. +constexpr int64_t kL3CacheSizeBytes = 1; + +// -- +// GPU support + +#ifdef PLASMA_GPU +struct GpuProcessHandle { + /// Pointer to CUDA buffer that is backing this GPU object. + std::shared_ptr ptr; + /// Number of client using this GPU object. + int client_count; +}; + +// This is necessary as IPC handles can only be mapped once per process. 
+// Thus if multiple clients in the same process get the same gpu object, +// they need to access the same mapped CudaBuffer. +static std::unordered_map<ObjectID, GpuProcessHandle*, UniqueIDHasher> gpu_object_map; +static std::mutex gpu_mutex; +#endif + +// -- +// PlasmaBuffer + /// A Buffer class that automatically releases the backing plasma object /// when it goes out of scope. -class PlasmaBuffer : public Buffer { +class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { public: ~PlasmaBuffer(); - PlasmaBuffer(PlasmaClient* client, const ObjectID& object_id, + PlasmaBuffer(PlasmaClient::Impl* client, const ObjectID& object_id, const std::shared_ptr& buffer) : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) { if (buffer->is_mutable()) { @@ -85,11 +116,12 @@ class PlasmaBuffer : public Buffer { } private: - PlasmaClient* client_; + PlasmaClient::Impl* client_; ObjectID object_id_; }; -PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); } +// -- +// PlasmaClient::Impl struct ObjectInUseEntry { /// A count of the number of times this client has called PlasmaClient::Create @@ -105,33 +137,158 @@ struct ObjectInUseEntry { bool is_sealed; }; -#ifdef PLASMA_GPU -struct GpuProcessHandle { - /// Pointer to CUDA buffer that is backing this GPU object. - std::shared_ptr ptr; - /// Number of client using this GPU object. - int client_count; +/// Configuration options for the plasma client. +struct PlasmaClientConfig { + /// Number of release calls we wait until the object is actually released. + /// This allows us to avoid invalidating the cpu cache on workers if objects + /// are reused accross tasks. + size_t release_delay; }; -// This is necessary as IPC handles can only be mapped once per process. -// Thus if multiple clients in the same process get the same gpu object, -// they need to access the same mapped CudaBuffer. 
-static std::unordered_map<ObjectID, GpuProcessHandle*, UniqueIDHasher> gpu_object_map; -static std::mutex gpu_mutex; +struct ClientMmapTableEntry { + /// The result of mmap for this file descriptor. + uint8_t* pointer; + /// The length of the memory-mapped file. + size_t length; + /// The number of objects in this memory-mapped file that are currently being + /// used by the client. When this count reaches zeros, we unmap the file. + int count; +}; + +class PlasmaClient::Impl { + public: + Impl(); + ~Impl(); + + // PlasmaClient method implementations + + Status Connect(cons
[arrow] branch master updated: ARROW-2308: [Python] Make deserialized numpy arrays 64-byte aligned.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 26bc4ab ARROW-2308: [Python] Make deserialized numpy arrays 64-byte aligned. 26bc4ab is described below commit 26bc4ab5a31e3430e1d545068e5a5a5ba5bc7a22 Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Wed Apr 4 11:31:40 2018 -0700 ARROW-2308: [Python] Make deserialized numpy arrays 64-byte aligned. I'm not too sure about the relation to https://issues.apache.org/jira/browse/ARROW-1860, but I thought I'd open this for comment. This aligns numpy arrays to 64-byte boundaries. cc @pcmoritz @wesm @yaroslavvb Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1802 from robertnishihara/numpyalignment and squashes the following commits: 21840106 Document argument better. b539ba09 Don't set aligned = True in RecordBatch case. b4691aa5 Add test for strided case. c963dbbf Don't use casts. b72f6c49 Remove helper method. d0e66dfd Remove code duplication. f670d474 add test d9aaa089 Add ReadMessageAligned method. 614df5d7 First pass at numpy array alignment issue. 
--- cpp/src/arrow/ipc/message.cc | 14 +- cpp/src/arrow/ipc/message.h| 3 ++- cpp/src/arrow/ipc/reader.cc| 7 --- cpp/src/arrow/ipc/writer.cc| 6 ++ python/pyarrow/tests/test_serialization.py | 23 +++ 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index fd74756..896221e 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -28,6 +28,7 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/Schema_generated.h" #include "arrow/ipc/metadata-internal.h" +#include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/util/logging.h" @@ -207,7 +208,8 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile return Message::ReadFrom(metadata, file, message); } -Status ReadMessage(io::InputStream* file, std::unique_ptr* message) { +Status ReadMessage(io::InputStream* file, std::unique_ptr* message, + bool aligned) { int32_t message_length = 0; int64_t bytes_read = 0; RETURN_NOT_OK(file->Read(sizeof(int32_t), _read, @@ -233,6 +235,16 @@ Status ReadMessage(io::InputStream* file, std::unique_ptr* message) { return Status::Invalid(ss.str()); } + // If requested, align the file before reading the message. + if (aligned) { +int64_t offset; +RETURN_NOT_OK(file->Tell()); +int64_t aligned_offset = PaddedLength(offset); +int64_t num_extra_bytes = aligned_offset - offset; +std::shared_ptr dummy_buffer; +RETURN_NOT_OK(file->Read(num_extra_bytes, _buffer)); + } + return Message::ReadFrom(metadata, file, message); } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 159b39a..4e0089b 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -182,7 +182,8 @@ Status ReadMessage(const int64_t offset, const int32_t metadata_length, /// there are not enough bytes available or the message length is 0 (e.g. 
EOS /// in a stream) ARROW_EXPORT -Status ReadMessage(io::InputStream* stream, std::unique_ptr* message); +Status ReadMessage(io::InputStream* stream, std::unique_ptr* message, + bool aligned = false); } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index cc3b6e5..aefd491 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -684,8 +684,9 @@ Status RecordBatchFileReader::ReadRecordBatch(int i, } static Status ReadContiguousPayload(io::InputStream* file, -std::unique_ptr* message) { - RETURN_NOT_OK(ReadMessage(file, message)); +std::unique_ptr* message, +bool aligned = false) { + RETURN_NOT_OK(ReadMessage(file, message, aligned)); if (*message == nullptr) { return Status::Invalid("Unable to read metadata at offset"); } @@ -715,7 +716,7 @@ Status ReadTensor(int64_t offset, io::RandomAccessFile* file, RETURN_NOT_OK(file->Seek(offset)); std::unique_ptr message; - RETURN_NOT_OK(ReadContiguousPayload(file, )); + RETURN_NOT_OK(ReadContiguousPayload(file, , true /* aligned */)); return ReadTensor(*message, out); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 078efe1..86f2ed1 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -618
[arrow] branch master updated: ARROW-2195: [Plasma] Return auto-releasing buffers
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new cf39686 ARROW-2195: [Plasma] Return auto-releasing buffers cf39686 is described below commit cf396867df6f1f93948c69ce10ceb0f95e399242 Author: Antoine Pitrou <anto...@python.org> AuthorDate: Wed Apr 4 10:51:51 2018 -0700 ARROW-2195: [Plasma] Return auto-releasing buffers On the C++ side, add a new PlasmaClient::GetAuto() method to return buffers that release the corresponding object on destruction. On the Python side, return such buffers in PlasmaClient.get_buffers(). Author: Antoine Pitrou <anto...@python.org> Closes #1807 from pitrou/ARROW-2195-plasma-buffers and squashes the following commits: e3747cf7 Remove XXX comments ac2d84c3 Migrate PlasmaClient::Get() 927a5352 Use GetAuto() in plasma.pyx, not Get() 562abc5d Use ARROW_UNUSED() 4d33403f Move FRIEND_TEST to the Arrow codebase 123ea839 Try to fix odd failure 43b377f8 Add tests for device_num dee2f5dd Add Python test 30a439d1 Cleanups 483bbdbf Allow getting back CUDA buffer from buffer c2af4d36 ARROW-2195: Return auto-releasing buffers --- cpp/src/arrow/gpu/cuda-test.cc | 43 +++ cpp/src/arrow/gpu/cuda_memory.cc| 38 +- cpp/src/arrow/gpu/cuda_memory.h | 11 +- cpp/src/arrow/test-util.h | 8 ++ cpp/src/arrow/util/macros.h | 25 cpp/src/plasma/client.cc| 146 ++- cpp/src/plasma/client.h | 58 ++--- cpp/src/plasma/test/client_tests.cc | 226 +--- python/pyarrow/plasma.pyx | 40 +++ python/pyarrow/tests/test_plasma.py | 37 +- 10 files changed, 456 insertions(+), 176 deletions(-) diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index ae411c9..04e1f92 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -80,6 +80,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) { AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize); } 
+TEST_F(TestCudaBuffer, FromBuffer) { + const int64_t kSize = 1000; + // Initialize device buffer with random data + std::shared_ptr host_buffer; + std::shared_ptr device_buffer; + ASSERT_OK(context_->Allocate(kSize, _buffer)); + ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), _buffer)); + ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 1000)); + // Sanity check + AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize); + + // Get generic Buffer from device buffer + std::shared_ptr buffer; + std::shared_ptr result; + buffer = std::static_pointer_cast(device_buffer); + ASSERT_OK(CudaBuffer::FromBuffer(buffer, )); + ASSERT_EQ(result->size(), kSize); + ASSERT_EQ(result->is_mutable(), true); + ASSERT_EQ(result->mutable_data(), buffer->mutable_data()); + AssertCudaBufferEquals(*result, host_buffer->data(), kSize); + + buffer = SliceBuffer(device_buffer, 0, kSize); + ASSERT_OK(CudaBuffer::FromBuffer(buffer, )); + ASSERT_EQ(result->size(), kSize); + ASSERT_EQ(result->is_mutable(), false); + AssertCudaBufferEquals(*result, host_buffer->data(), kSize); + + buffer = SliceMutableBuffer(device_buffer, 0, kSize); + ASSERT_OK(CudaBuffer::FromBuffer(buffer, )); + ASSERT_EQ(result->size(), kSize); + ASSERT_EQ(result->is_mutable(), true); + ASSERT_EQ(result->mutable_data(), buffer->mutable_data()); + AssertCudaBufferEquals(*result, host_buffer->data(), kSize); + + buffer = SliceMutableBuffer(device_buffer, 3, kSize - 10); + buffer = SliceMutableBuffer(buffer, 8, kSize - 20); + ASSERT_OK(CudaBuffer::FromBuffer(buffer, )); + ASSERT_EQ(result->size(), kSize - 20); + ASSERT_EQ(result->is_mutable(), true); + ASSERT_EQ(result->mutable_data(), buffer->mutable_data()); + AssertCudaBufferEquals(*result, host_buffer->data() + 11, kSize - 20); +} + // IPC only supported on Linux #if defined(__linux) diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 183cbcb..a245509 100644 --- 
a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -98,7 +98,34 @@ CudaBuffer::CudaBuffer(const std::shared_ptr& parent, const int64_t : Buffer(parent, offset, size), context_(parent->context()), own_data_(false), - is_ipc_(false) {} + is_ipc_(false) { + if (parent->is_mutable()) { +is_mutable_ = true; +mutable_data_ = const_cast<uint8_t*>(data_); + } +} + +Status CudaBuffer::FromBuffer(std::shared_ptr buffer, + std::shared_ptr* out) { + int64_t offset = 0, size = buffer->size(); + bool is_m
[arrow] branch master updated: ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new d135974 ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest. d135974 is described below commit d135974a0d3dd9a9fbbb10da4c5dbc65f9324234 Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Sat Jan 20 13:41:23 2018 -0800 ARROW-2000: [Plasma] Deduplicate file descriptors when replying to GetRequest. Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1479 from robertnishihara/deduplicatefiledescriptors and squashes the following commits: 9be9643 [Robert Nishihara] Fix bug. 8a827cf [Robert Nishihara] Remove mmap_size from PlasmaObject. ab30d7d [Robert Nishihara] Fix tests. 2916e87 [Robert Nishihara] Remove mmap_size from PlasmaObjectSpec, and file_descriptor -> fd. 7f5c618 [Robert Nishihara] Deduplicate file descriptors when store replies to Get. ab12d63 [Robert Nishihara] Make Create return a MutableBuffer. --- cpp/src/plasma/client.cc | 45 ++-- cpp/src/plasma/client.h| 5 ++- cpp/src/plasma/format/plasma.fbs | 20 +++-- cpp/src/plasma/malloc.cc | 10 + cpp/src/plasma/malloc.h| 6 +++ cpp/src/plasma/plasma.h| 12 +- cpp/src/plasma/protocol.cc | 49 +- cpp/src/plasma/protocol.h | 11 +++-- cpp/src/plasma/store.cc| 67 ++ cpp/src/plasma/test/client_tests.cc| 14 +++ cpp/src/plasma/test/serialization_tests.cc | 26 +--- python/pyarrow/plasma.pyx | 4 +- 12 files changed, 165 insertions(+), 104 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index d74c0f4..a683da0 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -54,8 +54,6 @@ namespace plasma { -using arrow::MutableBuffer; - // Number of threads used for memcopy and hash computations. 
constexpr int64_t kThreadPoolSize = 8; constexpr int64_t kBytesInMB = 1 << 20; @@ -130,7 +128,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec // Increment the count of the number of objects in the memory-mapped file // that are being used. The corresponding decrement should happen in // PlasmaClient::Release. -auto entry = mmap_table_.find(object->handle.store_fd); +auto entry = mmap_table_.find(object->store_fd); ARROW_CHECK(entry != mmap_table_.end()); ARROW_CHECK(entry->second.count >= 0); // Update the in_use_object_bytes_. @@ -149,7 +147,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata, int64_t metadata_size, -std::shared_ptr* data) { +std::shared_ptr* data) { ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size)); @@ -157,7 +155,10 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, )); ObjectID id; PlasmaObject object; - RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), , )); + int store_fd; + int64_t mmap_size; + RETURN_NOT_OK( + ReadCreateReply(buffer.data(), buffer.size(), , , _fd, _size)); // If the CreateReply included an error, then the store will not send a file // descriptor. int fd = recv_fd(store_conn_); @@ -167,9 +168,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, // The metadata should come right after the data. 
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size); *data = std::make_shared( - lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) + - object.data_offset, - data_size); + lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed // from the transfer. @@ -209,7 +208,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, ARROW_CHECK(object_entry->second->is_sealed) << "Plasma client called get on an u
[arrow] branch master updated: ARROW-2011: [Python] Allow setting the pickler in the serialization context.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new e446084 ARROW-2011: [Python] Allow setting the pickler in the serialization context. e446084 is described below commit e4460847f3387c6c1a8bb77edd2aedc69e7250d3 Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Fri Jan 19 14:49:17 2018 -0800 ARROW-2011: [Python] Allow setting the pickler in the serialization context. Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1493 from robertnishihara/cloudpickle and squashes the following commits: 57fb46f [Robert Nishihara] Fix test (it didn't work without cloudpickle). a884bb4 [Robert Nishihara] Add test. 14e1536 [Robert Nishihara] Allow setting the pickler in the serialization context. --- python/pyarrow/serialization.pxi | 26 ++-- python/pyarrow/tests/test_serialization.py | 39 ++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index d95d582..e7a3990 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -50,6 +50,8 @@ cdef class SerializationContext: object types_to_pickle object custom_serializers object custom_deserializers +object pickle_serializer +object pickle_deserializer def __init__(self): # Types with special serialization handlers @@ -58,6 +60,23 @@ cdef class SerializationContext: self.types_to_pickle = set() self.custom_serializers = dict() self.custom_deserializers = dict() +self.pickle_serializer = pickle.dumps +self.pickle_deserializer = pickle.loads + +def set_pickle(self, serializer, deserializer): +""" +Set the serializer and deserializer to use for objects that are to be +pickled. + +Parameters +-- +serializer : callable +The serializer to use (e.g., pickle.dumps or cloudpickle.dumps). 
+deserializer : callable +The deserializer to use (e.g., pickle.dumps or cloudpickle.dumps). +""" +self.pickle_serializer = serializer +self.pickle_deserializer = deserializer def clone(self): """ @@ -72,6 +91,8 @@ cdef class SerializationContext: result.whitelisted_types = self.whitelisted_types.copy() result.custom_serializers = self.custom_serializers.copy() result.custom_deserializers = self.custom_deserializers.copy() +result.pickle_serializer = self.pickle_serializer +result.pickle_deserializer = self.pickle_deserializer return result @@ -119,7 +140,8 @@ cdef class SerializationContext: # use the closest match to type(obj) type_id = self.type_to_type_id[type_] if type_id in self.types_to_pickle: -serialized_obj = {"data": pickle.dumps(obj), "pickle": True} +serialized_obj = {"data": self.pickle_serializer(obj), + "pickle": True} elif type_id in self.custom_serializers: serialized_obj = {"data": self.custom_serializers[type_id](obj)} else: @@ -139,7 +161,7 @@ cdef class SerializationContext: if "pickle" in serialized_obj: # The object was pickled, so unpickle it. -obj = pickle.loads(serialized_obj["data"]) +obj = self.pickle_deserializer(serialized_obj["data"]) else: assert type_id not in self.types_to_pickle if type_id not in self.whitelisted_types: diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index 6116556..e4681e3 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -555,3 +555,42 @@ def test_deserialize_buffer_in_different_process(): dir_path = os.path.dirname(os.path.realpath(__file__)) python_file = os.path.join(dir_path, 'deserialize_buffer.py') subprocess.check_call(['python', python_file, f.name]) + + +def test_set_pickle(): +# Use a custom type to trigger pickling. 
+class Foo(object): +pass + +context = pa.SerializationContext() +context.register_type(Foo, 'Foo', pickle=True) + +test_object = Foo() + +# Define a custom serializer and deserializer to use in place of pickle. + +def dumps1(obj): +return b'custom' + +def loads1(serialized_obj): +return serialized_obj + b' serialization 1' + +# Test that setting a custom pickler changes the behavior. +conte
[arrow] branch master updated: ARROW-1927: [Plasma] Add delete function
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new f82b7e4 ARROW-1927: [Plasma] Add delete function f82b7e4 is described below commit f82b7e4f57bf6d5aa283823f483fabdda59d56ad Author: Jin Hai <haijin@gmail.com> AuthorDate: Wed Jan 10 15:32:11 2018 -0800 ARROW-1927: [Plasma] Add delete function Hi, I just add the delete function for Plasma and tested. JIRA ticked: https://issues.apache.org/jira/browse/ARROW-1927 Author: Jin Hai <haijin@gmail.com> Author: Philipp Moritz <pcmor...@gmail.com> Closes #1427 from JinHai-CN/plasma-delete and squashes the following commits: c6df5be [Philipp Moritz] rebase 424c1b7 [Philipp Moritz] fix linting 1d76437 [Philipp Moritz] fix tests 0ca115a [Jin Hai] Fixed two bugs according to the comments ce27077 [Jin Hai] Update the unit test cases 8b6804e [Jin Hai] ARROW-1927: [Plasma] Try to fix unit-test fault be88990 [Jin Hai] ARROW-1927: [Plasma] Add 3 test cases for delete function baf82b9 [Jin Hai] ARROW-1927: [Plasma] Update according to the CI error 53e24eb [Jin Hai] ARROW-1927: [Plasma] Update according to the comments and CI error c9984a4 [Jin Hai] ARROW-1927: [Plasma] Add delete function --- cpp/src/plasma/client.cc| 17 +++--- cpp/src/plasma/client.h | 3 ++- cpp/src/plasma/eviction_policy.cc | 10 cpp/src/plasma/eviction_policy.h| 5 cpp/src/plasma/format/plasma.fbs| 6 - cpp/src/plasma/store.cc | 46 + cpp/src/plasma/store.h | 9 cpp/src/plasma/test/client_tests.cc | 25 8 files changed, 112 insertions(+), 9 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 0dd1c44..d74c0f4 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { } Status PlasmaClient::Delete(const ObjectID& object_id) { - // TODO(rkn): In the future, we can use 
this method to give hints to the - // eviction policy about when an object will no longer be needed. - return Status::NotImplemented("PlasmaClient::Delete is not implemented."); + RETURN_NOT_OK(FlushReleaseHistory()); + // If the object is in used, client can't send the remove message. + if (objects_in_use_.count(object_id) > 0) { +return Status::UnknownError("PlasmaClient::Object is in use."); + } else { +// If we don't already have a reference to the object, we can try to remove the object +RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id)); +std::vector buffer; +RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply, )); +ObjectID object_id2; +DCHECK_GT(buffer.size(), 0); +RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), _id2)); +return Status::OK(); + } } Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 78793f1..35182f8 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient { Status Seal(const ObjectID& object_id); /// Delete an object from the object store. This currently assumes that the - /// object is present and has been sealed. + /// object is present, has been sealed and not used by another client. Otherwise, + /// it is a no operation. /// /// @todo We may want to allow the deletion of objects that are not present or /// haven't been sealed. diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index a7758fd..66a3b2e 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID& object_id, cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } +void EvictionPolicy::remove_object(const ObjectID& object_id) { + /* If the object is in the LRU cache, remove it. 
*/ + cache_.remove(object_id); + + auto entry = store_info_->objects[object_id].get(); + int64_t size = entry->info.data_size + entry->info.metadata_size; + ARROW_CHECK(memory_used_ >= size); + memory_used_ -= size; +} + } // namespace plasma diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index cebf35b..b076309 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -120,6 +120,11 @@ class EvictionPolicy { int64_t choose_objects_to_evict(int64_t num_bytes_required, std::vector* objects_t
[arrow] branch master updated: ARROW-1972: [Python] Import pyarrow in DeserializeObject.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 95d894d ARROW-1972: [Python] Import pyarrow in DeserializeObject. 95d894d is described below commit 95d894da250ee7d5dfccb09e2d349e5748cc5fb8 Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Sun Jan 7 13:48:08 2018 -0800 ARROW-1972: [Python] Import pyarrow in DeserializeObject. Author: Robert Nishihara <robertnishih...@gmail.com> Author: Philipp Moritz <pcmor...@gmail.com> Closes #1463 from robertnishihara/segfaultfix and squashes the following commits: ec8a6c5 [Robert Nishihara] Add comment. 6222340 [Philipp Moritz] fix tests, linting, add license 3e969db [Robert Nishihara] Simplify tests. 8aa3fca [Philipp Moritz] add regression test bfa0851 [Robert Nishihara] Import pyarrow in DeserializeObject. --- cpp/src/arrow/python/arrow_to_python.cc| 1 + python/pyarrow/tests/deserialize_buffer.py | 26 ++ python/pyarrow/tests/test_serialization.py | 14 ++ 3 files changed, 41 insertions(+) diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index ce539a5..c060ab8 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -284,6 +284,7 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj PyObject** out) { PyAcquireGIL lock; PyDateTime_IMPORT; + import_pyarrow(); return DeserializeList(context, *obj.batch->column(0), 0, obj.batch->num_rows(), base, obj, out); } diff --git a/python/pyarrow/tests/deserialize_buffer.py b/python/pyarrow/tests/deserialize_buffer.py new file mode 100644 index 000..982dc66 --- /dev/null +++ b/python/pyarrow/tests/deserialize_buffer.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file is called from a test in test_serialization.py. + +import sys + +import pyarrow as pa + +with open(sys.argv[1], 'rb') as f: +data = f.read() +pa.deserialize(data) diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index f245dc2..6116556 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -541,3 +541,17 @@ def test_deserialize_in_different_process(): p.start() assert q.get().pattern == regex.pattern p.join() + + +def test_deserialize_buffer_in_different_process(): +import tempfile +import subprocess + +f = tempfile.NamedTemporaryFile(delete=False) +b = pa.serialize(pa.frombuffer(b'hello')).to_buffer() +f.write(b.to_pybytes()) +f.close() + +dir_path = os.path.dirname(os.path.realpath(__file__)) +python_file = os.path.join(dir_path, 'deserialize_buffer.py') +subprocess.check_call(['python', python_file, f.name]) -- To stop receiving notification emails like this one, please contact ['"commits@arrow.apache.org" <commits@arrow.apache.org>'].
[arrow] branch master updated: ARROW-1947: [Plasma] Change Client Create and Get to use Buffers
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 0f38a95 ARROW-1947: [Plasma] Change Client Create and Get to use Buffers 0f38a95 is described below commit 0f38a9503bcde872b705976bd314ccb6c0d0f8d5 Author: Philipp Moritz <pcmor...@gmail.com> AuthorDate: Fri Dec 29 14:19:38 2017 -0800 ARROW-1947: [Plasma] Change Client Create and Get to use Buffers - Create now takes in a pointer to a shared pointer of Buffer and returns a MutableBuffer. - Object Buffers data and metadata are pointers to shared pointers of Buffer. Author: Philipp Moritz <pcmor...@gmail.com> Author: William Paul <wapa...@berkeley.edu> Closes #1444 from Wapaul1/plasma_buffer_api and squashes the following commits: 7fe1cee [Philipp Moritz] fix size of MutableBuffer returned by plasma::Create aeed751 [Philipp Moritz] more linting b3274e0 [Philipp Moritz] fix 463dbeb [Philipp Moritz] fix plasma python extension a055fa8 [Philipp Moritz] fix linting fc62dda [William Paul] Added metadata buffer 4d8cbb8 [William Paul] Create and Get use Buffers now --- cpp/src/plasma/client.cc| 44 +++-- cpp/src/plasma/client.h | 18 +-- cpp/src/plasma/test/client_tests.cc | 36 ++ python/pyarrow/plasma.pyx | 14 +--- 4 files changed, 67 insertions(+), 45 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 9bbafac..0dd1c44 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -40,6 +40,7 @@ #include #include +#include "arrow/buffer.h" #include "plasma/common.h" #include "plasma/fling.h" #include "plasma/io.h" @@ -53,6 +54,8 @@ namespace plasma { +using arrow::MutableBuffer; + // Number of threads used for memcopy and hash computations. 
constexpr int64_t kThreadPoolSize = 8; constexpr int64_t kBytesInMB = 1 << 20; @@ -145,7 +148,8 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec } Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, -uint8_t* metadata, int64_t metadata_size, uint8_t** data) { +uint8_t* metadata, int64_t metadata_size, +std::shared_ptr* data) { ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size)); @@ -162,14 +166,16 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, ARROW_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. ARROW_CHECK(object.metadata_offset == object.data_offset + data_size); - *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) + - object.data_offset; + *data = std::make_shared( + lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) + + object.data_offset, + data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed // from the transfer. if (metadata != NULL) { // Copy the metadata to the buffer. -memcpy(*data + object.data_size, metadata, metadata_size); +memcpy((*data)->mutable_data() + object.data_size, metadata, metadata_size); } // Increment the count of the number of instances of this object that this // client is using. 
A call to PlasmaClient::Release is required to decrement @@ -203,10 +209,12 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, ARROW_CHECK(object_entry->second->is_sealed) << "Plasma client called get on an unsealed object that it created"; PlasmaObject* object = _entry->second->object; - object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd); - object_buffers[i].data = object_buffers[i].data + object->data_offset; + uint8_t* data = lookup_mmapped_file(object->handle.store_fd); + object_buffers[i].data = + std::make_shared(data + object->data_offset, object->data_size); + object_buffers[i].metadata = std::make_shared( + data + object->data_offset + object->data_size, object->metadata_size); object_buffers[i].data_size = object->data_size; - object_buffers[i].metadata = object_buffers[i].data + object->data
[arrow] branch master updated: ARROW-1951: [Python] Add memcopy threads argument to PlasmaClient put.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 16c79cc ARROW-1951: [Python] Add memcopy threads argument to PlasmaClient put. 16c79cc is described below commit 16c79cc94e2440321bcad1ebbef53ea1266b94e8 Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Thu Dec 28 13:34:32 2017 -0800 ARROW-1951: [Python] Add memcopy threads argument to PlasmaClient put. Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1451 from robertnishihara/numthreads and squashes the following commits: 5e2c7ee [Robert Nishihara] Fix tests. 55cb8ac [Robert Nishihara] Revert old change 0903726 [Robert Nishihara] Move memcopy_threads from serialization context to put. 9281de1 [Robert Nishihara] Expose memcopy threads to serialization context. --- python/pyarrow/plasma.pyx | 7 +-- python/pyarrow/tests/test_plasma.py | 6 -- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index f2e8653..9b3e409 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -370,7 +370,7 @@ cdef class PlasmaClient: object_buffers[i].metadata_size)) return result -def put(self, object value, ObjectID object_id=None, +def put(self, object value, ObjectID object_id=None, int memcopy_threads=6, serialization_context=None): """ Store a Python value into the object store. @@ -382,6 +382,9 @@ cdef class PlasmaClient: object_id : ObjectID, default None If this is provided, the specified object ID will be used to refer to the object. +memcopy_threads : int, default 6 +The number of threads to use to write the serialized object into +the object store for large objects. serialization_context : pyarrow.SerializationContext, default None Custom serialization and deserialization context. 
@@ -394,7 +397,7 @@ cdef class PlasmaClient: serialized = pyarrow.serialize(value, serialization_context) buffer = self.create(target_id, serialized.total_bytes) stream = pyarrow.FixedSizeBufferWriter(buffer) -stream.set_memcopy_threads(4) +stream.set_memcopy_threads(memcopy_threads) serialized.write_to(stream) self.seal(target_id) return target_id diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index b28bd60..decdc73 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -310,12 +310,14 @@ class TestPlasmaClient(object): serialization_context = pa.SerializationContext() serialization_context.register_type(CustomType, 20*b"\x00") -object_id = self.plasma_client.put(val, None, serialization_context) +object_id = self.plasma_client.put( +val, None, serialization_context=serialization_context) with pytest.raises(pa.ArrowSerializationError): result = self.plasma_client.get(object_id) -result = self.plasma_client.get(object_id, -1, serialization_context) +result = self.plasma_client.get( +object_id, -1, serialization_context=serialization_context) assert result.val == val.val def test_store_arrow_objects(self): -- To stop receiving notification emails like this one, please contact ['"commits@arrow.apache.org" <commits@arrow.apache.org>'].
[arrow] branch master updated: ARROW-1758: [Python] Remove pickle=True option for object serialization
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 85e2d89 ARROW-1758: [Python] Remove pickle=True option for object serialization 85e2d89 is described below commit 85e2d8960d5aeeb04d8b59e6e7d8a8266a7d095f Author: Licht-T <lich...@outlook.jp> AuthorDate: Sun Nov 26 14:12:22 2017 -0800 ARROW-1758: [Python] Remove pickle=True option for object serialization This closes [ARROW-1758](https://issues.apache.org/jira/browse/ARROW-1758). Author: Licht-T <lich...@outlook.jp> Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #1347 from Licht-T/clean-pickle-option-for-object-serialization and squashes the following commits: 927f154 [Wes McKinney] Use cloudpickle for lambda serialization if available ba998dd [Licht-T] CLN: Remove pickle=True option for object serialization --- python/pyarrow/serialization.pxi | 14 ++ python/pyarrow/serialization.py| 13 +++-- python/pyarrow/tests/test_serialization.py | 9 ++--- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 6b72277..3ee5c7d 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -47,7 +47,6 @@ cdef class SerializationContext: cdef: object type_to_type_id object whitelisted_types -object types_to_pickle object custom_serializers object custom_deserializers @@ -55,11 +54,10 @@ cdef class SerializationContext: # Types with special serialization handlers self.type_to_type_id = dict() self.whitelisted_types = dict() -self.types_to_pickle = set() self.custom_serializers = dict() self.custom_deserializers = dict() -def register_type(self, type_, type_id, pickle=False, +def register_type(self, type_, type_id, custom_serializer=None, custom_deserializer=None): """EXPERIMENTAL: Add type to the list of types we can 
serialize. @@ -69,9 +67,6 @@ cdef class SerializationContext: The type that we can serialize. type_id : bytes A string of bytes used to identify the type. -pickle : bool -True if the serialization should be done with pickle. -False if it should be done efficiently with Arrow. custom_serializer : callable This argument is optional, but can be provided to serialize objects of the class in a particular way. @@ -81,8 +76,6 @@ cdef class SerializationContext: """ self.type_to_type_id[type_] = type_id self.whitelisted_types[type_id] = type_ -if pickle: -self.types_to_pickle.add(type_id) if custom_serializer is not None: self.custom_serializers[type_id] = custom_serializer self.custom_deserializers[type_id] = custom_deserializer @@ -102,9 +95,7 @@ cdef class SerializationContext: # use the closest match to type(obj) type_id = self.type_to_type_id[type_] -if type_id in self.types_to_pickle: -serialized_obj = {"data": pickle.dumps(obj), "pickle": True} -elif type_id in self.custom_serializers: +if type_id in self.custom_serializers: serialized_obj = {"data": self.custom_serializers[type_id](obj)} else: if is_named_tuple(type_): @@ -125,7 +116,6 @@ cdef class SerializationContext: # The object was pickled, so unpickle it. 
obj = pickle.loads(serialized_obj["data"]) else: -assert type_id not in self.types_to_pickle if type_id not in self.whitelisted_types: msg = "Type ID " + str(type_id) + " not registered in " \ "deserialization callback" diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py index 2b47513..ab25b63 100644 --- a/python/pyarrow/serialization.py +++ b/python/pyarrow/serialization.py @@ -17,12 +17,18 @@ from collections import OrderedDict, defaultdict import sys +import pickle import numpy as np from pyarrow import serialize_pandas, deserialize_pandas from pyarrow.lib import _default_serialization_context +try: +import cloudpickle +except ImportError: +cloudpickle = pickle + def register_default_serialization_handlers(serialization_context): @@ -67,9 +73,12 @@ def register_default_serialization_handlers(serialization_context): serialization_context.register_type( type(lambda: 0), "function", -pickle=True) +custom_seri
[arrow] branch master updated: ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped file.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new cacbacd ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped file. cacbacd is described below commit cacbacd439919742a0b6fbec27ee73b5af29347f Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Thu Nov 16 21:56:03 2017 -0800 ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped file. This adds the `-f` flag which tells the plasma store to use a single memory-mapped file. This is accomplished by simply calling `dlmemalign`/`dlfree` on the entire space at startup. Question: Why does the test pass? Given that `plasma_client` is constructed with a release delay of 64, shouldn't the store by unable to evict some objects? Yet they all seem to get evicted just fine. cc @pcmoritz @atumanov Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1327 from robertnishihara/allocateupfront and squashes the following commits: 6f3b953 [Robert Nishihara] Augment test. 0daeae3 [Robert Nishihara] Remove hard-coded values and update test. a446fbd [Robert Nishihara] Add a test. c2374b6 [Robert Nishihara] Add flag to tell the plasma store to use a single memory-mapped file. --- cpp/src/plasma/store.cc | 27 ++- python/pyarrow/tests/test_plasma.py | 29 +++-- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 7094aed..c6a19a5 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -676,12 +676,22 @@ class PlasmaStoreRunner { PlasmaStoreRunner() {} void Start(char* socket_name, int64_t system_memory, std::string directory, - bool hugepages_enabled) { + bool hugepages_enabled, bool use_one_memory_mapped_file) { // Create the event loop. 
loop_.reset(new EventLoop); store_.reset( new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled)); plasma_config = store_->get_plasma_store_info(); + +// If the store is configured to use a single memory-mapped file, then we +// achieve that by mallocing and freeing a single large amount of space. +// that maximum allowed size up front. +if (use_one_memory_mapped_file) { + void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory); + ARROW_CHECK(pointer != NULL); + plasma::dlfree(pointer); +} + int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); @@ -716,14 +726,15 @@ void HandleSignal(int signal) { } void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory, - bool hugepages_enabled) { + bool hugepages_enabled, bool use_one_memory_mapped_file) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); g_runner.reset(new PlasmaStoreRunner()); signal(SIGTERM, HandleSignal); - g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled); + g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled, + use_one_memory_mapped_file); } } // namespace plasma @@ -733,9 +744,11 @@ int main(int argc, char* argv[]) { // Directory where plasma memory mapped files are stored. std::string plasma_directory; bool hugepages_enabled = false; + // True if a single large memory-mapped file should be created at startup. 
+ bool use_one_memory_mapped_file = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) { switch (c) { case 'd': plasma_directory = std::string(optarg); @@ -755,6 +768,9 @@ int main(int argc, char* argv[]) { << "GB of memory."; break; } + case 'f': +use_one_memory_mapped_file = true; +break; default: exit(-1); } @@ -808,5 +824,6 @@ int main(int argc, char* argv[]) { // available. plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled); + plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled, + use_one_memory_mapped_file); } diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test
[arrow] branch master updated: ARROW-1829: [Plasma] Fixes to eviction policy.
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new ac26eb7 ARROW-1829: [Plasma] Fixes to eviction policy. ac26eb7 is described below commit ac26eb72c02d4ea7e8788ee3a8a3eeec6403b9bb Author: Robert Nishihara <robertnishih...@gmail.com> AuthorDate: Thu Nov 16 18:15:06 2017 -0800 ARROW-1829: [Plasma] Fixes to eviction policy. Some fixes to the bookkeeping for the eviction policy. Author: Robert Nishihara <robertnishih...@gmail.com> Closes #1317 from robertnishihara/evictionpolicy and squashes the following commits: db34d8e [Robert Nishihara] Add sanity checks. 0d3a6c8 [Robert Nishihara] Fixes to eviction policy. --- cpp/src/plasma/eviction_policy.cc | 34 ++ cpp/src/plasma/eviction_policy.h | 3 +-- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index 6c2309f..a7758fd 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -61,38 +61,32 @@ int64_t EvictionPolicy::choose_objects_to_evict(int64_t num_bytes_required, } /* Update the number of bytes used. */ memory_used_ -= bytes_evicted; + ARROW_CHECK(memory_used_ >= 0); return bytes_evicted; } void EvictionPolicy::object_created(const ObjectID& object_id) { auto entry = store_info_->objects[object_id].get(); cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); + int64_t size = entry->info.data_size + entry->info.metadata_size; + memory_used_ += size; + ARROW_CHECK(memory_used_ <= store_info_->memory_capacity); } bool EvictionPolicy::require_space(int64_t size, std::vector* objects_to_evict) { /* Check if there is enough space to create the object. 
*/ int64_t required_space = memory_used_ + size - store_info_->memory_capacity; - int64_t num_bytes_evicted; - if (required_space > 0) { -/* Try to free up at least as much space as we need right now but ideally - * up to 20% of the total capacity. */ -int64_t space_to_free = std::max(size, store_info_->memory_capacity / 5); -ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects"; -/* Choose some objects to evict, and update the return pointers. */ -num_bytes_evicted = choose_objects_to_evict(space_to_free, objects_to_evict); -ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " -<< objects_to_evict->size() << " objects to free up " -<< num_bytes_evicted << " bytes."; - } else { -num_bytes_evicted = 0; - } - if (num_bytes_evicted >= required_space) { -/* We only increment the space used if there is enough space to create the - * object. */ -memory_used_ += size; - } - return num_bytes_evicted >= required_space; + /* Try to free up at least as much space as we need right now but ideally + * up to 20% of the total capacity. */ + int64_t space_to_free = std::max(required_space, store_info_->memory_capacity / 5); + ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects"; + /* Choose some objects to evict, and update the return pointers. 
*/ + int64_t num_bytes_evicted = choose_objects_to_evict(space_to_free, objects_to_evict); + ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " + << objects_to_evict->size() << " objects to free up " + << num_bytes_evicted << " bytes."; + return num_bytes_evicted >= required_space && num_bytes_evicted > 0; } void EvictionPolicy::begin_object_access(const ObjectID& object_id, diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index de33dab..cebf35b 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -73,8 +73,7 @@ class EvictionPolicy { void object_created(const ObjectID& object_id); /// This method will be called when the Plasma store needs more space, perhaps - /// to create a new object. If the required amount of space cannot be freed up, - /// then a fatal error will be thrown. When this method is called, the eviction + /// to create a new object. When this method is called, the eviction /// policy will assume that the objects chosen to be evicted will in fact be /// evicted from the Plasma store by the caller. /// -- To stop receiving notification emails like this one, please contact ['"commits@arrow.apache.org" <commits@arrow.apache.org>'].
[arrow] branch master updated: ARROW-1775: Ability to abort created but unsealed Plasma objects
This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 78872a1 ARROW-1775: Ability to abort created but unsealed Plasma objects 78872a1 is described below commit 78872a1be263e61d7901eb36663a184c2b04effb Author: Stephanie <sw...@cs.berkeley.edu> AuthorDate: Wed Nov 8 14:13:28 2017 -0800 ARROW-1775: Ability to abort created but unsealed Plasma objects Author: Stephanie <sw...@cs.berkeley.edu> Author: Philipp Moritz <pcmor...@gmail.com> Closes #1289 from stephanie-wang/abort-objects and squashes the following commits: 38c42b9 [Stephanie] TODO for PascalCase 08d4040 [Stephanie] Move documentation dd5b29e [Stephanie] Fix memory error e6934ac [Philipp Moritz] fix linting 2b8e385 [Stephanie] Return status code when unmapping object fe20b3b [Stephanie] Add test case for PlasmaClient::Abort 646190c [Stephanie] Abort objects that were not sealed when client disconnects 5fc44c5 [Stephanie] Implement PlasmaClient::Abort --- cpp/src/plasma/client.cc| 97 + cpp/src/plasma/client.h | 19 cpp/src/plasma/format/plasma.fbs| 12 + cpp/src/plasma/protocol.cc | 28 +++ cpp/src/plasma/protocol.h | 8 +++ cpp/src/plasma/store.cc | 23 - cpp/src/plasma/store.h | 3 ++ cpp/src/plasma/test/client_tests.cc | 44 + 8 files changed, 213 insertions(+), 21 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index e57a2a6..dd32bdc 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -278,6 +278,39 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, return Status::OK(); } +Status PlasmaClient::UnmapObject(const ObjectID& object_id) { + auto object_entry = objects_in_use_.find(object_id); + ARROW_CHECK(object_entry != objects_in_use_.end()); + ARROW_CHECK(object_entry->second->count == 0); + + // Decrement the count of the number of objects in this 
memory-mapped file + // that the client is using. The corresponding increment should have + // happened in plasma_get. + int fd = object_entry->second->object.handle.store_fd; + auto entry = mmap_table_.find(fd); + ARROW_CHECK(entry != mmap_table_.end()); + ARROW_CHECK(entry->second.count >= 1); + if (entry->second.count == 1) { +// If no other objects are being used, then unmap the file. +int err = munmap(entry->second.pointer, entry->second.length); +if (err == -1) { + return Status::IOError("Error during munmap"); +} +// Remove the corresponding entry from the hash table. +mmap_table_.erase(fd); + } else { +// If there are other objects being used, decrement the reference count. +entry->second.count -= 1; + } + // Update the in_use_object_bytes_. + in_use_object_bytes_ -= (object_entry->second->object.data_size + + object_entry->second->object.metadata_size); + DCHECK_GE(in_use_object_bytes_, 0); + // Remove the entry from the hash table of objects currently in use. + objects_in_use_.erase(object_id); + return Status::OK(); +} + /// This is a helper method for implementing plasma_release. We maintain a /// buffer /// of release calls and only perform them once the buffer becomes full (as @@ -297,28 +330,9 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) { ARROW_CHECK(object_entry->second->count >= 0); // Check if the client is no longer using this object. if (object_entry->second->count == 0) { -// Decrement the count of the number of objects in this memory-mapped file -// that the client is using. The corresponding increment should have -// happened in plasma_get. -int fd = object_entry->second->object.handle.store_fd; -auto entry = mmap_table_.find(fd); -ARROW_CHECK(entry != mmap_table_.end()); -entry->second.count -= 1; -ARROW_CHECK(entry->second.count >= 0); -// If none are being used then unmap the file. 
-if (entry->second.count == 0) { - munmap(entry->second.pointer, entry->second.length); - // Remove the corresponding entry from the hash table. - mmap_table_.erase(fd); -} // Tell the store that the client no longer needs the object. +RETURN_NOT_OK(UnmapObject(object_id)); RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id)); -// Update the in_use_object_bytes_. -in_use_object_bytes_ -= (object_entry->second->object.data_size + - object_entry->second->object.metadata_size); -DCHECK_GE(in_use_object_bytes_, 0); -// Remove the entry from the
arrow git commit: ARROW-1695: [Serialization] Fix reference counting of numpy arrays created in custom serializer
Repository: arrow Updated Branches: refs/heads/master a8f518588 -> 971e99dde ARROW-1695: [Serialization] Fix reference counting of numpy arrays created in custom serializer This uses the NumPyBuffer built into Arrow's Tensor facility to protect the numpy arrays holding the Tensors to be serialized. See also the problem description in https://issues.apache.org/jira/browse/ARROW-1695. Author: Philipp Moritz <pcmor...@gmail.com> Closes #1220 from pcmoritz/fix-serialize-tensors and squashes the following commits: 7e23bb5 [Philipp Moritz] fix linting dce92ad [Philipp Moritz] fix handling of numpy arrays generated in the custom serializer methods Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/971e99dd Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/971e99dd Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/971e99dd Branch: refs/heads/master Commit: 971e99dde3ebabcd6791f4e936c0273938c45893 Parents: a8f5185 Author: Philipp Moritz <pcmor...@gmail.com> Authored: Fri Oct 20 14:27:38 2017 -0700 Committer: Philipp Moritz <pcmor...@gmail.com> Committed: Fri Oct 20 14:27:38 2017 -0700 -- cpp/src/arrow/python/python_to_arrow.cc| 27 +++-- python/pyarrow/tests/test_serialization.py | 21 +++ 2 files changed, 33 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/971e99dd/cpp/src/arrow/python/python_to_arrow.cc -- diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 47d48d7..a46d10d 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -390,15 +390,15 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value, Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr* out, - std::vector<PyObject*>* tensors_out); + std::vector<std::shared_ptr>* tensors_out); Status SerializeArray(PyObject* context, PyArrayObject* 
array, SequenceBuilder* builder, std::vector<PyObject*>* subdicts, - std::vector<PyObject*>* tensors_out); + std::vector<std::shared_ptr>* tensors_out); Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared_ptr* out, - std::vector<PyObject*>* tensors_out); + std::vector<std::shared_ptr>* tensors_out); Status AppendScalar(PyObject* obj, SequenceBuilder* builder) { if (PyArray_IsScalar(obj, Bool)) { @@ -444,7 +444,7 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) { Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples, std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets, - std::vector<PyObject*>* tensors_out) { + std::vector<std::shared_ptr>* tensors_out) { // The bool case must precede the int case (PyInt_Check passes for bools) if (PyBool_Check(elem)) { RETURN_NOT_OK(builder->AppendBool(elem == Py_True)); @@ -525,7 +525,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder, std::vector<PyObject*>* subdicts, - std::vector<PyObject*>* tensors_out) { + std::vector<std::shared_ptr>* tensors_out) { int dtype = PyArray_TYPE(array); switch (dtype) { case NPY_UINT8: @@ -540,7 +540,10 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* case NPY_FLOAT: case NPY_DOUBLE: { RETURN_NOT_OK(builder->AppendTensor(static_cast(tensors_out->size(; - tensors_out->push_back(reinterpret_cast<PyObject*>(array)); + std::shared_ptr tensor; + RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), +reinterpret_cast<PyObject*>(array), )); + tensors_out->push_back(tensor); } break; default: { PyObject* serialized_object; @@ -556,7 +559,7 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* Status SerializeSequences(PyObject* context, 
std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared
arrow git commit: ARROW-1503: [Python] Add default serialization context, callbacks for pandas.Series/DataFrame
Repository: arrow Updated Branches: refs/heads/master 166f0a871 -> ee78cdcb1 ARROW-1503: [Python] Add default serialization context, callbacks for pandas.Series/DataFrame The performance is a bit slower than it could be because we do not have native handling of pyarrow.Buffer (per ARROW-1522). That would allow us to skip the `to_pybytes` copy portion Author: Wes McKinneyCloses #1192 from wesm/ARROW-1503 and squashes the following commits: a36f97d [Wes McKinney] Add default serialization context and add serialization callbacks for pandas Series, DataFrame 5ff10f4 [Wes McKinney] stubs for handling Series, DataFrame more efficiently by default in serialization code paths Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ee78cdcb Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ee78cdcb Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ee78cdcb Branch: refs/heads/master Commit: ee78cdcb1c475a05df9cd9de63358e80ba280a63 Parents: 166f0a8 Author: Wes McKinney Authored: Tue Oct 10 12:21:55 2017 -0700 Committer: Philipp Moritz Committed: Tue Oct 10 12:21:55 2017 -0700 -- python/pyarrow/ipc.py| 47 +++ python/pyarrow/serialization.pxi | 17 +++-- python/pyarrow/tests/test_ipc.py | 17 - 3 files changed, 78 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/ee78cdcb/python/pyarrow/ipc.py -- diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index f264f08..1223673 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -187,3 +187,50 @@ def deserialize_pandas(buf, nthreads=None): reader = pa.RecordBatchStreamReader(buffer_reader) table = reader.read_all() return table.to_pandas(nthreads=nthreads) + + +# -- +# Set up default serialization context + +def _serialize_pandas_series(s): +import pandas as pd +# TODO: serializing Series without extra copy +serialized = serialize_pandas(pd.DataFrame({s.name: s})) +return { +'type': 'Series', +'data': 
serialized.to_pybytes() +} + + +def _serialize_pandas_dataframe(df): +return { +'type': 'DataFrame', +'data': serialize_pandas(df).to_pybytes() +} + + +def _deserialize_callback_pandas(data): +deserialized = deserialize_pandas(data['data']) +type_ = data['type'] +if type_ == 'Series': +return deserialized[deserialized.columns[0]] +elif type_ == 'DataFrame': +return deserialized +else: +raise ValueError(type_) + + +try: +import pandas as pd +lib._default_serialization_context.register_type( +pd.Series, 'pandas.Series', +custom_serializer=_serialize_pandas_series, +custom_deserializer=_deserialize_callback_pandas) + +lib._default_serialization_context.register_type( +pd.DataFrame, 'pandas.DataFrame', +custom_serializer=_serialize_pandas_dataframe, +custom_deserializer=_deserialize_callback_pandas) +except ImportError: +# no pandas +pass http://git-wip-us.apache.org/repos/asf/arrow/blob/ee78cdcb/python/pyarrow/serialization.pxi -- diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index aa1a6a4..4e9ab8e 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -137,6 +137,10 @@ cdef class SerializationContext: obj.__dict__.update(serialized_obj) return obj + +_default_serialization_context = SerializationContext() + + cdef class SerializedPyObject: """ Arrow-serialized representation of Python object @@ -174,6 +178,9 @@ cdef class SerializedPyObject: """ cdef PyObject* result +if context is None: +context = _default_serialization_context + with nogil: check_status(DeserializeObject(context, self.data, self.base, )) @@ -202,7 +209,8 @@ def serialize(object value, SerializationContext context=None): value: object Python object for the sequence that is to be serialized. 
context : SerializationContext -Custom serialization and deserialization context +Custom serialization and deserialization context, uses a default +context with some standard type handlers if not specified Returns --- @@ -210,6 +218,10 @@ def serialize(object value, SerializationContext context=None): """ cdef SerializedPyObject
arrow git commit: ARROW-1653: [Plasma] Use static cast to avoid compiler warning.
Repository: arrow Updated Branches: refs/heads/master eaa953852 -> 3ae43551f ARROW-1653: [Plasma] Use static cast to avoid compiler warning. See https://github.com/apache/arrow/pull/1172#discussion_r142931449 Author: Robert NishiharaCloses #1178 from robertnishihara/fixwarning and squashes the following commits: 638a2f8 [Robert Nishihara] Fix bug. 16012a9 [Robert Nishihara] Use static cast to avoid compiler warning. Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3ae43551 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3ae43551 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3ae43551 Branch: refs/heads/master Commit: 3ae43551fbf22f1bd8a6e95436e96e1134e8 Parents: eaa9538 Author: Robert Nishihara Authored: Thu Oct 5 16:36:17 2017 -0700 Committer: Philipp Moritz Committed: Thu Oct 5 16:36:17 2017 -0700 -- cpp/src/plasma/io.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/3ae43551/cpp/src/plasma/io.cc -- diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index 3134ff1..2228ad9 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -99,7 +99,7 @@ Status ReadMessage(int fd, int64_t* type, std::vector* buffer) { ReadBytes(fd, reinterpret_cast (_temp), sizeof(length_temp)), *type = DISCONNECT_CLIENT); // The length must be read as an int64_t, but it should be used as a size_t. - size_t length = length_temp; + size_t length = static_cast(length_temp); if (length > buffer->size()) { buffer->resize(length); }
arrow git commit: ARROW-1647: [Plasma] Make sure to read length header as int64_t instead of size_t.
Repository: arrow Updated Branches: refs/heads/master dc129d60f -> 81319d9c2 ARROW-1647: [Plasma] Make sure to read length header as int64_t instead of size_t. Author: Robert NishiharaCloses #1172 from robertnishihara/plasmaiofixes and squashes the following commits: 9bb0c00 [Robert Nishihara] Make sure to read length header as int64_t instead of size_t. Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/81319d9c Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/81319d9c Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/81319d9c Branch: refs/heads/master Commit: 81319d9c2784a626af318911c183701d320888f4 Parents: dc129d6 Author: Robert Nishihara Authored: Wed Oct 4 23:14:11 2017 -0700 Committer: Philipp Moritz Committed: Wed Oct 4 23:14:11 2017 -0700 -- cpp/src/plasma/io.cc | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/81319d9c/cpp/src/plasma/io.cc -- diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index afe7053..d604078 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -92,11 +92,14 @@ Status ReadMessage(int fd, int64_t* type, std::vector* buffer) { RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast (), sizeof(version)), *type = DISCONNECT_CLIENT); ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version; - size_t length; RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast (type), sizeof(*type)), *type = DISCONNECT_CLIENT); - RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast (), sizeof(length)), + int64_t length_temp; + RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast (_temp), + sizeof(length_temp)), *type = DISCONNECT_CLIENT); + // The length must be read as an int64_t, but it should be used as a size_t. + size_t length = length_temp; if (length > buffer->size()) { buffer->resize(length); }
arrow git commit: ARROW-1598: [C++] Fix diverged code comment in plasma tutorial
Repository: arrow Updated Branches: refs/heads/master c0a5019bf -> b41a4ee23 ARROW-1598: [C++] Fix diverged code comment in plasma tutorial Even though fixed object id is used in implementation, comment says random object id is created. Author: Kentaro HayashiCloses #1124 from kenhys/arrow-1598 and squashes the following commits: dc5934e [Kentaro Hayashi] ARROW-1598: [C++] Fix diverged code comment in plasma tutorial Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b41a4ee2 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b41a4ee2 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b41a4ee2 Branch: refs/heads/master Commit: b41a4ee2322d0084ff78b78ccfebc4536f7e0a62 Parents: c0a5019 Author: Kentaro Hayashi Authored: Fri Sep 22 12:09:09 2017 -0700 Committer: Philipp Moritz Committed: Fri Sep 22 12:09:09 2017 -0700 -- cpp/apidoc/tutorials/plasma.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/b41a4ee2/cpp/apidoc/tutorials/plasma.md -- diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md index aed9024..8d54a10 100644 --- a/cpp/apidoc/tutorials/plasma.md +++ b/cpp/apidoc/tutorials/plasma.md @@ -219,7 +219,7 @@ int main(int argc, char** argv) { // Start up and connect a Plasma client. PlasmaClient client; ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY)); - // Create an object with a random ObjectID. + // Create an object with a fixed ObjectID. ObjectID object_id = ObjectID::from_binary(""); int64_t data_size = 1000; uint8_t *data;
arrow git commit: ARROW-1410: Remove MAP_POPULATE flag when mmapping files in Plasma store.
Repository: arrow Updated Branches: refs/heads/master f50f2eacb -> a3607d2a4 ARROW-1410: Remove MAP_POPULATE flag when mmapping files in Plasma store. cc @pcmoritz @atumanov Author: Robert Nishihara <robertnishih...@gmail.com> Closes #992 from robertnishihara/removemappopulate and squashes the following commits: 8ed9612 [Robert Nishihara] Remove unnecessary ifdef. 7b75bd9 [Robert Nishihara] Remove MAP_POPULATE flag when mmapping files in Plasma store. Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a3607d2a Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a3607d2a Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a3607d2a Branch: refs/heads/master Commit: a3607d2a4683dd364f36d29cdaf4761a6000ea7d Parents: f50f2ea Author: Robert Nishihara <robertnishih...@gmail.com> Authored: Fri Aug 25 11:45:19 2017 -0700 Committer: Philipp Moritz <pcmor...@gmail.com> Committed: Fri Aug 25 11:45:19 2017 -0700 -- cpp/src/plasma/malloc.cc | 10 +++--- 1 file changed, 3 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/a3607d2a/cpp/src/plasma/malloc.cc -- diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 6b9bc62..52d3620 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -134,14 +134,10 @@ void* fake_mmap(size_t size) { int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; -#ifdef __linux__ - // MAP_POPULATE will pre-populate the page tables for this memory region - // which avoids work when accessing the pages later. Only supported on Linux. - void* pointer = - mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0); -#else + // MAP_POPULATE can be used to pre-populate the page tables for this memory region + // which avoids work when accessing the pages later. However it causes long pauses + // when mmapping the files. Only supported on Linux. 
void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); -#endif if (pointer == MAP_FAILED) { ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) {
arrow git commit: ARROW-1372: [Plasma] enable HUGETLB support on Linux to improve plasma put performance
Repository: arrow Updated Branches: refs/heads/master 652fd36a7 -> 10f7158df ARROW-1372: [Plasma] enable HUGETLB support on Linux to improve plasma put performance This PR makes it possible to use Plasma object store backed by a pre-mounted hugetlbfs. Author: Philipp Moritz Author: Alexey Tumanov Closes #974 from atumanov/putperf and squashes the following commits: 077b78f [Philipp Moritz] add more comments 5aa4b0d [Philipp Moritz] preflight script formatting changes 22188a6 [Philipp Moritz] formatting ffb9916 [Philipp Moritz] address comments 225429b [Philipp Moritz] update documentation with Alexey's fix 713a0c4 [Philipp Moritz] add missing includes 4c976bb [Philipp Moritz] make format fb8e1b4 [Philipp Moritz] add helpful error message 7260d59 [Philipp Moritz] expose number of threads to python and try out cleanups 98b603e [Alexey Tumanov] map_populate on linux; fall back to mlock/memset otherwise ce90ef4 [Alexey Tumanov] documenting new plasma store info fields c52f211 [Philipp Moritz] cleanups (TODO: See if memory locking helps) 4702703 [Philipp Moritz] preliminary documentation 3073a99 [Alexey Tumanov] reenable hashing a20ca56 [Alexey Tumanov] fix bug dd04b87 [Alexey Tumanov] [arrow][putperf] enable HUGETLBFS support on linux Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/10f7158d Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/10f7158d Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/10f7158d Branch: refs/heads/master Commit: 10f7158df46d838d32ef214b0573b6d719756516 Parents: 652fd36 Author: Philipp Moritz Authored: Sat Aug 19 18:43:06 2017 -0700 Committer: Philipp Moritz Committed: Sat Aug 19 18:43:06 2017 -0700 -- cpp/src/plasma/client.cc | 3 +- cpp/src/plasma/common.cc | 2 + cpp/src/plasma/common.h | 5 ++ cpp/src/plasma/malloc.cc | 82 +++-- cpp/src/plasma/malloc.h | 2 + cpp/src/plasma/plasma.h | 8 +++ cpp/src/plasma/store.cc | 86 ++- cpp/src/plasma/store.h | 7 ++-
python/doc/source/plasma.rst | 47 + python/pyarrow/includes/libarrow.pxd | 4 ++ python/pyarrow/io.pxi| 15 ++ 11 files changed, 206 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/client.cc -- diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 8ea62c6..5e28d4f 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -91,7 +91,8 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } -close(fd); +close(fd); // Closing this fd has an effect on performance. + ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; entry.length = map_size; http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/common.cc -- diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index d7a7965..2de06d5 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -83,4 +83,6 @@ Status plasma_error_status(int plasma_error) { ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local; ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote; +const PlasmaStoreInfo* plasma_config; + } // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/common.h -- diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 2b71da6..66d5f30 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -95,6 +95,11 @@ enum ObjectRequestType { extern int ObjectStatusLocal; extern int ObjectStatusRemote; +/// Globally accessible reference to plasma store configuration. +/// TODO(pcm): This can be avoided with some refactoring of existing code +/// by making it possible to pass a context object through dlmalloc. 
+struct PlasmaStoreInfo; +extern const PlasmaStoreInfo* plasma_config; } // namespace plasma #endif // PLASMA_COMMON_H http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/malloc.cc -- diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 77a8afe..6b9bc62 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -25,9 +25,13 @@ #include #include +#include