[
https://issues.apache.org/jira/browse/ARROW-2427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448088#comment-16448088
]
ASF GitHub Bot commented on ARROW-2427:
---
pitrou closed pull request #1867: ARROW-2427: [C++] Implement ReadAt properly
URL: https://github.com/apache/arrow/pull/1867
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ba012beb7..e3d6f84f3 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -125,9 +125,8 @@ class OSFile {
}
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
-std::lock_guard<std::mutex> guard(lock_);
-RETURN_NOT_OK(Seek(position));
-return Read(nbytes, bytes_read, out);
+return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
+bytes_read);
}
Status Seek(int64_t pos) {
@@ -203,6 +202,19 @@ class ReadableFile::ReadableFileImpl : public OSFile {
return Status::OK();
}
+ Status ReadBufferAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+std::shared_ptr<ResizableBuffer> buffer;
+RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
+
+int64_t bytes_read = 0;
+RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
+if (bytes_read < nbytes) {
+ RETURN_NOT_OK(buffer->Resize(bytes_read));
+}
+*out = buffer;
+return Status::OK();
+ }
+
private:
MemoryPool* pool_;
};
@@ -247,9 +259,7 @@ Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_rea
Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
- std::lock_guard<std::mutex> guard(impl_->lock());
- RETURN_NOT_OK(Seek(position));
- return impl_->ReadBuffer(nbytes, out);
+ return impl_->ReadBufferAt(position, nbytes, out);
}
Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
@@ -459,42 +469,38 @@ Status MemoryMappedFile::Close() {
return Status::OK();
}
-Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
- nbytes = std::max<int64_t>(
- 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+void* out) {
+ nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
if (nbytes > 0) {
-std::memcpy(out, memory_map_->head(), static_cast<size_t>(nbytes));
+std::memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
}
*bytes_read = nbytes;
- memory_map_->advance(nbytes);
return Status::OK();
}
-Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- nbytes = std::max<int64_t>(
- 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
+std::shared_ptr<Buffer>* out) {
+ nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
if (nbytes > 0) {
-*out = SliceBuffer(memory_map_, memory_map_->position(), nbytes);
+*out = SliceBuffer(memory_map_, position, nbytes);
} else {
*out = std::make_shared<Buffer>(nullptr, 0);
}
- memory_map_->advance(nbytes);
return Status::OK();
}
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
-void* out) {
- std::lock_guard<std::mutex> guard(memory_map_->lock());
- RETURN_NOT_OK(Seek(position));
- return Read(nbytes, bytes_read, out);
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+ RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
+ memory_map_->advance(*bytes_read);
+ return Status::OK();
}
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
-std::shared_ptr<Buffer>* out) {
- std::lock_guard<std::mutex> guard(memory_map_->lock());
- RETURN_NOT_OK(Seek(position));
- return Read(nbytes, out);
+Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out));
+ memory_map_->advance((*out)->size());
+ return Status::OK();
}
bool MemoryMappedFile::supports_zero_copy() const { return true; }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 09536a44e..743621c46 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -128,7 +128,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
virtual bool supports_zero_copy() const = 0;
/// \brief Read nbytes at position, provide default